import json import logging import socket import time from typing import Callable, Optional import paho.mqtt.client as mqtt from app.core.config import settings logger = logging.getLogger(__name__) class MQTTClient: """Client MQTT per ricevere allarmi dal sistema di monitoraggio""" def __init__(self, message_handler: Optional[Callable] = None): # Genera un client_id unico basato su hostname + timestamp # Questo permette di avere più istanze del backend senza conflitti hostname = socket.gethostname() timestamp = int(time.time() * 1000) # milliseconds client_id = f"terrain_backend_{hostname}_{timestamp}" self.client = mqtt.Client(client_id=client_id) self.message_handler = message_handler logger.info(f"Client MQTT creato con ID: {client_id}") self._setup_callbacks() def _setup_callbacks(self): """Configura i callback MQTT""" self.client.on_connect = self._on_connect self.client.on_disconnect = self._on_disconnect self.client.on_message = self._on_message def _on_connect(self, client, userdata, flags, rc): """Callback chiamato quando il client si connette al broker""" if rc == 0: logger.info("Connesso al broker MQTT") # Sottoscrizione ai topic (pattern: terrain/{cliente_id}/{sito_id}/alarms) client.subscribe(settings.MQTT_TOPIC_ALARMS, qos=1) logger.info(f"Sottoscritto al topic allarmi: {settings.MQTT_TOPIC_ALARMS}") # Opzionale: sottoscrivi anche telemetry e status se necessario # client.subscribe(settings.MQTT_TOPIC_TELEMETRY, qos=0) # client.subscribe(settings.MQTT_TOPIC_STATUS, qos=0) else: logger.error(f"Connessione fallita con codice: {rc}") def _on_disconnect(self, client, userdata, rc): """Callback chiamato quando il client si disconnette""" if rc != 0: logger.warning(f"Disconnessione inaspettata. Codice: {rc}") else: logger.info("Disconnesso dal broker MQTT") def _on_message(self, client, userdata, msg): """ Callback chiamato quando arriva un messaggio Topic pattern: terrain/{cliente_id}/{sito_id}/alarms Formato messaggio atteso (JSON): { "tipo": "movimento_terreno", "severita": "critical", "titolo": "Movimento terreno rilevato", "descrizione": "Rilevato movimento anomalo...", "valore_rilevato": 12.5, "valore_soglia": 10.0, "unita_misura": "mm", "timestamp": "2025-10-18T10:30:00Z", "dati_sensori": { "sensore_1": 12.5, "sensore_2": 8.3 } } Nota: sito_id e cliente_id vengono estratti dal topic, non dal payload """ try: logger.info(f"Messaggio ricevuto sul topic {msg.topic}") # Parse topic per estrarre cliente_id e sito_id # Formato: terrain/{cliente_id}/{sito_id}/alarms topic_parts = msg.topic.split('/') if len(topic_parts) >= 4: cliente_id_str = topic_parts[1] sito_id_str = topic_parts[2] # Decodifica e parse payload payload = json.loads(msg.payload.decode()) # Aggiungi informazioni dal topic al payload payload['sito_id'] = int(sito_id_str) payload['cliente_id'] = int(cliente_id_str) logger.debug(f"Cliente: {cliente_id_str}, Sito: {sito_id_str}, Payload: {payload}") if self.message_handler: self.message_handler(payload) else: logger.warning("Nessun handler configurato per processare il messaggio") else: logger.error(f"Topic malformato: {msg.topic}. Atteso: terrain/{{cliente_id}}/{{sito_id}}/alarms") except json.JSONDecodeError: logger.error(f"Errore nel parsing del JSON: {msg.payload}") except ValueError as e: logger.error(f"Errore nella conversione di cliente_id o sito_id: {e}") except Exception as e: logger.error(f"Errore nel processamento del messaggio: {e}") def connect(self): """Connette al broker MQTT""" try: if settings.MQTT_USERNAME and settings.MQTT_PASSWORD: self.client.username_pw_set( settings.MQTT_USERNAME, settings.MQTT_PASSWORD ) self.client.connect( settings.MQTT_BROKER_HOST, settings.MQTT_BROKER_PORT, keepalive=60 ) logger.info( f"Connessione a {settings.MQTT_BROKER_HOST}:" f"{settings.MQTT_BROKER_PORT}" ) except Exception as e: logger.error(f"Errore nella connessione al broker MQTT: {e}") raise def start(self): """Avvia il loop MQTT in background""" self.connect() self.client.loop_start() logger.info("Loop MQTT avviato") def stop(self): """Ferma il loop MQTT""" self.client.loop_stop() self.client.disconnect() logger.info("Loop MQTT fermato") def set_message_handler(self, handler: Callable): """Imposta l'handler per i messaggi ricevuti""" self.message_handler = handler