# Source code for gecko_iot_client.transporters

from concurrent.futures import Future
from typing import Any, Dict

# Shared message passed to every NotImplementedError raised by the
# AbstractTransporter method stubs below, so the wording stays uniform.
_NOT_IMPLEMENTED_MSG = "Subclasses must implement this method"


class AbstractTransporter:
    """Abstract base class for transport layer implementations.

    This class defines the interface for communicating with Gecko IoT
    devices through various protocols (MQTT, HTTP, etc.). Concrete
    implementations handle the protocol-specific details while providing a
    unified interface for the client.

    Every method on this base class raises ``NotImplementedError``; a
    subclass overrides the methods to provide a working transport.
    """

    def connect(self):
        """Establish connection to the transport medium.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(_NOT_IMPLEMENTED_MSG)

    def disconnect(self):
        """Close connection and clean up resources.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(_NOT_IMPLEMENTED_MSG)

    def on_state_change(self, callback):
        """Register callback for state change notifications.

        Args:
            callback: Function to call when state changes occur.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(_NOT_IMPLEMENTED_MSG)

    def change_state(self, new_state):
        """Request a state change on the device.

        Args:
            new_state: The new state to apply.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(_NOT_IMPLEMENTED_MSG)

    def load_configuration(self, timeout: float = 30.0):
        """Load device configuration from the transport medium.

        Args:
            timeout: Maximum time to wait for configuration response, in
                seconds.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(_NOT_IMPLEMENTED_MSG)

    def on_configuration_loaded(self, callback):
        """Register callback for configuration load events.

        Args:
            callback: Function to call when configuration is loaded.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(_NOT_IMPLEMENTED_MSG)

    def load_state(self):
        """Load current device state from the transport medium.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(_NOT_IMPLEMENTED_MSG)

    def on_state_loaded(self, callback):
        """Register callback for state load events.

        Args:
            callback: Function to call when state is loaded.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(_NOT_IMPLEMENTED_MSG)

    def on_connectivity_change(self, callback):
        """Register callback for connectivity changes.

        Args:
            callback: Function to call when connectivity status changes.
                Callback should accept a boolean (True=connected,
                False=disconnected).

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(_NOT_IMPLEMENTED_MSG)

    def is_connected(self) -> bool:
        """Check if the transport is currently connected.

        Returns:
            True if connected, False otherwise.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(_NOT_IMPLEMENTED_MSG)

    # Generic desired state interface

    def publish_desired_state(self, desired_state: Dict[str, Any]) -> Future:
        """Publish desired state updates to the transport medium.

        Args:
            desired_state: Dictionary of desired state structure to publish.

        Returns:
            Future that resolves when update is published.

        Raises:
            NotImplementedError: Must be implemented by subclasses.

        Note:
            The actual transport implementation (MQTT, HTTP, etc.) handles
            the protocol-specific details like topics, payloads, field
            naming. This method is transport-agnostic and doesn't impose any
            business logic about zones, features, or other domain concepts.
        """
        raise NotImplementedError(_NOT_IMPLEMENTED_MSG)

    def publish_batch_desired_state(
        self, zone_updates: Dict[str, Dict[str, Dict[str, Any]]]
    ) -> Future:
        """Publish desired state updates for multiple zones in one operation.

        Args:
            zone_updates: Nested dict ``{zone_type: {zone_id: updates}}``.

        Returns:
            Future that resolves when batch update is published.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(_NOT_IMPLEMENTED_MSG)