#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Supervise one receiver sub-process per ``_ase`` MQTT client.

At start-up the usernames found in the dynamic-security JSON file are loaded
and a receiver script is launched for every username ending in ``_ase``.
The script then subscribes to the dynamic-security response topic:

* a ``createClient`` or ``deleteClient`` response triggers a ``listClients``
  request on the control topic;
* a ``listClients`` response is compared with the locally tracked clients,
  and receivers are started or stopped to match it.
"""
import argparse
import json
import logging
import os
import subprocess

import paho.mqtt.publish as publish
import paho.mqtt.subscribe as subscribe

# Only clients whose username carries this suffix get a receiver sub-process.
ASE_SUFFIX = "_ase"

# Seconds to wait for a killed receiver to be reaped before giving up.
_REAP_TIMEOUT = 5


class CurrentClients:
    """Track the known clients and the receiver process of each ``_ase`` client."""

    def __init__(self, args):
        """Load the usernames from ``args.config_json`` and start the receivers.

        Args:
            args: Namespace providing ``config_json`` (path of a JSON file
                with a top-level ``"clients"`` list whose items carry a
                ``"username"``) and ``sub_process`` (script to launch).

        Raises:
            OSError: If the configuration file cannot be opened.
            KeyError: If the JSON lacks ``"clients"`` or a ``"username"``.
        """
        with open(args.config_json, "r", encoding="utf-8") as file:
            data = json.load(file)
        self.active_clients_list = [client["username"] for client in data["clients"]]
        # Public mapping username -> PID, exposed through pids().
        self.active_clients_pids = {}
        # Popen handles are kept so that killed children can be reaped.
        self._processes = {}
        for username in self.active_clients_list:
            if username.endswith(ASE_SUFFIX):
                self.start_client(username, args)

    def list(self):
        """Return the list of tracked usernames (the live list, not a copy)."""
        return self.active_clients_list

    def pids(self):
        """Return the live ``{username: pid}`` mapping of running receivers."""
        return self.active_clients_pids

    def add(self, client, args):
        """Start tracking ``client``; launch a receiver if it is an ``_ase`` client.

        A client that is already tracked is ignored, so that a repeated call
        cannot spawn a second receiver and orphan the first one.
        """
        if client in self.active_clients_list:
            logging.warning("Client %s is already tracked; ignoring add", client)
            return
        self.active_clients_list.append(client)
        if client.endswith(ASE_SUFFIX):
            self.start_client(client, args)

    def remove(self, client):
        """Stop tracking ``client`` and stop its receiver, if it has one."""
        try:
            self.active_clients_list.remove(client)
        except ValueError:
            logging.warning("Client %s is not tracked; ignoring remove", client)
            return
        if client.endswith(ASE_SUFFIX):
            self.stop_client(client)

    def start_client(self, client, args):
        """Launch ``python3 <args.sub_process> <client>`` and record its PID.

        Raises:
            OSError: If the interpreter or script cannot be executed.
        """
        if client in self.active_clients_pids:
            logging.warning(
                "Process for %s already running (PID: %s); not starting another",
                client,
                self.active_clients_pids[client],
            )
            return
        # Nothing ever reads the child's output. With PIPE the child would
        # block as soon as the OS pipe buffer fills, so discard it instead.
        process = subprocess.Popen(
            ["python3", args.sub_process, client],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        self._processes[client] = process
        self.active_clients_pids[client] = process.pid
        logging.info("Started process for %s, PID: %s", client, process.pid)

    def stop_client(self, client):
        """Kill the receiver of ``client``, reap it and forget its PID.

        Calling this for a client without a recorded process only logs a
        warning, which makes the shutdown path safe to run unconditionally.
        """
        pid = self.active_clients_pids.pop(client, None)
        process = self._processes.pop(client, None)
        if pid is None:
            logging.warning("No process recorded for %s; nothing to terminate", client)
            return
        logging.info("Terminating process for %s (PID: %s)", client, pid)
        try:
            os.kill(pid, 9)  # 9 == SIGKILL on POSIX
        except ProcessLookupError:
            logging.warning("Process for %s (PID: %s) had already exited", client, pid)
        if process is not None:
            # Collect the exit status so the child does not linger as a zombie.
            try:
                process.wait(timeout=_REAP_TIMEOUT)
            except subprocess.TimeoutExpired:
                logging.warning("Process for %s (PID: %s) did not exit in time", client, pid)


def _iter_responses(datas):
    """Return the dict items of ``datas["responses"]``, or ``[]`` if malformed."""
    responses = datas.get("responses") if isinstance(datas, dict) else None
    if not isinstance(responses, list):
        return []
    return [response for response in responses if isinstance(response, dict)]


def _client_names(entries):
    """Return the set of usernames contained in a ``clients`` listing.

    Plain strings are taken as usernames. Dict entries are read through their
    ``"username"`` member (presumably the shape of a verbose listing; confirm
    against the dynamic-security documentation). Anything else is skipped.
    """
    names = set()
    for entry in entries:
        if isinstance(entry, dict):
            entry = entry.get("username")
        if isinstance(entry, str):
            names.add(entry)
    return names


def get_client_list(args):
    """Publish the ``listClients`` command on the control topic.

    Args:
        args: Namespace providing ``pub_topic``, ``list_clients_cmd``,
            ``host``, ``port``, ``username``, ``password`` and, optionally,
            ``qos`` (defaults to 0 when absent).
    """
    publish.single(
        args.pub_topic,
        args.list_clients_cmd,
        qos=getattr(args, "qos", 0),
        hostname=args.host,
        port=args.port,
        auth={'username': args.username, 'password': args.password},
    )


def create_client(datas, args, cur_clients):
    """Handle a ``createClient`` response by requesting a fresh client list."""
    get_client_list(args)


def delete_client(datas, args, cur_clients):
    """Handle a ``deleteClient`` response by requesting a fresh client list."""
    get_client_list(args)


def list_client(datas, args, cur_clients):
    """Reconcile ``cur_clients`` with the clients named in a ``listClients`` response.

    Clients reported by the broker but not tracked are added; tracked clients
    missing from the report are removed. A response without a client list
    (for instance an error reply) is logged and leaves the state untouched.
    """
    response = next(
        (item for item in _iter_responses(datas) if item.get("command") == "listClients"),
        None,
    )
    if response is None:
        logging.warning("No listClients response found in message; ignoring")
        return
    data = response.get("data")
    entries = data.get("clients") if isinstance(data, dict) else None
    if not isinstance(entries, list):
        logging.warning(
            "listClients response carries no client list (error: %s)",
            response.get("error"),
        )
        return

    reported = _client_names(entries)
    tracked = set(cur_clients.list())
    # Sorted only to make the start/stop order deterministic.
    for name in sorted(reported - tracked):
        cur_clients.add(name, args)
    for name in sorted(tracked - reported):
        cur_clients.remove(name)


# Response command name -> handler(datas, args, cur_clients).
_COMMAND_HANDLERS = {
    "createClient": create_client,
    "deleteClient": delete_client,
    "listClients": list_client,
}


def ctrl_client_mod(client, userdata, message):
    """MQTT message callback: dispatch the first recognised response command.

    Args:
        client: MQTT client instance supplied by paho (unused).
        userdata: The ``CurrentClients`` instance registered with the subscriber.
        message: Received message; ``message.payload`` must hold the JSON reply.

    The handler receives the module-level ``args`` namespace. Malformed
    payloads are logged and ignored, because an exception escaping this
    callback would end the subscriber loop and stop every receiver.
    """
    try:
        datas = json.loads(message.payload)
    except ValueError:  # covers JSONDecodeError and undecodable bytes
        logging.warning("Ignoring payload that is not valid JSON")
        return

    for response in _iter_responses(datas):
        command = response.get("command")
        if isinstance(command, str) and command in _COMMAND_HANDLERS:
            _COMMAND_HANDLERS[command](datas, args, userdata)
            return


parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', required=False, default="mqtt")
parser.add_argument('-t', '--sub_topic', required=False, default="$CONTROL/dynamic-security/v1/response")
parser.add_argument('-T', '--pub_topic', required=False, default="$CONTROL/dynamic-security/v1")
parser.add_argument('-q', '--qos', required=False, type=int, default=0)
parser.add_argument('-u', '--username', required=False, default="admin")
# NOTE(security): a credential is hard-coded as the default below. It is kept
# for backward compatibility; it should be rotated and supplied at run time.
parser.add_argument('-p', '--password', required=False, default="BatManu#171017")
parser.add_argument('-P', '--port', required=False, type=int, default=1883)

args = parser.parse_args()
args.list_clients_cmd = '{"commands":[{"command":"listClients"}]}'
# Both paths are relative to the current working directory.
args.config_json = './dynamic-security.json'
args.sub_process = './subscribe_ase_receiver.py'

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

cur_clients = CurrentClients(args)

try:
    subscribe.callback(
        ctrl_client_mod,
        hostname=args.host,
        port=args.port,
        topics=args.sub_topic,
        qos=args.qos,
        auth={'username': args.username, 'password': args.password},
        userdata=cur_clients,
    )
except KeyboardInterrupt:
    logging.info("Interrupted by user")
except Exception:
    # Top-level boundary: record the failure before shutting down.
    logging.exception("Subscriber loop failed")
finally:
    logging.info("Terminating: ....")
    # Iterate over a copy: stop_client() removes entries from the mapping.
    for client in list(cur_clients.pids()):
        cur_clients.stop_client(client)