""" Script per testare l'invio di un allarme via MQTT con nuova struttura topic Topic pattern: terrain/{cliente_id}/{sito_id}/alarms Questo script usa la nuova architettura MQTT migliorata dove: - cliente_id e sito_id sono nel topic (non nel payload) - Supporto per diversi tipi di messaggi (alarms, telemetry, status) - QoS 1 per garantire delivery """ import json import time from datetime import datetime, timezone import paho.mqtt.client as mqtt # Configurazione (modifica secondo il tuo setup) MQTT_BROKER = "94.177.199.207" MQTT_PORT = 1883 # ID dal database (verifica con: SELECT id, cliente_id, nome FROM siti;) CLIENTE_ID = 1 # Cliente ID del sito SITO_ID = 1 # ID del sito "Ponte Morandi" # Topic con nuova struttura: terrain/{cliente_id}/{sito_id}/alarms MQTT_TOPIC = f"terrain/{CLIENTE_ID}/{SITO_ID}/alarms" # Allarme di test (sito_id e cliente_id vengono estratti dal topic) alarm_data = { "tipo": "movimento_terreno", "severita": "critical", # critical, warning, info "titolo": "๐Ÿšจ TEST: Movimento terreno critico", "descrizione": "Allarme di test per verificare notifiche push - Sistema funzionante!", "messaggio": "Rilevato movimento anomalo superiore alla soglia di sicurezza", "valore_rilevato": 25.5, "valore_soglia": 10.0, "unita_misura": "mm", "timestamp": datetime.now(timezone.utc).isoformat(), "dati_sensori": { "sensore_inclinometro_1": 25.5, "sensore_inclinometro_2": 18.3, "temperatura": 15.5, "umidita": 45.0 } } def on_connect(client, userdata, flags, rc): """Callback quando il client si connette""" if rc == 0: print("โœ“ Connesso al broker MQTT") else: print(f"โœ— Connessione fallita con codice: {rc}") def on_publish(client, userdata, mid): """Callback quando il messaggio viene pubblicato""" print(f"โœ“ Messaggio pubblicato (message_id: {mid})") def send_test_alarm(): """Invia un allarme di test""" print("=" * 70) print("TEST INVIO ALLARME VIA MQTT - NUOVA ARCHITETTURA") print("=" * 70) # Crea client MQTT client = mqtt.Client(client_id="test_alarm_sender_v2") client.on_connect = on_connect client.on_publish = on_publish try: # Connetti al broker print(f"\nConnessione a {MQTT_BROKER}:{MQTT_PORT}...") client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60) # Start loop client.loop_start() time.sleep(1) # Aspetta la connessione # Pubblica l'allarme print(f"\nInvio allarme:") print(f" Topic: {MQTT_TOPIC}") print(f" Cliente ID: {CLIENTE_ID}") print(f" Sito ID: {SITO_ID}") print("\nPayload:") print(json.dumps(alarm_data, indent=2, ensure_ascii=False)) print() result = client.publish( MQTT_TOPIC, json.dumps(alarm_data), qos=1 # QoS 1 per garantire delivery ) # Aspetta che il messaggio sia pubblicato result.wait_for_publish() print("\nโœ“ Allarme inviato con successo!") print("\nControlla:") print(" 1. I log del backend per vedere se รจ stato ricevuto") print(" 2. Il database per verificare che l'allarme sia stato salvato") print(" 3. L'app mobile per verificare la ricezione della notifica push") print("\nComandi utili:") print(" - Logs backend: tail -f logs/app.log") print(" - DB query: SELECT * FROM allarmi ORDER BY created_at DESC LIMIT 5;") # Cleanup time.sleep(1) client.loop_stop() client.disconnect() except Exception as e: print(f"\nโœ— Errore: {e}") client.loop_stop() raise print("\n" + "=" * 70) if __name__ == "__main__": send_test_alarm()