import logging
from typing import Any, Callable, Dict, List
from .models.connectivity import ConnectivityStatus
from .models.events import EventChannel, EventEmitter
from .models.operation_mode import OperationMode, OperationModeStatus
from .models.operation_mode_controller import OperationModeController
from .models.zone_parser import ZoneConfigurationParser
from .models.zone_types import (
AbstractZone,
FlowZone,
LightingZone,
TemperatureControlZone,
ZoneType,
)
from .transporters import AbstractTransporter
from .transporters.exceptions import ConfigurationTimeoutError
from .api import GeckoApiClient
# Make key classes available at package level
__all__ = [
"GeckoIotClient",
"AbstractTransporter",
"ConfigurationTimeoutError",
"AbstractZone",
"ZoneType",
"TemperatureControlZone",
"FlowZone",
"LightingZone",
"EventChannel",
"EventEmitter",
"ConnectivityStatus",
"OperationMode",
"OperationModeStatus",
"OperationModeController",
"GeckoApiClient",
]
# Get version from setuptools-scm
try:
from importlib.metadata import version
__version__ = version("gecko-iot-client")
except Exception:
# Fallback for development/testing
__version__ = "0.0.0.dev0"
[docs]
class GeckoIotClient:
"""
Main client for interacting with Gecko IoT devices.
The GeckoIotClient provides a high-level interface for connecting to and controlling
Gecko IoT devices through various transport protocols (e.g., MQTT via AWS IoT).
It handles device configuration, state management, and zone control.
Args:
idd: Unique identifier for the device/client
transporter: Transport layer implementation for communication
config_timeout: Maximum time to wait for configuration loading in seconds (default: 30.0)
Example:
>>> from gecko_iot_client import GeckoIotClient
>>> from gecko_iot_client.transporters.mqtt import MqttTransporter
>>>
>>> transporter = MqttTransporter(
... endpoint="your-endpoint.amazonaws.com",
... certificate_path="cert.pem.crt",
... private_key_path="private.pem.key",
... ca_file_path="AmazonRootCA1.pem"
... )
>>>
>>> with GeckoIotClient("device-123", transporter) as client:
... zones = client.get_zones()
... print(f"Found {len(zones)} zone types")
"""
[docs]
def __init__(
self, idd: str, transporter: AbstractTransporter, config_timeout: float = 5.0
):
self.id = idd
self.transporter = transporter
self.config_timeout = config_timeout
self._zones: Dict[ZoneType, List[AbstractZone]] = {}
self._zone_parser = ZoneConfigurationParser()
self._logger = logging.getLogger(self.__class__.__name__)
self._configuration = None
self._state = None
# Event system
self._event_emitter = EventEmitter()
self._connectivity_status = ConnectivityStatus()
self._operation_mode_controller = OperationModeController()
# State handlers registration for automatic processing
self._state_handlers = [
{
"status_obj": self._connectivity_status,
"event_channel": EventChannel.CONNECTIVITY_UPDATE,
"log_formatter": lambda status: (
f"Device connectivity changed: gateway={status.gateway_status}, vessel={status.vessel_status}"
),
},
{
"status_obj": self._operation_mode_controller,
"event_channel": EventChannel.OPERATION_MODE_UPDATE,
"log_formatter": lambda controller: (
f"Operation mode changed to: {controller.mode_name} ({controller.operation_mode.value})"
),
},
]
# Set up connectivity monitoring if supported by transporter
if hasattr(self.transporter, "on_connectivity_change"):
self.transporter.on_connectivity_change(
self._on_transporter_connectivity_change
)
[docs]
def connect(self):
"""
Establish connection to the device and initialize configuration.
This method sets up event handlers for configuration and state changes,
connects to the transport layer, and automatically loads the device
configuration.
Raises:
Exception: If connection or configuration loading fails
"""
self.transporter.on_configuration_loaded(self._on_configuration_loaded)
self.transporter.on_state_change(self._on_state_change)
self.transporter.on_state_loaded(self._on_state_loaded)
self.transporter.connect()
self.transporter.load_configuration(timeout=self.config_timeout)
[docs]
def __enter__(self):
"""
Enter the context manager.
Automatically calls connect() when entering the context.
Returns:
GeckoIotClient: Self for use in with statement
"""
self._logger.info("Entering GeckoIotClient context manager...")
self.connect()
return self
[docs]
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Exit the context manager.
Automatically calls disconnect() when exiting the context.
Args:
exc_type: Exception type if an exception occurred
exc_val: Exception value if an exception occurred
exc_tb: Exception traceback if an exception occurred
"""
self.disconnect()
@property
def is_connected(self) -> bool:
"""
Check if the client is currently connected.
This checks both MQTT connectivity and device shadow connectivity status.
Returns:
bool: True if fully connected (MQTT + gateway + vessel), False otherwise
"""
return self._connectivity_status.is_fully_connected
[docs]
def disconnect(self):
"""
Disconnect from the device and clean up resources.
This method properly closes the transport connection and performs
any necessary cleanup.
"""
self.transporter.disconnect()
def _on_transporter_connectivity_change(self, is_connected: bool):
"""Handle transporter connectivity changes (transport-agnostic)."""
self._connectivity_status.transport_connected = is_connected
self._logger.info(f"Transporter connectivity changed: {is_connected}")
# Emit connectivity update event
self._event_emitter.emit(
EventChannel.CONNECTIVITY_UPDATE, self._connectivity_status
)
def _process_state_updates(self, state_data: Dict[str, Any]) -> None:
"""Process all registered state handlers for the given state data."""
for handler in self._state_handlers:
status_obj = handler["status_obj"]
event_channel = handler["event_channel"]
log_formatter = handler["log_formatter"]
if status_obj.update_from_state_data(state_data):
self._logger.info(log_formatter(status_obj))
self._event_emitter.emit(event_channel, status_obj)
# Apply state updates to zones
if self._zones:
try:
self._zone_parser.apply_state_to_zones(self._zones, state_data)
self._logger.info("State updates applied to zones")
self._notify_zone_updates()
except Exception as e:
self._logger.error(f"Failed to apply state updates to zones: {e}")
@property
def connectivity_status(self) -> ConnectivityStatus:
"""
Get current connectivity status.
Returns:
ConnectivityStatus: Current connectivity including MQTT, gateway, and vessel status
"""
return self._connectivity_status
@property
def operation_mode_controller(self) -> OperationModeController:
"""
Get the operation mode controller for read/write operations.
Returns:
OperationModeController: Controller for operation mode functionality
"""
return self._operation_mode_controller
@property
def operation_mode_status(self) -> OperationModeController:
"""
Get current operation mode status (legacy property - use operation_mode_controller instead).
Returns:
OperationModeController: Current operation mode controller
"""
return self._operation_mode_controller
[docs]
def on(self, channel: EventChannel, callback: Callable) -> None:
"""
Register a callback for a specific event channel.
Args:
channel: The event channel to listen to
callback: Function to call when the event occurs
"""
self._event_emitter.on(channel, callback)
[docs]
def off(self, channel: EventChannel, callback: Callable) -> None:
"""
Unregister a callback from a specific event channel.
Args:
channel: The event channel to stop listening to
callback: The callback function to remove
"""
self._event_emitter.off(channel, callback)
def _on_configuration_loaded(self, configuration):
"""Handle configuration loading and zone parsing."""
self._logger.info("Configuration loaded.")
self._configuration = configuration
zones_config = configuration.get("zones", {})
self._logger.debug(f"Raw configuration: {configuration}")
try:
self._zones = self._zone_parser.parse_zones_configuration(zones_config)
# Setup zone control after zones are parsed
self.setup_zone_control()
except Exception as e:
self._logger.error(f"Error during zone parsing: {e}")
self._zones = {} # Reset to empty state on failure
# Automatically load state after configuration is processed
try:
self._logger.info("Automatically loading state after configuration...")
self.transporter.load_state()
except Exception as e:
self._logger.error(f"Failed to load state: {e}")
def _on_state_change(self, new_state):
"""Handle state changes."""
self._logger.debug(f"State changed to: {new_state}")
# Process all state updates using unified handler (includes zone updates)
self._process_state_updates(new_state)
def _on_state_loaded(self, state_data):
"""Handle state loading from AWS IoT Device Shadow."""
self._logger.info("State loaded from AWS IoT Device Shadow.")
self._state = state_data
self._logger.debug(f"State data: {state_data}")
# Process all state updates using unified handler (includes zone updates)
self._process_state_updates(state_data)
# Ensure zone control is set up after state is applied
if self._zones:
self.setup_zone_control()
[docs]
def get_zones(self) -> Dict[ZoneType, List[AbstractZone]]:
"""
Return the parsed zones organized by type.
Returns:
Dict mapping zone types to lists of zones of that type.
Returns a copy to prevent external modification.
"""
return self._zones.copy()
[docs]
def get_zones_by_type(self, zone_type: ZoneType) -> List[AbstractZone]:
"""
Return all zones of a specific type.
Args:
zone_type: The type of zones to retrieve
Returns:
List of zones matching the specified type
"""
return self._zones.get(zone_type, [])
[docs]
def get_zone_by_id_and_type(
self, zone_type: ZoneType, zone_id: str
) -> AbstractZone:
"""
Find and return a zone by its type and ID.
Args:
zone_type: The type of the zone
zone_id: The zone ID to search for
Returns:
The zone with matching type and ID
"""
zone = next(
(z for z in self.get_zones_by_type(zone_type) if z.id == zone_id), None
)
if not zone:
raise ValueError(f"No zone found with type {zone_type} and ID: {zone_id}")
return zone
[docs]
def on_zone_update(
self, callback: Callable[[Dict[ZoneType, List[AbstractZone]]], None]
):
"""
Register callback for zone updates (legacy method).
This method is maintained for backward compatibility.
New code should use: client.on(EventChannel.ZONE_UPDATE, callback)
Args:
callback: Function that takes a dictionary of zones organized by type
"""
# Use the new event system internally
self.on(EventChannel.ZONE_UPDATE, callback)
def _notify_zone_updates(self):
"""Notify all registered callbacks that zones were updated."""
self._logger.info("Notifying zone update callbacks")
# Use the new event system to notify all callbacks
self._event_emitter.emit(EventChannel.ZONE_UPDATE, self._zones.copy())
[docs]
def register_zone_callbacks(self):
"""
Register callbacks for zone monitoring (legacy method).
This method is maintained for backward compatibility.
For new code, use the event system: client.on(EventChannel.ZONE_UPDATE, callback)
"""
# Set up basic zone monitoring
for zone_type, zone_list in self._zones.items():
for zone in zone_list:
self._logger.debug(f"Zone ready for monitoring: {zone.name}")
# Register a zone update callback using the new event system for monitoring
def zone_update_handler(zones: Dict[ZoneType, List[AbstractZone]]):
self._logger.debug(
f"Zone update received: {len(zones)} zone types available"
)
self.on(EventChannel.ZONE_UPDATE, zone_update_handler)
[docs]
def setup_zone_control(self) -> None:
"""
Set up zone and feature control functionality for publishing desired state updates.
"""
def _publish_if_connected(publish_func, error_context: str, *args, **kwargs):
"""Helper to publish only if connected."""
if self.is_connected:
try:
publish_func(*args, **kwargs)
self._logger.info(f"✅ Published desired state for {error_context}")
except Exception as e:
self._logger.error(
f"❌ Failed to publish desired state for {error_context}: {e}"
)
else:
self._logger.error(
f"Failed to publish change to {error_context}, not connected."
)
# Zone control callback
def zone_callback(
zone_type: str, zone_id: str, updates: Dict[str, Any]
) -> None:
# Build the zone structure and use the generic transport method
desired_state = {"zones": {zone_type: {zone_id: updates}}}
_publish_if_connected(
self.transporter.publish_desired_state, f"zone {zone_id}", desired_state
)
# Feature control callback (for operation mode, etc.)
def feature_callback(feature_name: str, updates: Dict[str, Any]) -> None:
# Build the feature structure and use the generic transport method
desired_state = {"features": updates}
_publish_if_connected(
self.transporter.publish_desired_state, feature_name, desired_state
)
# Set callbacks for zones
for zone_type, zone_list in self._zones.items():
for zone in zone_list:
zone.set_publish_callback(zone_callback)
# Set callback for operation mode
self._operation_mode_controller.set_publish_callback(feature_callback)
self._logger.info("Zone and feature control setup completed")
[docs]
def list_zones(self) -> List[Dict[str, Any]]:
"""
Get a simple list of all zones with basic info.
Returns:
List of dictionaries with zone information
"""
zones_info = []
for zone_type, zone_list in self._zones.items():
for zone in zone_list:
zone_info = {
"id": zone.id,
"name": zone.name,
"type": zone_type.value,
"has_control": zone._publish_callback is not None,
}
zones_info.append(zone_info)
return zones_info