import paho.mqtt.client as mqtt from paho.mqtt.properties import Properties from paho.mqtt.packettypes import PacketTypes import argparse import requests import psycopg2 import json import os import logging # Configurazione Logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - PID: %(process)d %(levelname)8s: %(message)s', filename="/var/log/ase_receiver.log") logger = logging.getLogger() # Configurazione connessione PostgreSQL DB_CONFIG = { "dbname": os.getenv("DB_NAME"), "dbschema": os.getenv("DB_SCHEMA"), "dbtable": os.getenv("DB_TABLE"), "user": os.getenv("DB_USER"), "password": os.getenv("DB_PASSWORD"), "host": os.getenv("DB_HOST"), "port": os.getenv("DB_PORT") } def get_credentials(args): url = args.wallet + "get" data = { "master_password": os.getenv('WALLET_MASTER_PASSWORD'), "site": f"{args.client}_site" } response = requests.post(url, json=data) if response.status_code != 200: logger.error("Error to get pwd from wallet.") exit(1) return response.json().get('password'), response.json().get('client_id'), response.json().get('topic') def get_db_connection(): return psycopg2.connect( dbname=DB_CONFIG["dbname"], user=DB_CONFIG["user"], password=DB_CONFIG["password"], host=DB_CONFIG["host"], port=DB_CONFIG["port"] ) def init_db(args, main_topic): try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(f""" CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} ( id bigserial NOT NULL, main_topic text NOT NULL, tt_data jsonb NULL, created_at timestamp DEFAULT CURRENT_TIMESTAMP NULL, CONSTRAINT {DB_CONFIG['dbtable']}_pkey PRIMARY KEY (id, main_topic) ) PARTITION BY LIST (main_topic); """) conn.commit() cursor.execute(f""" CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}_other PARTITION OF {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} DEFAULT; """) conn.commit() cursor.execute(f""" CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}_{main_topic} PARTITION OF {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} FOR VALUES IN ('{main_topic}') """) conn.commit() except Exception as e: logger.error(f"Errore durante l'inizializzazione del database: {e}") exit(1) finally: conn.close() logger.info("Database inizializzato.") def create_nested_json(path, data): main_topic = path.split('/')[0] keys = path.split('/')[1:] nested_json = data for key in reversed(keys): nested_json = {key: nested_json} return main_topic, nested_json def on_connect(client, userdata, flags, rc, properties): if rc == 0: logger.info("Connesso al broker MQTT") else: logger.error(f"Errore di connessione, codice: {rc}") def on_message(client, userdata, message): datastore = json.loads(message.payload) main_topic, json_data = create_nested_json(message.topic, datastore) try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(f""" INSERT INTO {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} (main_topic, tt_data) VALUES ('{main_topic}', '{json.dumps(json_data)}'::jsonb); """) conn.commit() except Exception as e: logger.error(f"Errore durante l'inserimento dei dati nel database: {e}") finally: conn.close() def on_disconnect(client, userdata, rc, properties=None): if rc != 0: logger.warning(f"Disconnesso dal broker con codice {rc}. Riconnessione...") client.reconnect() def main(): parser = argparse.ArgumentParser() parser.add_argument('-H', '--host', default="mqtt") parser.add_argument('-q', '--qos', type=int, default=1) parser.add_argument('-P', '--port', type=int, default=1883) parser.add_argument('-c', '--client') parser.add_argument('-w', '--wallet', default="http://mqtt:5000/") args = parser.parse_args() password, client_id, topic = get_credentials(args) main_topic = topic.split('/')[0] init_db(args, main_topic) userdata = {'args': args} properties=Properties(PacketTypes.CONNECT) properties.SessionExpiryInterval=3600 client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id, protocol=mqtt.MQTTv5) client.username_pw_set(username=args.client, password=password) client.user_data_set(userdata) #client.logger = logger client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect client.reconnect_delay_set(min_delay=1, max_delay=120) client.connect(args.host, args.port, clean_start=False) #, properties=properties) client.subscribe(topic, qos=args.qos) try: logger.info("Avvio del loop MQTT.") client.loop_forever() except KeyboardInterrupt: logger.info("Terminazione manuale.") except Exception as e: logger.error(f"Errore durante il ciclo MQTT: {e}") finally: client.disconnect() logger.info("Disconnesso dal broker.") if __name__ == "__main__": main()