111 lines
3.0 KiB
Python
111 lines
3.0 KiB
Python
"""
|
|
Helper per costruire topic MQTT con la struttura standardizzata
|
|
|
|
Topic structure:
|
|
- terrain/{cliente_id}/{sito_id}/alarms - Allarmi critici/warning/info
|
|
- terrain/{cliente_id}/{sito_id}/telemetry - Dati sensori periodici
|
|
- terrain/{cliente_id}/{sito_id}/status - Stato gateway/sensori
|
|
"""
|
|
from typing import Literal
|
|
|
|
TopicType = Literal["alarms", "telemetry", "status"]
|
|
|
|
|
|
class MQTTTopics:
|
|
"""Helper per costruire topic MQTT standardizzati"""
|
|
|
|
BASE = "terrain"
|
|
|
|
@staticmethod
|
|
def build_topic(
|
|
cliente_id: int,
|
|
sito_id: int,
|
|
topic_type: TopicType = "alarms"
|
|
) -> str:
|
|
"""
|
|
Costruisce un topic MQTT standardizzato
|
|
|
|
Args:
|
|
cliente_id: ID del cliente
|
|
sito_id: ID del sito
|
|
topic_type: Tipo di topic (alarms, telemetry, status)
|
|
|
|
Returns:
|
|
Topic MQTT formattato
|
|
|
|
Examples:
|
|
>>> MQTTTopics.build_topic(5, 17, "alarms")
|
|
'terrain/5/17/alarms'
|
|
|
|
>>> MQTTTopics.build_topic(5, 17, "telemetry")
|
|
'terrain/5/17/telemetry'
|
|
"""
|
|
return f"{MQTTTopics.BASE}/{cliente_id}/{sito_id}/{topic_type}"
|
|
|
|
@staticmethod
|
|
def parse_topic(topic: str) -> dict:
|
|
"""
|
|
Parse un topic MQTT ed estrae le informazioni
|
|
|
|
Args:
|
|
topic: Topic MQTT da parsare
|
|
|
|
Returns:
|
|
Dict con cliente_id, sito_id, topic_type
|
|
|
|
Raises:
|
|
ValueError: Se il topic è malformato
|
|
|
|
Examples:
|
|
>>> MQTTTopics.parse_topic("terrain/5/17/alarms")
|
|
{'cliente_id': 5, 'sito_id': 17, 'topic_type': 'alarms'}
|
|
"""
|
|
parts = topic.split('/')
|
|
|
|
if len(parts) < 4:
|
|
raise ValueError(
|
|
f"Topic malformato: {topic}. "
|
|
f"Atteso: terrain/{{cliente_id}}/{{sito_id}}/{{type}}"
|
|
)
|
|
|
|
if parts[0] != MQTTTopics.BASE:
|
|
raise ValueError(
|
|
f"Topic base errato: {parts[0]}. "
|
|
f"Atteso: {MQTTTopics.BASE}"
|
|
)
|
|
|
|
try:
|
|
return {
|
|
'cliente_id': int(parts[1]),
|
|
'sito_id': int(parts[2]),
|
|
'topic_type': parts[3]
|
|
}
|
|
except (ValueError, IndexError) as e:
|
|
raise ValueError(
|
|
f"Errore nel parsing del topic {topic}: {e}"
|
|
)
|
|
|
|
@staticmethod
|
|
def get_subscription_pattern(topic_type: TopicType | None = None) -> str:
|
|
"""
|
|
Ottiene il pattern di sottoscrizione MQTT
|
|
|
|
Args:
|
|
topic_type: Se specificato, sottoscrive solo a quel tipo
|
|
|
|
Returns:
|
|
Pattern MQTT con wildcards
|
|
|
|
Examples:
|
|
>>> MQTTTopics.get_subscription_pattern()
|
|
'terrain/+/+/#'
|
|
|
|
>>> MQTTTopics.get_subscription_pattern("alarms")
|
|
'terrain/+/+/alarms'
|
|
"""
|
|
if topic_type:
|
|
return f"{MQTTTopics.BASE}/+/+/{topic_type}"
|
|
else:
|
|
# Sottoscrivi a tutti i tipi
|
|
return f"{MQTTTopics.BASE}/+/+/#"
|