From 927578ecd3ac0d2a8e595505ad8c7593e7339ba7 Mon Sep 17 00:00:00 2001 From: alex Date: Sun, 29 Dec 2024 19:52:31 +0100 Subject: [PATCH] full client --- control_mqtt.py | 189 ++++++++++++++++++++++++++++++------------------ 1 file changed, 119 insertions(+), 70 deletions(-) diff --git a/control_mqtt.py b/control_mqtt.py index ad54b2c..fd1da3a 100755 --- a/control_mqtt.py +++ b/control_mqtt.py @@ -1,5 +1,4 @@ -import paho.mqtt.subscribe as subscribe -import paho.mqtt.publish as publish +import paho.mqtt.client as mqtt import subprocess import argparse import requests @@ -8,6 +7,10 @@ import sys import os import logging +# Configurazione Logging +logging.basicConfig(level=logging.INFO, format='- PID: %(process)d %(levelname)8s: %(message)s') +logger = logging.getLogger(__name__) + class CurrentClients: def __init__(self, args): with open(args.dyn_sec_conf, "r") as file: @@ -17,8 +20,11 @@ class CurrentClients: self.venv_path = sys.prefix for username in self.active_clients_list: if username.endswith("_ase"): - self.start_client(username, args) - logging.info(f"Init start client for {username}") + try: + self.start_client(username, args) + logger.info(f"Init start client for {username}") + except Exception as e: + logger.error(f"Error starting client {username}: {e}") def list(self): return self.active_clients_list @@ -28,76 +34,117 @@ class CurrentClients: def add(self, client, args): self.active_clients_list.append(client) - self.start_client(client, args) if client.endswith("_ase") else None + try: + self.start_client(client, args) if client.endswith("_ase") else None + except Exception as e: + logger.error(f"Error adding client {client}: {e}") def remove(self, client): self.active_clients_list.remove(client) - self.stop_client(client) if client.endswith("_ase") else None + try: + self.stop_client(client) if client.endswith("_ase") else None + except Exception as e: + logger.error(f"Error removing client {client}: {e}") def start_client(self, client, args): - process = subprocess.Popen( - [f'{self.venv_path}/bin/python3', args.ase_receiver, f'--client={client}'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True - ) - ''' - for line in process.stdout: - logging.info(f"Subtask stdout: {line.strip()}") - - for line in process.stderr: - logging.error(f"Subtask stderr: {line.strip()}") - ''' - self.active_clients_pids[client] = process.pid - logging.info(f"Started process for {client}, PID: {process.pid}") + try: + process = subprocess.Popen( + [f'{self.venv_path}/bin/python3', args.ase_receiver, f'--client={client}'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + self.active_clients_pids[client] = process.pid + logger.info(f"Started process for {client}, PID: {process.pid}") + except Exception as e: + logger.error(f"Error starting process for {client}: {e}") def stop_client(self, client): - logging.info(f"Terminating process for {client} (PID: {self.active_clients_pids[client]})") - os.kill(self.active_clients_pids[client], 9) + try: + logger.info(f"Terminating process for {client} (PID: {self.active_clients_pids[client]})") + os.kill(self.active_clients_pids[client], 9) + except Exception as e: + logger.error(f"Error stopping client {client}: {e}") def get_client_list(args, auth): - publish.single(args.pub_topic, '{"commands":[{"command":"listClients"}]}', hostname=args.host, port=args.port, auth=auth) + try: + mqtt.publish.single(args.pub_topic, '{"commands":[{"command":"listClients"}]}', hostname=args.host, port=args.port, auth=auth) + except Exception as e: + logger.error(f"Error publishing client list request: {e}") def create_client(datas, userdata): - get_client_list(userdata['args'], userdata['auth']) + try: + get_client_list(userdata['args'], userdata['auth']) + except Exception as e: + logger.error(f"Error creating client: {e}") def delete_client(datas, userdata): - get_client_list(userdata['args'], userdata['auth']) + try: + get_client_list(userdata['args'], userdata['auth']) + except Exception as e: + logger.error(f"Error deleting client: {e}") def list_clients(datas, userdata): - list_clients = datas['responses'][0]['data']['clients'] + try: + list_clients = datas['responses'][0]['data']['clients'] - delta_clients_add = set(list_clients) - set(userdata['cur_clients'].list()) - [userdata['cur_clients'].add(item, userdata['args']) for item in delta_clients_add] + delta_clients_add = set(list_clients) - set(userdata['cur_clients'].list()) + [userdata['cur_clients'].add(item, userdata['args']) for item in delta_clients_add] - delta_clients_del = set(userdata['cur_clients'].list()) - set(list_clients) - [userdata['cur_clients'].remove(item) for item in delta_clients_del] + delta_clients_del = set(userdata['cur_clients'].list()) - set(list_clients) + [userdata['cur_clients'].remove(item) for item in delta_clients_del] + except Exception as e: + logger.error(f"Error listing clients: {e}") def ctrl_client_mod(client, userdata, message): - command_functions = { - "createClient": create_client, - "deleteClient": delete_client, - "listClients": list_clients, - } + try: + command_functions = { + "createClient": create_client, + "deleteClient": delete_client, + "listClients": list_clients, + } - datas = json.loads(message.payload) - target_commands = {"createClient", "deleteClient", "listClients"} - found_command = [item["command"] for item in datas['responses'] if item["command"] in target_commands] - if found_command: - command_functions[found_command[0]](datas, userdata) + datas = json.loads(message.payload.decode('utf-8')) + target_commands = {"createClient", "deleteClient", "listClients"} + found_command = [item["command"] for item in datas['responses'] if item["command"] in target_commands] + if found_command: + command_functions[found_command[0]](datas, userdata) + except Exception as e: + logger.error(f"Error processing client command: {e}") def get_credentials(args): - url = args.wallet + "get" - data = { - "master_password": os.getenv('WALLET_MASTER_PASSWORD'), - "site": "mqtt_control" - } - response = requests.post(url, json=data) - if response.status_code != 200: - logging.error(f"Error to get pwd from wallet.") + try: + url = args.wallet + "get" + data = { + "master_password": os.getenv('WALLET_MASTER_PASSWORD'), + "site": "mqtt_control" + } + response = requests.post(url, json=data) + if response.status_code != 200: + logger.error("Error to get pwd from wallet.") + exit(1) + + return response.json().get('password') + except Exception as e: + logger.error(f"Error getting credentials: {e}") exit(1) - return response.json().get('password') +def on_connect(client, userdata, flags, rc, properties=None): + try: + if rc == 0: + logger.info("Connected successfully") + client.subscribe(userdata['args'].sub_topic, userdata['args'].qos) + else: + logger.error(f"Connection failed with code {rc}") + except Exception as e: + logger.error(f"Error on connect: {e}") + +def on_message(client, userdata, message): + try: + #logger.info(f"Received message on {message.topic}: {message.payload}") + ctrl_client_mod(client, userdata, message) + except Exception as e: + logger.error(f"Error handling message: {e}") def main(): parser = argparse.ArgumentParser() @@ -109,32 +156,34 @@ def main(): parser.add_argument('-u', '--username', default="admin") parser.add_argument('-w', '--wallet', default="http://localhost:5000/") parser.add_argument('-P', '--port', type=int, default=1883) - parser.add_argument('-L', '--log_level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO') parser.add_argument('-s', '--dyn_sec_conf', default='./dynamic-security.json') parser.add_argument('-r', '--ase_receiver', default='./subscribe_ase_receiver.py') args = parser.parse_args() - logging.basicConfig( - format="- PID: %(process)d %(levelname)8s: %(message)s ", - level=args.log_level - ) - - - auth = {'username': args.username, 'password': get_credentials(args)} - cur_clients = CurrentClients(args) - userdata = {'args': args, 'cur_clients': cur_clients, 'auth': auth} - try: - subscribe.callback(ctrl_client_mod, hostname=args.host, port=args.port, - topics=args.sub_topic, - auth=auth, userdata=userdata) + auth = {'username': args.username, 'password': get_credentials(args)} + cur_clients = CurrentClients(args) + userdata = {'args': args, 'cur_clients': cur_clients, 'auth': auth} + + client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5) + client.username_pw_set(auth['username'], auth['password']) + + client.on_connect = on_connect + client.on_message = on_message + + client.user_data_set(userdata) + + client.connect(args.host, args.port) + client.loop_forever() except (KeyboardInterrupt, Exception) as e: - logging.info("Terminating: ....") - - for client in cur_clients.list(): - cur_clients.stop_client(client) if client.endswith("_ase") else None - + logger.info("Terminating: ....") + logger.error(f"Error in main loop: {e}") + for client_name in cur_clients.list(): + try: + cur_clients.stop_client(client_name) if client_name.endswith("_ase") else None + except Exception as stop_e: + logger.error(f"Error stopping client {client_name}: {stop_e}") if __name__ == "__main__": - main() \ No newline at end of file + main()