166 lines
5.3 KiB
Python
166 lines
5.3 KiB
Python
import paho.mqtt.client as mqtt
|
|
from paho.mqtt.properties import Properties
|
|
from paho.mqtt.packettypes import PacketTypes
|
|
import argparse
|
|
import requests
|
|
import psycopg2
|
|
import json
|
|
import os
|
|
import logging
|
|
|
|
# Logging configuration: records at INFO level and above are written to the
# receiver's log file, each tagged with timestamp, process id and level name.
logging.basicConfig(
    filename="/var/log/ase_receiver.log",
    format='%(asctime)s - PID: %(process)d %(levelname)8s: %(message)s',
    level=logging.INFO,
)

# Root logger; the functions in this file log through this object.
logger = logging.getLogger()
|
|
|
|
# PostgreSQL settings, each read from an environment variable when the module
# is loaded. A variable that is not set leaves its entry as None.
DB_CONFIG = dict(
    dbname=os.getenv("DB_NAME"),
    dbschema=os.getenv("DB_SCHEMA"),
    dbtable=os.getenv("DB_TABLE"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    host=os.getenv("DB_HOST"),
    port=os.getenv("DB_PORT"),
)
|
|
|
|
def get_credentials(args, timeout=10):
    """Fetch the MQTT credentials for ``args.client`` from the wallet service.

    Sends a POST request to the wallet's ``get`` endpoint carrying the master
    password (``WALLET_MASTER_PASSWORD`` environment variable) and the site
    name ``<client>_site``.

    Args:
        args: Parsed command-line namespace. ``args.wallet`` is the wallet
            base URL and ``args.client`` the client name.
        timeout: Seconds to wait for the wallet before giving up.

    Returns:
        A ``(password, client_id, topic)`` tuple read from the JSON reply.
        A field missing from the reply is returned as ``None``.

    Raises:
        SystemExit: With status 1 when the wallet cannot be reached, replies
            with a status other than 200, or does not return a JSON object.
    """
    # Accept the base URL with or without its trailing slash.
    url = args.wallet.rstrip("/") + "/get"
    data = {
        "master_password": os.getenv('WALLET_MASTER_PASSWORD'),
        "site": f"{args.client}_site"
    }

    try:
        # Without a timeout a stalled wallet would block startup forever.
        response = requests.post(url, json=data, timeout=timeout)
    except requests.RequestException as e:
        logger.error(f"Error to get pwd from wallet: {e}")
        raise SystemExit(1) from e

    if response.status_code != 200:
        logger.error("Error to get pwd from wallet.")
        raise SystemExit(1)

    # Parse the body once instead of once per returned field.
    try:
        credentials = response.json()
    except ValueError as e:
        logger.error(f"Error to get pwd from wallet: invalid JSON reply ({e})")
        raise SystemExit(1) from e

    if not isinstance(credentials, dict):
        logger.error("Error to get pwd from wallet: reply is not a JSON object.")
        raise SystemExit(1)

    return credentials.get('password'), credentials.get('client_id'), credentials.get('topic')
|
|
|
|
def get_db_connection():
    """Open a new PostgreSQL connection from the settings in ``DB_CONFIG``.

    Only the connection parameters (dbname, user, password, host, port) are
    forwarded to psycopg2; the schema and table entries are not used here.

    Returns:
        The connection object returned by ``psycopg2.connect``.
    """
    connect_keys = ("dbname", "user", "password", "host", "port")
    connect_kwargs = {key: DB_CONFIG[key] for key in connect_keys}
    return psycopg2.connect(**connect_kwargs)
|
|
|
|
def init_db(args, main_topic):
    """Create the partitioned storage table and the partitions for this receiver.

    Three idempotent statements are executed and committed one by one:
    the parent table (list-partitioned on ``main_topic``), its ``_other``
    default partition, and the partition holding rows whose ``main_topic``
    equals *main_topic*.

    Args:
        args: Parsed command-line namespace. Not used by this function; kept
            so existing callers keep working.
        main_topic: Value (and name suffix) of the partition to create.

    Raises:
        SystemExit: With status 1 when connecting or any statement fails.
    """
    # Schema and table names come from the deployment environment and are
    # interpolated as-is, so PostgreSQL applies its usual identifier folding.
    parent_table = f"{DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}"

    # Pre-bind so the cleanup below is safe even when connecting itself fails;
    # otherwise the finally clause would raise and hide the real error.
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {parent_table} (
                id bigserial NOT NULL,
                main_topic text NOT NULL,
                tt_data jsonb NULL,
                created_at timestamp DEFAULT CURRENT_TIMESTAMP NULL,
                CONSTRAINT {DB_CONFIG['dbtable']}_pkey PRIMARY KEY (id, main_topic)
            )
            PARTITION BY LIST (main_topic);
        """)
        conn.commit()

        cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {parent_table}_other PARTITION OF {parent_table}
            DEFAULT;
        """)
        conn.commit()

        # The partition value is passed as a query parameter so psycopg2
        # quotes it, instead of splicing it into the SQL text.
        cursor.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {parent_table}_{main_topic} PARTITION OF {parent_table}
            FOR VALUES IN (%s)
            """,
            (main_topic,),
        )
        conn.commit()
    except Exception as e:
        logger.error(f"Errore durante l'inizializzazione del database: {e}")
        raise SystemExit(1) from e
    finally:
        if conn is not None:
            conn.close()

    logger.info("Database inizializzato.")
|
|
|
|
def create_nested_json(path, data):
    """Split a ``/``-separated path and wrap *data* in one dict per sub-level.

    The first segment of *path* is returned as the main topic. Every
    remaining segment becomes a dictionary key, outermost first, with *data*
    sitting at the innermost position.

    Returns:
        A ``(main_topic, nested)`` tuple. *nested* is *data* itself when
        *path* has only one segment.

    >>> create_nested_json("a/b/c", 1)
    ('a', {'b': {'c': 1}})
    """
    main_topic, *sub_levels = path.split('/')

    wrapped = data
    # Build from the inside out: the last segment wraps the payload first.
    while sub_levels:
        wrapped = {sub_levels.pop(): wrapped}

    return main_topic, wrapped
|
|
|
|
def on_connect(client, userdata, flags, rc, properties):
    """Connection callback: log the outcome of a connection attempt.

    A result code equal to 0 is logged at INFO level as a successful
    connection; any other code is logged at ERROR level together with the
    code. The other arguments are not used.
    """
    if rc == 0:
        logger.info("Connesso al broker MQTT")
        return

    logger.error(f"Errore di connessione, codice: {rc}")
|
|
|
|
def on_message(client, userdata, message):
    """Message callback: store one received MQTT message in PostgreSQL.

    The payload is decoded as JSON and nested under the sub-levels of the
    topic (see ``create_nested_json``); the result is inserted into the
    configured table together with the main topic.

    Failures are logged and the message is dropped. Nothing is raised, so a
    bad payload or a database outage does not propagate out of the callback.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data registered on the client (unused).
        message: Received message; ``message.topic`` and ``message.payload``
            are read.
    """
    # Payloads come from the network: reject anything that is not valid JSON
    # here instead of letting the exception escape the callback.
    try:
        datastore = json.loads(message.payload)
    except ValueError as e:
        logger.error(f"Payload JSON non valido sul topic {message.topic}: {e}")
        return

    main_topic, json_data = create_nested_json(message.topic, datastore)

    # Pre-bind so the cleanup below is safe when connecting itself fails.
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        # Topic and payload are untrusted, so they are passed as query
        # parameters; psycopg2 then handles quoting (e.g. of apostrophes).
        cursor.execute(
            f"""
            INSERT INTO {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}
            (main_topic, tt_data)
            VALUES
            (%s, %s::jsonb);
            """,
            (main_topic, json.dumps(json_data)),
        )
        conn.commit()
    except Exception as e:
        logger.error(f"Errore durante l'inserimento dei dati nel database: {e}")
    finally:
        if conn is not None:
            conn.close()
|
|
|
|
def on_disconnect(client, userdata, rc, properties=None, *extra):
    """Disconnect callback: log an unexpected disconnection.

    The client in this file is created with ``CallbackAPIVersion.VERSION2``,
    which invokes this callback with five positional arguments:
    ``(client, userdata, disconnect_flags, reason_code, properties)``.
    The older four-argument shape ``(client, userdata, rc, properties)`` is
    still accepted.

    No reconnect is issued from here: ``loop_forever()`` reconnects by itself,
    honouring the delays set with ``reconnect_delay_set``. Calling
    ``reconnect()`` inside the callback would duplicate that attempt, and an
    error raised by it would escape the callback.

    Args:
        client: MQTT client that was disconnected (unused).
        userdata: User data registered on the client (unused).
        rc: Result code (four-argument shape) or disconnect flags
            (five-argument shape).
        properties: MQTT v5 properties (four-argument shape) or the reason
            code (five-argument shape).
        *extra: Present only in the five-argument shape, where it holds the
            MQTT v5 properties.
    """
    # In the five-argument shape the reason code is the fourth positional.
    reason_code = properties if extra else rc

    if reason_code != 0:
        logger.warning(f"Disconnesso dal broker con codice {reason_code}. Riconnessione...")
|
|
|
|
def main():
    """Run the receiver: fetch credentials, prepare the database, consume MQTT.

    Steps:
        1. Parse the command line (broker host/port, QoS, client name,
           wallet URL).
        2. Obtain password, client id and topic from the wallet.
        3. Create the storage table and the partition for the main topic.
        4. Connect to the broker with MQTT v5 and process messages until
           interrupted or until the network loop fails.

    Raises:
        SystemExit: With status 1 when the wallet returns no topic (and from
            the helpers when credentials or database setup fail).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--host', default="mqtt")
    parser.add_argument('-q', '--qos', type=int, default=1)
    parser.add_argument('-P', '--port', type=int, default=1883)
    # The client name selects the wallet entry and is the MQTT username, so
    # running without it cannot work.
    parser.add_argument('-c', '--client', required=True)
    parser.add_argument('-w', '--wallet', default="http://mqtt:5000/")
    args = parser.parse_args()

    password, client_id, topic = get_credentials(args)
    if not topic:
        logger.error("Nessun topic restituito dal wallet.")
        raise SystemExit(1)

    main_topic = topic.split('/')[0]
    init_db(args, main_topic)

    # NOTE(review): connect() is called without CONNECT properties, so no
    # session expiry interval is requested from the broker. Pass
    # properties= to connect() if a persistent session is wanted.
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id, protocol=mqtt.MQTTv5)
    client.username_pw_set(username=args.client, password=password)
    client.user_data_set({'args': args})

    def _on_connect_and_subscribe(client, userdata, flags, rc, properties):
        """Log the connection result, then (re)issue the subscription."""
        on_connect(client, userdata, flags, rc, properties)
        # Subscribing on every successful connect renews the subscription
        # after an automatic reconnect as well.
        if rc == 0:
            client.subscribe(topic, qos=args.qos)

    client.on_connect = _on_connect_and_subscribe
    client.on_message = on_message
    client.on_disconnect = on_disconnect

    client.reconnect_delay_set(min_delay=1, max_delay=120)
    client.connect(args.host, args.port, clean_start=False)

    try:
        logger.info("Avvio del loop MQTT.")
        client.loop_forever()
    except KeyboardInterrupt:
        logger.info("Terminazione manuale.")
    except Exception as e:
        logger.error(f"Errore durante il ciclo MQTT: {e}")
    finally:
        client.disconnect()
        logger.info("Disconnesso dal broker.")
|
|
|
|
# Script entry point: start the receiver only when this file is run directly,
# not when it is imported.
if __name__ == "__main__":
    main()