diff --git a/control_mqtt.py b/control_mqtt.py index b11f673..d6083ba 100755 --- a/control_mqtt.py +++ b/control_mqtt.py @@ -35,7 +35,7 @@ class CurrentClients: def start_client(self, client, args): process = subprocess.Popen( - [f'{self.venv_path}/bin/python3', args.ase_receiver, client], + [f'{self.venv_path}/bin/python3', args.ase_receiver, f'-c {client}'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True diff --git a/data/.env b/data/.env index 4a8d619..a0edd9f 100644 --- a/data/.env +++ b/data/.env @@ -1 +1,8 @@ WALLET_MASTER_PASSWORD=Ase#2024@wallet! +DB_NAME=postgres +DB_SCHEMA=mqtt_data +DB_TABLE=received +DB_USER=postgres +DB_PASSWORD=BatManu#171017@mqtt! +DB_HOST=localhost +DB_PORT=5432 diff --git a/data/.env_password_wallet b/data/.env_password_wallet index e89df19..463f759 100644 --- a/data/.env_password_wallet +++ b/data/.env_password_wallet @@ -5,6 +5,5 @@ DB_USER=postgres DB_PASSWORD=BatManu#171017@mqtt! DB_HOST=localhost DB_PORT=5432 -DB_SCHEMA_DATA=mqtt_data -DB_TABLE_DATA=received + diff --git a/subscribe_ase_receiver.py b/subscribe_ase_receiver.py index 7c73596..174650e 100644 --- a/subscribe_ase_receiver.py +++ b/subscribe_ase_receiver.py @@ -1,89 +1,27 @@ import paho.mqtt.subscribe as subscribe -import paho.mqtt.publish as publish -import subprocess import argparse import requests +import psycopg2 import json -import sys import os import logging -class CurrentClients: - def __init__(self, args): - with open(args.dyn_sec_conf, "r") as file: - data = json.load(file) - self.active_clients_list = [client["username"] for client in data["clients"]] - self.active_clients_pids = {} - self.venv_path = sys.prefix - for username in self.active_clients_list: - if username.endswith("_ase"): - self.start_client(username, args) - - def list(self): - return self.active_clients_list - - def pids(self): - return self.active_clients_pids - - def add(self, client, args): - self.active_clients_list.append(client) - self.start_client(client, args) if client.endswith("_ase") else None - - def remove(self, client): - self.active_clients_list.remove(client) - self.stop_client(client) if client.endswith("_ase") else None - - def start_client(self, client, args): - process = subprocess.Popen( - [f'{self.venv_path}/bin/python3', args.ase_receiver, client], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True - ) - self.active_clients_pids[client] = process.pid - logging.info(f"Started process for {client}, PID: {process.pid}") - - def stop_client(self, client): - logging.info(f"Terminating process for {client} (PID: {self.active_clients_pids[client]})") - os.kill(self.active_clients_pids[client], 9) - -def get_client_list(args, auth): - publish.single(args.pub_topic, '{"commands":[{"command":"listClients"}]}', hostname=args.host, port=args.port, auth=auth) - -def create_client(datas, userdata): - get_client_list(userdata['args'], userdata['auth']) - -def delete_client(datas, userdata): - get_client_list(userdata['args'], userdata['auth']) - -def list_clients(datas, userdata): - list_clients = datas['responses'][0]['data']['clients'] - - delta_clients_add = set(list_clients) - set(userdata['cur_clients'].list()) - [userdata['cur_clients'].add(item, userdata['args']) for item in delta_clients_add] - - delta_clients_del = set(userdata['cur_clients'].list()) - set(list_clients) - [userdata['cur_clients'].remove(item) for item in delta_clients_del] - -def ctrl_client_mod(client, userdata, message): - - command_functions = { - "createClient": create_client, - "deleteClient": delete_client, - "listClients": list_clients, - } - - datas = json.loads(message.payload) - target_commands = {"createClient", "deleteClient", "listClients"} - found_command = [item["command"] for item in datas['responses'] if item["command"] in target_commands] - if found_command: - command_functions[found_command[0]](datas, userdata) +# Configurazione connessione PostgreSQL +DB_CONFIG = { + "dbname": os.getenv("DB_NAME"), + "dbschema": os.getenv("DB_SCHEMA"), + "dbtable": os.getenv("DB_TABLE"), + "user": os.getenv("DB_USER"), + "password": os.getenv("DB_PASSWORD"), + "host": os.getenv("DB_HOST"), + "port": os.getenv("DB_PORT") +} def get_credentials(args): url = args.wallet + "get" data = { "master_password": os.getenv('WALLET_MASTER_PASSWORD'), - "site": "mqtt_control" + "site": f"{args.client}_site" } response = requests.post(url, json=data) if response.status_code != 200: @@ -92,19 +30,83 @@ def get_credentials(args): return response.json().get('password') +def get_db_connection(): + return psycopg2.connect( + dbname=DB_CONFIG["dbname"], + user=DB_CONFIG["user"], + password=DB_CONFIG["password"], + host=DB_CONFIG["host"], + port=DB_CONFIG["port"] + ) + +# Inizializza il database +def init_db(args): + try: + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute(f""" + CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} ( + id bigserial NOT NULL, + main_topic text NOT NULL, + tt_data jsonb NULL, + created_at timestamp DEFAULT CURRENT_TIMESTAMP NULL + ) + PARTITION BY LIST (main_topic); + """) + conn.commit() + cursor.execute(f""" + CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}_other PARTITION OF {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} + DEFAULT; + """) + conn.commit() + cursor.execute(f""" + CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}_{args.client.removesuffix("_ase")} PARTITION OF {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} + FOR VALUES IN ('{args.client.removesuffix("_ase")}') + """) + conn.commit() + except Exception as e: + logging.error(f"Errore durante l'inizializzazione del database: {e}") + exit(1) + finally: + conn.close() + logging.info("Database inizializzato.") + +def create_nested_json(path, data): + keys = path.split('/')[1:] + nested_json = data + for key in reversed(keys): + nested_json = {key: nested_json} + return nested_json + +def receive_data(client, userdata, message): + datastore = json.loads(message.payload) + json_data = create_nested_json(message.topic, datastore) + + try: + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute(f""" + INSERT INTO {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} + (main_topic, tt_data) + VALUES + ('{userdata['args'].client.removesuffix("_ase")}', '{json.dumps(json_data)}'::jsonb); + """) + conn.commit() + except Exception as e: + logging.error(f"Errore durante l'inserimento dei dati nel database: {e}") + finally: + conn.close() + + def main(): parser = argparse.ArgumentParser() parser.add_argument('-H', '--host', default="mqtt") - parser.add_argument('-t', '--sub_topic', default="$CONTROL/dynamic-security/v1/response") - parser.add_argument('-T', '--pub_topic', default="$CONTROL/dynamic-security/v1") - parser.add_argument('-q', '--qos', type=int,default=0) - parser.add_argument('-u', '--username', default="admin") - parser.add_argument('-w', '--wallet', default="http://localhost:5000/") + parser.add_argument('-q', '--qos', type=int,default=2) parser.add_argument('-P', '--port', type=int, default=1883) + parser.add_argument('-c', '--client') + parser.add_argument('-w', '--wallet', default="http://mqtt:5000/") parser.add_argument('-L', '--log_level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO') - parser.add_argument('-s', '--dyn_sec_conf', default='./dynamic-security.json') - parser.add_argument('-r', '--ase_receiver', default='./subscribe_ase_receiver.py') args = parser.parse_args() @@ -112,19 +114,19 @@ def main(): format=" - PID: %(process)d %(levelname)8s: %(message)s ", level=args.log_level ) + init_db(args) - auth = {'username': args.username, 'password': get_credentials(args)} - cur_clients = CurrentClients(args) - userdata = {'args': args, 'cur_clients': cur_clients, 'auth': auth} + auth = {'username': args.client, 'password': get_credentials(args)} + userdata = {'args': args} try: - subscribe.callback(ctrl_client_mod, hostname=args.host, port=args.port, topics=args.sub_topic, auth=auth, userdata=userdata) + subscribe.callback(receive_data, hostname=args.host, port=args.port, + topics=f'{args.client.removesuffix("_ase")}/#', + qos=args.qos, clean_session=False, + auth=auth, client_id=f'{args.client.removesuffix("_ase")}_client_ase', + userdata=userdata) except (KeyboardInterrupt, Exception) as e: - logging.info("Terminating: ....") - - for client in cur_clients.list(): - cur_clients.stop_client(client) if client.endswith("_ase") else None - + logging.info(f"Terminating: ....{e}") if __name__ == "__main__": main() \ No newline at end of file