From 768b1d6db94f68ed33f9b36f7a45d8108c719665 Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 23 Dec 2024 23:44:38 +0100 Subject: [PATCH] fix services --- control_mqtt.py | 2 +- data/.env | 7 -- data/.env_password_wallet | 10 +++ password_wallet_api.py | 6 +- services/mqtt_ase_receiver.service | 3 + services/password_wallet.service | 2 +- subscribe_ase_receiver.py | 140 +++++++++++++++++++++++++---- 7 files changed, 143 insertions(+), 27 deletions(-) create mode 100644 data/.env_password_wallet diff --git a/control_mqtt.py b/control_mqtt.py index 4135927..b11f673 100755 --- a/control_mqtt.py +++ b/control_mqtt.py @@ -109,7 +109,7 @@ def main(): args = parser.parse_args() logging.basicConfig( - format="%(asctime)s - PID: %(process)d %(levelname)8s: %(message)s ", + format="- PID: %(process)d %(levelname)8s: %(message)s ", level=args.log_level ) diff --git a/data/.env b/data/.env index d9b922e..4a8d619 100644 --- a/data/.env +++ b/data/.env @@ -1,8 +1 @@ WALLET_MASTER_PASSWORD=Ase#2024@wallet! -DB_NAME=postgres -DB_SCHEMA=mqtt_sec -DB_TABLE=passwords -DB_USER=postgres -DB_PASSWORD=BatManu#171017@mqtt! -DB_HOST=localhost -DB_PORT=5432 diff --git a/data/.env_password_wallet b/data/.env_password_wallet new file mode 100644 index 0000000..e89df19 --- /dev/null +++ b/data/.env_password_wallet @@ -0,0 +1,10 @@ +DB_NAME=postgres +DB_SCHEMA=mqtt_sec +DB_TABLE=passwords +DB_USER=postgres +DB_PASSWORD=BatManu#171017@mqtt! +DB_HOST=localhost +DB_PORT=5432 +DB_SCHEMA_DATA=mqtt_data +DB_TABLE_DATA=received + diff --git a/password_wallet_api.py b/password_wallet_api.py index d794137..fbff8a0 100644 --- a/password_wallet_api.py +++ b/password_wallet_api.py @@ -8,6 +8,8 @@ import logging from cryptography.fernet import Fernet from waitress import serve +app = Flask(__name__) + # Configurazione hash_file = "data/master_hash.txt" @@ -21,10 +23,9 @@ DB_CONFIG = { "host": os.getenv("DB_HOST"), "port": os.getenv("DB_PORT") } -app = Flask(__name__) # Configura il logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - PID: %(process)d %(levelname)s - %(message)s') +logging.basicConfig(level=logging.INFO, format='- PID: %(process)d %(levelname)s - %(message)s') def get_db_connection(): return psycopg2.connect( @@ -218,5 +219,4 @@ def list_sites_api(): # Avvio dell'app if __name__ == '__main__': init_db() - #app.run(host='0.0.0.0', port=5000) serve(app, host='0.0.0.0', port=5000) diff --git a/services/mqtt_ase_receiver.service b/services/mqtt_ase_receiver.service index 85ed889..601fea8 100644 --- a/services/mqtt_ase_receiver.service +++ b/services/mqtt_ase_receiver.service @@ -1,6 +1,9 @@ [Unit] Description=MQTT ASE Receiver manager After=network.target +After=mosquitto.service +After=password_wallet.service +Wants=password_wallet.service [Service] WorkingDirectory=/var/lib/mosquitto diff --git a/services/password_wallet.service b/services/password_wallet.service index b8e7dce..5cb62d5 100644 --- a/services/password_wallet.service +++ b/services/password_wallet.service @@ -6,7 +6,7 @@ After=network.target User=mosquitto WorkingDirectory=/var/lib/mosquitto ExecStart=/var/lib/mosquitto/.venv/bin/python3 /var/lib/mosquitto/password_wallet_api.py -EnvironmentFile=/var/lib/mosquitto/data/.env +EnvironmentFile=/var/lib/mosquitto/data/.env_password_wallet Restart=always SyslogIdentifier=password_wallet diff --git a/subscribe_ase_receiver.py b/subscribe_ase_receiver.py index 2fc65a3..7c73596 100644 --- a/subscribe_ase_receiver.py +++ b/subscribe_ase_receiver.py @@ -1,20 +1,130 @@ +import paho.mqtt.subscribe as subscribe +import paho.mqtt.publish as publish +import subprocess +import argparse +import requests +import json import sys -import time import os +import logging -# Verifica degli argomenti passati -if len(sys.argv) != 2: - print("Uso: python programma.py ") - sys.exit(1) +class CurrentClients: + def __init__(self, args): + with open(args.dyn_sec_conf, "r") as file: + data = json.load(file) + self.active_clients_list = [client["username"] for client in data["clients"]] + self.active_clients_pids = {} + self.venv_path = sys.prefix + for username in self.active_clients_list: + if username.endswith("_ase"): + self.start_client(username, args) -# Recupera il nome del client dall'argomento -username = sys.argv[1] + def list(self): + return self.active_clients_list -# Simula un processo in esecuzione -print(f"Inizio elaborazione per {username} (PID: {os.getpid()})") -try: - while True: - time.sleep(5) # Simula un lavoro in corso - print(os.getenv('WALLET_MASTER_PASSWORD')) -except KeyboardInterrupt: - print(f"Processo per {username} terminato") \ No newline at end of file + def pids(self): + return self.active_clients_pids + + def add(self, client, args): + self.active_clients_list.append(client) + self.start_client(client, args) if client.endswith("_ase") else None + + def remove(self, client): + self.active_clients_list.remove(client) + self.stop_client(client) if client.endswith("_ase") else None + + def start_client(self, client, args): + process = subprocess.Popen( + [f'{self.venv_path}/bin/python3', args.ase_receiver, client], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + self.active_clients_pids[client] = process.pid + logging.info(f"Started process for {client}, PID: {process.pid}") + + def stop_client(self, client): + logging.info(f"Terminating process for {client} (PID: {self.active_clients_pids[client]})") + os.kill(self.active_clients_pids[client], 9) + +def get_client_list(args, auth): + publish.single(args.pub_topic, '{"commands":[{"command":"listClients"}]}', hostname=args.host, port=args.port, auth=auth) + +def create_client(datas, userdata): + get_client_list(userdata['args'], userdata['auth']) + +def delete_client(datas, userdata): + get_client_list(userdata['args'], userdata['auth']) + +def list_clients(datas, userdata): + list_clients = datas['responses'][0]['data']['clients'] + + delta_clients_add = set(list_clients) - set(userdata['cur_clients'].list()) + [userdata['cur_clients'].add(item, userdata['args']) for item in delta_clients_add] + + delta_clients_del = set(userdata['cur_clients'].list()) - set(list_clients) + [userdata['cur_clients'].remove(item) for item in delta_clients_del] + +def ctrl_client_mod(client, userdata, message): + + command_functions = { + "createClient": create_client, + "deleteClient": delete_client, + "listClients": list_clients, + } + + datas = json.loads(message.payload) + target_commands = {"createClient", "deleteClient", "listClients"} + found_command = [item["command"] for item in datas['responses'] if item["command"] in target_commands] + if found_command: + command_functions[found_command[0]](datas, userdata) + +def get_credentials(args): + url = args.wallet + "get" + data = { + "master_password": os.getenv('WALLET_MASTER_PASSWORD'), + "site": "mqtt_control" + } + response = requests.post(url, json=data) + if response.status_code != 200: + logging.error(f"Error to get pwd from wallet.") + exit(1) + + return response.json().get('password') + +def main(): + parser = argparse.ArgumentParser() + + parser.add_argument('-H', '--host', default="mqtt") + parser.add_argument('-t', '--sub_topic', default="$CONTROL/dynamic-security/v1/response") + parser.add_argument('-T', '--pub_topic', default="$CONTROL/dynamic-security/v1") + parser.add_argument('-q', '--qos', type=int,default=0) + parser.add_argument('-u', '--username', default="admin") + parser.add_argument('-w', '--wallet', default="http://localhost:5000/") + parser.add_argument('-P', '--port', type=int, default=1883) + parser.add_argument('-L', '--log_level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO') + parser.add_argument('-s', '--dyn_sec_conf', default='./dynamic-security.json') + parser.add_argument('-r', '--ase_receiver', default='./subscribe_ase_receiver.py') + + args = parser.parse_args() + + logging.basicConfig( + format=" - PID: %(process)d %(levelname)8s: %(message)s ", + level=args.log_level + ) + + auth = {'username': args.username, 'password': get_credentials(args)} + cur_clients = CurrentClients(args) + userdata = {'args': args, 'cur_clients': cur_clients, 'auth': auth} + + try: + subscribe.callback(ctrl_client_mod, hostname=args.host, port=args.port, topics=args.sub_topic, auth=auth, userdata=userdata) + except (KeyboardInterrupt, Exception) as e: + logging.info("Terminating: ....") + + for client in cur_clients.list(): + cur_clients.stop_client(client) if client.endswith("_ase") else None + + +if __name__ == "__main__": + main() \ No newline at end of file