Files
ase_mqtt/subscribe_ase_receiver.py
2024-12-29 18:39:46 +01:00

132 lines
4.2 KiB
Python

import paho.mqtt.subscribe as subscribe
import argparse
import requests
import psycopg2
import json
import sys
import os
import logging
# Configurazione Logging
logging.basicConfig(level=logging.INFO, format='- PID: %(process)d %(levelname)8s: %(message)s', stream=sys.stderr)
logger = logging.getLogger()
# Configurazione connessione PostgreSQL
DB_CONFIG = {
"dbname": os.getenv("DB_NAME"),
"dbschema": os.getenv("DB_SCHEMA"),
"dbtable": os.getenv("DB_TABLE"),
"user": os.getenv("DB_USER"),
"password": os.getenv("DB_PASSWORD"),
"host": os.getenv("DB_HOST"),
"port": os.getenv("DB_PORT")
}
def get_credentials(args):
url = args.wallet + "get"
data = {
"master_password": os.getenv('WALLET_MASTER_PASSWORD'),
"site": f"{args.client}_site"
}
response = requests.post(url, json=data)
if response.status_code != 200:
logger.error(f"Error to get pwd from wallet.")
exit(1)
return response.json().get('password')
def get_db_connection():
return psycopg2.connect(
dbname=DB_CONFIG["dbname"],
user=DB_CONFIG["user"],
password=DB_CONFIG["password"],
host=DB_CONFIG["host"],
port=DB_CONFIG["port"]
)
# Inizializza il database
def init_db(args):
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} (
id bigserial NOT NULL,
main_topic text NOT NULL,
tt_data jsonb NULL,
created_at timestamp DEFAULT CURRENT_TIMESTAMP NULL
)
PARTITION BY LIST (main_topic);
""")
conn.commit()
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}_other PARTITION OF {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}
DEFAULT;
""")
conn.commit()
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}_{args.client.removesuffix("_ase")} PARTITION OF {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}
FOR VALUES IN ('{args.client.removesuffix("_ase")}')
""")
conn.commit()
except Exception as e:
logger.error(f"Errore durante l'inizializzazione del database: {e}")
exit(1)
finally:
conn.close()
logger.info("Database inizializzato.")
def create_nested_json(path, data):
keys = path.split('/')[1:]
nested_json = data
for key in reversed(keys):
nested_json = {key: nested_json}
return nested_json
def receive_data(client, userdata, message):
datastore = json.loads(message.payload)
json_data = create_nested_json(message.topic, datastore)
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"""
INSERT INTO {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}
(main_topic, tt_data)
VALUES
('{userdata['args'].client.removesuffix("_ase")}', '{json.dumps(json_data)}'::jsonb);
""")
conn.commit()
except Exception as e:
logger.error(f"Errore durante l'inserimento dei dati nel database: {e}")
finally:
conn.close()
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', default="mqtt")
parser.add_argument('-q', '--qos', type=int,default=2)
parser.add_argument('-P', '--port', type=int, default=1883)
parser.add_argument('-c', '--client')
parser.add_argument('-w', '--wallet', default="http://mqtt:5000/")
args = parser.parse_args()
init_db(args)
auth = {'username': args.client, 'password': get_credentials(args)}
userdata = {'args': args}
try:
subscribe.callback(receive_data, hostname=args.host, port=args.port,
topics=f'{args.client.removesuffix("_ase")}/#',
qos=args.qos, clean_session=False,
auth=auth, client_id=f'{args.client.removesuffix("_ase")}_client_ase',
userdata=userdata)
except (KeyboardInterrupt, Exception) as e:
logger.info(f"Terminating: ....{e}")
if __name__ == "__main__":
main()