149 lines
5.4 KiB
Python
149 lines
5.4 KiB
Python
import json
|
|
import logging
|
|
import socket
|
|
import time
|
|
from typing import Callable, Optional
|
|
import paho.mqtt.client as mqtt
|
|
|
|
from app.core.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MQTTClient:
|
|
"""Client MQTT per ricevere allarmi dal sistema di monitoraggio"""
|
|
|
|
def __init__(self, message_handler: Optional[Callable] = None):
|
|
# Genera un client_id unico basato su hostname + timestamp
|
|
# Questo permette di avere più istanze del backend senza conflitti
|
|
hostname = socket.gethostname()
|
|
timestamp = int(time.time() * 1000) # milliseconds
|
|
client_id = f"terrain_backend_{hostname}_{timestamp}"
|
|
|
|
self.client = mqtt.Client(client_id=client_id)
|
|
self.message_handler = message_handler
|
|
logger.info(f"Client MQTT creato con ID: {client_id}")
|
|
self._setup_callbacks()
|
|
|
|
def _setup_callbacks(self):
|
|
"""Configura i callback MQTT"""
|
|
self.client.on_connect = self._on_connect
|
|
self.client.on_disconnect = self._on_disconnect
|
|
self.client.on_message = self._on_message
|
|
|
|
def _on_connect(self, client, userdata, flags, rc):
|
|
"""Callback chiamato quando il client si connette al broker"""
|
|
if rc == 0:
|
|
logger.info("Connesso al broker MQTT")
|
|
# Sottoscrizione ai topic (pattern: terrain/{cliente_id}/{sito_id}/alarms)
|
|
client.subscribe(settings.MQTT_TOPIC_ALARMS, qos=1)
|
|
logger.info(f"Sottoscritto al topic allarmi: {settings.MQTT_TOPIC_ALARMS}")
|
|
|
|
# Opzionale: sottoscrivi anche telemetry e status se necessario
|
|
# client.subscribe(settings.MQTT_TOPIC_TELEMETRY, qos=0)
|
|
# client.subscribe(settings.MQTT_TOPIC_STATUS, qos=0)
|
|
else:
|
|
logger.error(f"Connessione fallita con codice: {rc}")
|
|
|
|
def _on_disconnect(self, client, userdata, rc):
|
|
"""Callback chiamato quando il client si disconnette"""
|
|
if rc != 0:
|
|
logger.warning(f"Disconnessione inaspettata. Codice: {rc}")
|
|
else:
|
|
logger.info("Disconnesso dal broker MQTT")
|
|
|
|
def _on_message(self, client, userdata, msg):
|
|
"""
|
|
Callback chiamato quando arriva un messaggio
|
|
|
|
Topic pattern: terrain/{cliente_id}/{sito_id}/alarms
|
|
|
|
Formato messaggio atteso (JSON):
|
|
{
|
|
"tipo": "movimento_terreno",
|
|
"severita": "critical",
|
|
"titolo": "Movimento terreno rilevato",
|
|
"descrizione": "Rilevato movimento anomalo...",
|
|
"valore_rilevato": 12.5,
|
|
"valore_soglia": 10.0,
|
|
"unita_misura": "mm",
|
|
"timestamp": "2025-10-18T10:30:00Z",
|
|
"dati_sensori": {
|
|
"sensore_1": 12.5,
|
|
"sensore_2": 8.3
|
|
}
|
|
}
|
|
|
|
Nota: sito_id e cliente_id vengono estratti dal topic, non dal payload
|
|
"""
|
|
try:
|
|
logger.info(f"Messaggio ricevuto sul topic {msg.topic}")
|
|
|
|
# Parse topic per estrarre cliente_id e sito_id
|
|
# Formato: terrain/{cliente_id}/{sito_id}/alarms
|
|
topic_parts = msg.topic.split('/')
|
|
if len(topic_parts) >= 4:
|
|
cliente_id_str = topic_parts[1]
|
|
sito_id_str = topic_parts[2]
|
|
|
|
# Decodifica e parse payload
|
|
payload = json.loads(msg.payload.decode())
|
|
|
|
# Aggiungi informazioni dal topic al payload
|
|
payload['sito_id'] = int(sito_id_str)
|
|
payload['cliente_id'] = int(cliente_id_str)
|
|
|
|
logger.debug(f"Cliente: {cliente_id_str}, Sito: {sito_id_str}, Payload: {payload}")
|
|
|
|
if self.message_handler:
|
|
self.message_handler(payload)
|
|
else:
|
|
logger.warning("Nessun handler configurato per processare il messaggio")
|
|
else:
|
|
logger.error(f"Topic malformato: {msg.topic}. Atteso: terrain/{{cliente_id}}/{{sito_id}}/alarms")
|
|
|
|
except json.JSONDecodeError:
|
|
logger.error(f"Errore nel parsing del JSON: {msg.payload}")
|
|
except ValueError as e:
|
|
logger.error(f"Errore nella conversione di cliente_id o sito_id: {e}")
|
|
except Exception as e:
|
|
logger.error(f"Errore nel processamento del messaggio: {e}")
|
|
|
|
def connect(self):
|
|
"""Connette al broker MQTT"""
|
|
try:
|
|
if settings.MQTT_USERNAME and settings.MQTT_PASSWORD:
|
|
self.client.username_pw_set(
|
|
settings.MQTT_USERNAME,
|
|
settings.MQTT_PASSWORD
|
|
)
|
|
|
|
self.client.connect(
|
|
settings.MQTT_BROKER_HOST,
|
|
settings.MQTT_BROKER_PORT,
|
|
keepalive=60
|
|
)
|
|
logger.info(
|
|
f"Connessione a {settings.MQTT_BROKER_HOST}:"
|
|
f"{settings.MQTT_BROKER_PORT}"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Errore nella connessione al broker MQTT: {e}")
|
|
raise
|
|
|
|
def start(self):
|
|
"""Avvia il loop MQTT in background"""
|
|
self.connect()
|
|
self.client.loop_start()
|
|
logger.info("Loop MQTT avviato")
|
|
|
|
def stop(self):
|
|
"""Ferma il loop MQTT"""
|
|
self.client.loop_stop()
|
|
self.client.disconnect()
|
|
logger.info("Loop MQTT fermato")
|
|
|
|
def set_message_handler(self, handler: Callable):
|
|
"""Imposta l'handler per i messaggi ricevuti"""
|
|
self.message_handler = handler
|