optimized control_mqtt

This commit is contained in:
2024-12-21 18:25:54 +01:00
parent 1a1b6f36d3
commit 282cb119ea
7 changed files with 177 additions and 112 deletions

119
control_mqtt.py Executable file
View File

@@ -0,0 +1,119 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import paho.mqtt.subscribe as subscribe
import paho.mqtt.publish as publish
import subprocess
import argparse
import json
import os
import logging
class CurrentClients:
def __init__(self, args):
with open(args.dyn_sec_conf, "r") as file:
data = json.load(file)
self.active_clients_list = [client["username"] for client in data["clients"]]
self.active_clients_pids = {}
for username in self.active_clients_list:
if username.endswith("_ase"):
self.start_client(username, args)
def list(self):
return self.active_clients_list
def pids(self):
return self.active_clients_pids
def add(self, client, args):
self.active_clients_list.append(client)
self.start_client(client, args) if client.endswith("_ase") else None
def remove(self, client):
self.active_clients_list.remove(client)
self.stop_client(client) if client.endswith("_ase") else None
def start_client(self, client, args):
process = subprocess.Popen(
["python3", args.ase_receiver, client],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
self.active_clients_pids[client] = process.pid
logging.info(f"Started process for {client}, PID: {process.pid}")
def stop_client(self, client):
logging.info(f"Terminating process for {client} (PID: {self.active_clients_pids[client]})")
os.kill(self.active_clients_pids[client], 9)
def get_client_list(args, auth):
publish.single(args.pub_topic, '{"commands":[{"command":"listClients"}]}', hostname=args.host, port=args.port, auth=auth)
def create_client(datas, userdata):
get_client_list(userdata['args'], userdata['auth'])
def delete_client(datas, userdata):
get_client_list(userdata['args'], userdata['auth'])
def list_clients(datas, userdata):
list_clients = datas['responses'][0]['data']['clients']
delta_clients_add = set(list_clients) - set(userdata['cur_clients'].list())
[userdata['cur_clients'].add(item, userdata['args']) for item in delta_clients_add]
delta_clients_del = set(userdata['cur_clients'].list()) - set(list_clients)
[userdata['cur_clients'].remove(item) for item in delta_clients_del]
def ctrl_client_mod(client, userdata, message):
command_functions = {
"createClient": create_client,
"deleteClient": delete_client,
"listClients": list_clients,
}
datas = json.loads(message.payload)
target_commands = {"createClient", "deleteClient", "listClients"}
found_command = [item["command"] for item in datas['responses'] if item["command"] in target_commands]
if found_command:
command_functions[found_command[0]](datas, userdata)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', default="mqtt")
parser.add_argument('-t', '--sub_topic', default="$CONTROL/dynamic-security/v1/response")
parser.add_argument('-T', '--pub_topic', default="$CONTROL/dynamic-security/v1")
parser.add_argument('-q', '--qos', type=int,default=0)
parser.add_argument('-u', '--username', default="admin")
parser.add_argument('-p', '--password', default="BatManu#171017")
parser.add_argument('-P', '--port', type=int, default=1883)
parser.add_argument('-l', '--log', default='/var/log/mosquitto/subscriber_manager.log')
parser.add_argument('-L', '--log_level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO')
parser.add_argument('-s', '--dyn_sec_conf', default='./dynamic-security.json')
parser.add_argument('-r', '--ase_receiver', default='./subscribe_ase_receiver.py')
args = parser.parse_args()
logging.basicConfig(
format="%(asctime)s - PID: %(process)d %(levelname)8s: %(message)s ",
level=args.log_level,
filename=args.log
)
cur_clients = CurrentClients(args)
auth = {'username': args.username, 'password': args.password}
userdata = {'args': args, 'cur_clients': cur_clients, 'auth': auth}
try:
subscribe.callback(ctrl_client_mod, hostname=args.host, port=args.port, topics=args.sub_topic, auth=auth, userdata=userdata)
except (KeyboardInterrupt, Exception) as e:
logging.info("Terminating: ....")
for client in cur_clients.list():
cur_clients.stop_client(client) if client.endswith("_ase") else None
if __name__ == "__main__":
main()