"""
Logging configuration for MQTT transporter.
"""
import logging
import logging.handlers
import os
import sys
from datetime import datetime
from typing import Optional
[docs]
class MqttTransporterLogger:
"""Custom logger configuration for MQTT transporter with structured logging."""
[docs]
def __init__(
self,
name: str = "gecko_iot_mqtt",
level: int = logging.INFO,
log_file: Optional[str] = None,
max_file_size: int = 10 * 1024 * 1024, # 10MB
backup_count: int = 5,
include_console: bool = True,
):
"""
Initialize the logger configuration.
Args:
name: Logger name
level: Logging level
log_file: Path to log file (optional)
max_file_size: Maximum size of log file before rotation
backup_count: Number of backup files to keep
include_console: Whether to include console logging
"""
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
# Clear any existing handlers
self.logger.handlers.clear()
# Create formatter
formatter = self._create_formatter()
# Add console handler if requested
if include_console:
console_handler = self._create_console_handler(formatter)
self.logger.addHandler(console_handler)
# Add file handler if log file specified
if log_file:
file_handler = self._create_file_handler(
log_file, formatter, max_file_size, backup_count
)
self.logger.addHandler(file_handler)
# Prevent propagation to root logger to avoid duplicate messages
self.logger.propagate = False
def _create_formatter(self) -> logging.Formatter:
"""Create a structured log formatter."""
return logging.Formatter(
fmt="%(asctime)s | %(levelname)-8s | %(name)s | %(funcName)s:%(lineno)d | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
def _create_console_handler(
self, formatter: logging.Formatter
) -> logging.StreamHandler:
"""Create console handler with appropriate formatting."""
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
return handler
def _create_file_handler(
self,
log_file: str,
formatter: logging.Formatter,
max_size: int,
backup_count: int,
) -> logging.Handler:
"""Create rotating file handler."""
# Ensure log directory exists
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
handler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=max_size, backupCount=backup_count
)
handler.setFormatter(formatter)
return handler
[docs]
def get_logger(self) -> logging.Logger:
"""Get the configured logger instance."""
return self.logger
[docs]
def add_connection_context(self, client_id: str, broker_url: str):
"""Add connection context to all log messages."""
# Create a custom adapter that adds connection context
adapter = logging.LoggerAdapter(
self.logger, {"client_id": client_id, "broker_url": broker_url}
)
return adapter
[docs]
class ConnectionEventLogger:
"""Specialized logger for connection events with metrics tracking."""
[docs]
def __init__(self, base_logger: logging.Logger):
self.logger = base_logger
self.connection_attempts = 0
self.successful_connections = 0
self.failed_connections = 0
self.disconnections = 0
self.token_refreshes = 0
self.start_time = datetime.now()
[docs]
def log_connection_attempt(self, broker_url: str, client_id: str):
"""Log a connection attempt."""
self.connection_attempts += 1
self.logger.info(
f"Connection attempt #{self.connection_attempts} to {broker_url} with client ID: {client_id}"
)
[docs]
def log_connection_success(self, client_id: str, duration_ms: float):
"""Log successful connection."""
self.successful_connections += 1
self.logger.info(
f"Connection successful for {client_id} in {duration_ms:.2f}ms "
f"(Success rate: {self.get_success_rate():.1f}%)"
)
[docs]
def log_connection_failure(
self, client_id: str, error: Exception, duration_ms: float
):
"""Log connection failure."""
self.failed_connections += 1
self.logger.error(
f"Connection failed for {client_id} after {duration_ms:.2f}ms: {error} "
f"(Success rate: {self.get_success_rate():.1f}%)"
)
[docs]
def log_disconnection(self, client_id: str, reason: str):
"""Log disconnection event."""
self.disconnections += 1
self.logger.info(f"Disconnected {client_id}: {reason}")
[docs]
def log_token_refresh(self, success: bool, error: Optional[Exception] = None):
"""Log token refresh event."""
self.token_refreshes += 1
if success:
self.logger.info(f"Token refresh #{self.token_refreshes} successful")
else:
self.logger.error(f"Token refresh #{self.token_refreshes} failed: {error}")
[docs]
def log_reconnection_attempt(self, attempt: int, max_attempts: int, delay: float):
"""Log reconnection attempt."""
self.logger.info(
f"Reconnection attempt {attempt}/{max_attempts} in {delay:.1f}s"
)
[docs]
def get_success_rate(self) -> float:
"""Calculate connection success rate."""
if self.connection_attempts == 0:
return 0.0
return (self.successful_connections / self.connection_attempts) * 100
[docs]
def get_connection_stats(self) -> dict:
"""Get connection statistics."""
uptime = (datetime.now() - self.start_time).total_seconds()
return {
"uptime_seconds": uptime,
"connection_attempts": self.connection_attempts,
"successful_connections": self.successful_connections,
"failed_connections": self.failed_connections,
"disconnections": self.disconnections,
"token_refreshes": self.token_refreshes,
"success_rate": self.get_success_rate(),
}
[docs]
def log_periodic_stats(self):
"""Log periodic connection statistics."""
stats = self.get_connection_stats()
self.logger.info(
f"Connection Stats - Uptime: {stats['uptime_seconds']:.1f}s, "
f"Attempts: {stats['connection_attempts']}, "
f"Success Rate: {stats['success_rate']:.1f}%, "
f"Token Refreshes: {stats['token_refreshes']}"
)
[docs]
def setup_mqtt_logging(
log_level: str = "INFO", log_file: Optional[str] = None, enable_console: bool = True
) -> tuple[logging.Logger, ConnectionEventLogger]:
"""
Setup logging for MQTT transporter.
Args:
log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
log_file: Optional path to log file
enable_console: Whether to enable console logging
Returns:
Tuple of (main_logger, event_logger)
"""
# Convert string level to logging constant
level = getattr(logging, log_level.upper(), logging.INFO)
# Create main logger
mqtt_logger = MqttTransporterLogger(
name="gecko_iot_mqtt",
level=level,
log_file=log_file,
include_console=enable_console,
)
main_logger = mqtt_logger.get_logger()
# Create specialized event logger
event_logger = ConnectionEventLogger(main_logger)
# Log startup message
main_logger.info("MQTT Transporter logging initialized")
return main_logger, event_logger