app backend prima

This commit is contained in:
2025-10-20 19:10:08 +02:00
commit 438255d27b
42 changed files with 4622 additions and 0 deletions

0
app/mqtt/__init__.py Normal file
View File

139
app/mqtt/client.py Normal file
View File

@@ -0,0 +1,139 @@
import json
import logging
from typing import Callable, Optional
import paho.mqtt.client as mqtt
from app.core.config import settings
logger = logging.getLogger(__name__)
class MQTTClient:
"""Client MQTT per ricevere allarmi dal sistema di monitoraggio"""
def __init__(self, message_handler: Optional[Callable] = None):
self.client = mqtt.Client(client_id="terrain_monitor_backend")
self.message_handler = message_handler
self._setup_callbacks()
def _setup_callbacks(self):
"""Configura i callback MQTT"""
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
def _on_connect(self, client, userdata, flags, rc):
"""Callback chiamato quando il client si connette al broker"""
if rc == 0:
logger.info("Connesso al broker MQTT")
# Sottoscrizione ai topic (pattern: terrain/{cliente_id}/{sito_id}/alarms)
client.subscribe(settings.MQTT_TOPIC_ALARMS, qos=1)
logger.info(f"Sottoscritto al topic allarmi: {settings.MQTT_TOPIC_ALARMS}")
# Opzionale: sottoscrivi anche telemetry e status se necessario
# client.subscribe(settings.MQTT_TOPIC_TELEMETRY, qos=0)
# client.subscribe(settings.MQTT_TOPIC_STATUS, qos=0)
else:
logger.error(f"Connessione fallita con codice: {rc}")
def _on_disconnect(self, client, userdata, rc):
"""Callback chiamato quando il client si disconnette"""
if rc != 0:
logger.warning(f"Disconnessione inaspettata. Codice: {rc}")
else:
logger.info("Disconnesso dal broker MQTT")
def _on_message(self, client, userdata, msg):
"""
Callback chiamato quando arriva un messaggio
Topic pattern: terrain/{cliente_id}/{sito_id}/alarms
Formato messaggio atteso (JSON):
{
"tipo": "movimento_terreno",
"severita": "critical",
"titolo": "Movimento terreno rilevato",
"descrizione": "Rilevato movimento anomalo...",
"valore_rilevato": 12.5,
"valore_soglia": 10.0,
"unita_misura": "mm",
"timestamp": "2025-10-18T10:30:00Z",
"dati_sensori": {
"sensore_1": 12.5,
"sensore_2": 8.3
}
}
Nota: sito_id e cliente_id vengono estratti dal topic, non dal payload
"""
try:
logger.info(f"Messaggio ricevuto sul topic {msg.topic}")
# Parse topic per estrarre cliente_id e sito_id
# Formato: terrain/{cliente_id}/{sito_id}/alarms
topic_parts = msg.topic.split('/')
if len(topic_parts) >= 4:
cliente_id_str = topic_parts[1]
sito_id_str = topic_parts[2]
# Decodifica e parse payload
payload = json.loads(msg.payload.decode())
# Aggiungi informazioni dal topic al payload
payload['sito_id'] = int(sito_id_str)
payload['cliente_id'] = int(cliente_id_str)
logger.debug(f"Cliente: {cliente_id_str}, Sito: {sito_id_str}, Payload: {payload}")
if self.message_handler:
self.message_handler(payload)
else:
logger.warning("Nessun handler configurato per processare il messaggio")
else:
logger.error(f"Topic malformato: {msg.topic}. Atteso: terrain/{{cliente_id}}/{{sito_id}}/alarms")
except json.JSONDecodeError:
logger.error(f"Errore nel parsing del JSON: {msg.payload}")
except ValueError as e:
logger.error(f"Errore nella conversione di cliente_id o sito_id: {e}")
except Exception as e:
logger.error(f"Errore nel processamento del messaggio: {e}")
def connect(self):
"""Connette al broker MQTT"""
try:
if settings.MQTT_USERNAME and settings.MQTT_PASSWORD:
self.client.username_pw_set(
settings.MQTT_USERNAME,
settings.MQTT_PASSWORD
)
self.client.connect(
settings.MQTT_BROKER_HOST,
settings.MQTT_BROKER_PORT,
keepalive=60
)
logger.info(
f"Connessione a {settings.MQTT_BROKER_HOST}:"
f"{settings.MQTT_BROKER_PORT}"
)
except Exception as e:
logger.error(f"Errore nella connessione al broker MQTT: {e}")
raise
def start(self):
"""Avvia il loop MQTT in background"""
self.connect()
self.client.loop_start()
logger.info("Loop MQTT avviato")
def stop(self):
"""Ferma il loop MQTT"""
self.client.loop_stop()
self.client.disconnect()
logger.info("Loop MQTT fermato")
def set_message_handler(self, handler: Callable):
"""Imposta l'handler per i messaggi ricevuti"""
self.message_handler = handler

150
app/mqtt/handler.py Normal file
View File

@@ -0,0 +1,150 @@
import logging
from datetime import datetime, timezone
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.models import Allarme, Sito, Utente
from app.services.firebase import firebase_service
logger = logging.getLogger(__name__)
class AlarmHandler:
"""Handler per processare gli allarmi ricevuti via MQTT e inviarli tramite FCM"""
def __init__(self):
self.db: Session = None
def _get_db(self) -> Session:
"""Ottiene una nuova sessione database"""
if self.db is None or not self.db.is_active:
self.db = SessionLocal()
return self.db
def handle_alarm(self, payload: dict):
"""
Processa un allarme ricevuto via MQTT
Steps:
1. Valida il payload
2. Salva l'allarme nel database
3. Recupera gli utenti del cliente associato al sito
4. Invia notifiche push tramite FCM
Args:
payload: Dizionario con i dati dell'allarme
"""
db = self._get_db()
try:
# 1. Validazione
sito_id = payload.get("sito_id")
if not sito_id:
logger.error("Payload senza sito_id")
return
# Verifica che il sito esista
sito = db.query(Sito).filter(Sito.id == sito_id).first()
if not sito:
logger.error(f"Sito {sito_id} non trovato")
return
# 2. Salva allarme nel database
timestamp_str = payload.get("timestamp")
timestamp = datetime.fromisoformat(
timestamp_str.replace("Z", "+00:00")
) if timestamp_str else datetime.now(timezone.utc)
allarme = Allarme(
sito_id=sito_id,
tipo=payload.get("tipo"),
severita=payload.get("severita"),
titolo=payload.get("titolo"),
descrizione=payload.get("descrizione"),
messaggio=payload.get("messaggio"),
valore_rilevato=payload.get("valore_rilevato"),
valore_soglia=payload.get("valore_soglia"),
unita_misura=payload.get("unita_misura"),
dati_sensori=payload.get("dati_sensori"),
timestamp_rilevamento=timestamp,
timestamp_notifica=datetime.now(timezone.utc),
)
db.add(allarme)
db.commit()
db.refresh(allarme)
logger.info(f"Allarme {allarme.id} salvato per sito {sito.nome}")
# 3. Recupera utenti del cliente
utenti = (
db.query(Utente)
.filter(
Utente.cliente_id == sito.cliente_id,
Utente.attivo == True,
Utente.fcm_token.isnot(None),
)
.all()
)
if not utenti:
logger.warning(
f"Nessun utente attivo con FCM token per cliente {sito.cliente_id}"
)
return
# 4. Invia notifiche push
self._send_notifications(allarme, sito, utenti)
except Exception as e:
logger.error(f"Errore nel processamento dell'allarme: {e}")
db.rollback()
finally:
db.close()
def _send_notifications(self, allarme: Allarme, sito: Sito, utenti: list[Utente]):
"""Invia notifiche push agli utenti"""
# Determina priorità basata sulla severità
priority = "high" if allarme.severita == "critical" else "normal"
# Prepara dati per la notifica
notification_data = {
"alarm_id": str(allarme.id),
"sito_id": str(sito.id),
"sito_nome": sito.nome,
"tipo": allarme.tipo,
"severita": allarme.severita,
"timestamp": allarme.timestamp_rilevamento.isoformat(),
}
# Prepara titolo e messaggio
title = f"{allarme.severita.upper()}: {sito.nome}"
body = allarme.titolo or allarme.descrizione or "Nuovo allarme rilevato"
# Raccogli tutti i token
tokens = [u.fcm_token for u in utenti if u.fcm_token]
if not tokens:
logger.warning("Nessun token FCM valido trovato")
return
logger.info(f"Invio notifica a {len(tokens)} dispositivi")
# Invia notifica multicast
result = firebase_service.send_multicast(
tokens=tokens,
title=title,
body=body,
data=notification_data,
priority=priority,
)
logger.info(
f"Notifiche inviate per allarme {allarme.id}: "
f"{result['success']} successi, {result['failure']} fallimenti"
)
# Singleton instance
alarm_handler = AlarmHandler()

110
app/mqtt/topics.py Normal file
View File

@@ -0,0 +1,110 @@
"""
Helper per costruire topic MQTT con la struttura standardizzata
Topic structure:
- terrain/{cliente_id}/{sito_id}/alarms - Allarmi critici/warning/info
- terrain/{cliente_id}/{sito_id}/telemetry - Dati sensori periodici
- terrain/{cliente_id}/{sito_id}/status - Stato gateway/sensori
"""
from typing import Literal
TopicType = Literal["alarms", "telemetry", "status"]
class MQTTTopics:
"""Helper per costruire topic MQTT standardizzati"""
BASE = "terrain"
@staticmethod
def build_topic(
cliente_id: int,
sito_id: int,
topic_type: TopicType = "alarms"
) -> str:
"""
Costruisce un topic MQTT standardizzato
Args:
cliente_id: ID del cliente
sito_id: ID del sito
topic_type: Tipo di topic (alarms, telemetry, status)
Returns:
Topic MQTT formattato
Examples:
>>> MQTTTopics.build_topic(5, 17, "alarms")
'terrain/5/17/alarms'
>>> MQTTTopics.build_topic(5, 17, "telemetry")
'terrain/5/17/telemetry'
"""
return f"{MQTTTopics.BASE}/{cliente_id}/{sito_id}/{topic_type}"
@staticmethod
def parse_topic(topic: str) -> dict:
"""
Parse un topic MQTT ed estrae le informazioni
Args:
topic: Topic MQTT da parsare
Returns:
Dict con cliente_id, sito_id, topic_type
Raises:
ValueError: Se il topic è malformato
Examples:
>>> MQTTTopics.parse_topic("terrain/5/17/alarms")
{'cliente_id': 5, 'sito_id': 17, 'topic_type': 'alarms'}
"""
parts = topic.split('/')
if len(parts) < 4:
raise ValueError(
f"Topic malformato: {topic}. "
f"Atteso: terrain/{{cliente_id}}/{{sito_id}}/{{type}}"
)
if parts[0] != MQTTTopics.BASE:
raise ValueError(
f"Topic base errato: {parts[0]}. "
f"Atteso: {MQTTTopics.BASE}"
)
try:
return {
'cliente_id': int(parts[1]),
'sito_id': int(parts[2]),
'topic_type': parts[3]
}
except (ValueError, IndexError) as e:
raise ValueError(
f"Errore nel parsing del topic {topic}: {e}"
)
@staticmethod
def get_subscription_pattern(topic_type: TopicType | None = None) -> str:
"""
Ottiene il pattern di sottoscrizione MQTT
Args:
topic_type: Se specificato, sottoscrive solo a quel tipo
Returns:
Pattern MQTT con wildcards
Examples:
>>> MQTTTopics.get_subscription_pattern()
'terrain/+/+/#'
>>> MQTTTopics.get_subscription_pattern("alarms")
'terrain/+/+/alarms'
"""
if topic_type:
return f"{MQTTTopics.BASE}/+/+/{topic_type}"
else:
# Sottoscrivi a tutti i tipi
return f"{MQTTTopics.BASE}/+/+/#"