# File: ase_mqtt/control_mqtt.py
# Last modified: 2024-12-25 20:15:03 +01:00
# Size: 141 lines, 5.1 KiB
# Type: Python, executable file

import paho.mqtt.subscribe as subscribe
import paho.mqtt.publish as publish
import subprocess
import argparse
import requests
import json
import sys
import os
import logging
class CurrentClients:
    """Registry of dynamic-security clients and their receiver subprocesses.

    The initial client list is read from the dynamic-security JSON file named
    by ``args.dyn_sec_conf``.  Every client whose username ends in ``_ase``
    gets its own ``args.ase_receiver`` subprocess, launched with the Python
    interpreter found under ``sys.prefix``.
    """

    # Only clients whose username carries this suffix get a receiver process.
    _ASE_SUFFIX = "_ase"

    def __init__(self, args):
        """Load the configured clients and start a receiver per ``*_ase`` client.

        Args:
            args: Parsed command-line namespace.  ``dyn_sec_conf`` is the path
                of the dynamic-security JSON file and ``ase_receiver`` the
                script launched for every ``*_ase`` client.
        """
        with open(args.dyn_sec_conf, "r") as file:
            data = json.load(file)
        self.active_clients_list = [client["username"] for client in data["clients"]]
        # Public mapping username -> PID, exposed through pids().
        self.active_clients_pids = {}
        # Popen handles, kept so that children can be killed *and* reaped.
        self._processes = {}
        self.venv_path = sys.prefix
        for username in self.active_clients_list:
            if username.endswith(self._ASE_SUFFIX):
                self.start_client(username, args)
                logging.info(f"Init start client for {username}")

    def list(self):
        """Return the (live, mutable) list of tracked client usernames."""
        return self.active_clients_list

    def pids(self):
        """Return the (live, mutable) mapping of username to receiver PID."""
        return self.active_clients_pids

    def add(self, client, args):
        """Track *client* and start its receiver if it is a ``*_ase`` client.

        A client that is already tracked is left untouched, so that no second
        receiver is spawned and the first one is not orphaned.
        """
        if client in self.active_clients_list:
            logging.warning(f"Client {client} is already tracked, not adding it again")
            return
        self.active_clients_list.append(client)
        if client.endswith(self._ASE_SUFFIX):
            self.start_client(client, args)

    def remove(self, client):
        """Stop tracking *client* and stop its receiver if it is a ``*_ase`` client.

        Removing a client that is not tracked is logged and otherwise ignored.
        """
        try:
            self.active_clients_list.remove(client)
        except ValueError:
            logging.warning(f"Client {client} is not tracked, nothing to remove")
        if client.endswith(self._ASE_SUFFIX):
            self.stop_client(client)

    def start_client(self, client, args):
        """Spawn the receiver script for *client* and record its process.

        Args:
            client: Username handed to the script as ``--client=<username>``.
            args: Parsed command-line namespace; ``ase_receiver`` is the
                script to run.
        """
        # The child inherits this process's stdout/stderr.  Handing it PIPEs
        # that nobody reads would stall the child as soon as the OS pipe
        # buffer is full.
        process = subprocess.Popen(
            [f'{self.venv_path}/bin/python3', args.ase_receiver, f'--client={client}'],
        )
        self._processes[client] = process
        self.active_clients_pids[client] = process.pid
        logging.info(f"Started process for {client}, PID: {process.pid}")

    def stop_client(self, client):
        """Kill and reap the receiver of *client*, then forget its PID.

        A client without a recorded process is logged and ignored, which
        makes the method safe to call more than once for the same client.
        """
        pid = self.active_clients_pids.pop(client, None)
        process = self._processes.pop(client, None)
        if pid is None and process is None:
            logging.warning(f"No running process recorded for {client}")
            return
        if process is None:
            # A PID without a Popen handle can only come from outside this
            # class (pids() hands out the live dict): signal it directly.
            logging.info(f"Terminating process for {client} (PID: {pid})")
            try:
                os.kill(pid, 9)  # 9 == SIGKILL
            except ProcessLookupError:
                logging.warning(f"Process for {client} (PID: {pid}) is already gone")
            return
        logging.info(f"Terminating process for {client} (PID: {process.pid})")
        process.kill()
        try:
            # Reap the child so that it does not linger as a zombie.
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            logging.warning(f"Process for {client} (PID: {process.pid}) did not exit after kill")
def get_client_list(args, auth):
    """Publish a ``listClients`` command to the dynamic-security topic.

    The command is sent as a single message to ``args.pub_topic`` on the
    broker at ``args.host``/``args.port``, authenticated with *auth*.
    Nothing is returned.
    """
    list_request = '{"commands":[{"command":"listClients"}]}'
    publish.single(
        args.pub_topic,
        list_request,
        hostname=args.host,
        port=args.port,
        auth=auth,
    )
def create_client(datas, userdata):
    """React to a ``createClient`` response by requesting the client list.

    *datas* is not inspected; only the ``args`` and ``auth`` entries of
    *userdata* are used.
    """
    args, auth = userdata['args'], userdata['auth']
    get_client_list(args, auth)
def delete_client(datas, userdata):
    """React to a ``deleteClient`` response by requesting the client list.

    *datas* is not inspected; only the ``args`` and ``auth`` entries of
    *userdata* are used.
    """
    args, auth = userdata['args'], userdata['auth']
    get_client_list(args, auth)
def list_clients(datas, userdata):
    """Reconcile the tracked clients with a ``listClients`` response.

    Clients reported in the response but not tracked yet are passed to
    ``cur_clients.add``; tracked clients missing from the response are passed
    to ``cur_clients.remove``.

    Args:
        datas: Decoded response payload, a dict with a ``responses`` list.
        userdata: Dict holding ``cur_clients`` (the client registry) and
            ``args`` (forwarded to ``cur_clients.add``).

    A response without a usable ``data.clients`` list (for example an error
    reply) is logged and leaves the tracked clients untouched.
    """
    responses = datas['responses']
    # Use the entry that actually answers listClients; fall back to the first
    # entry for payloads that do not label their command.
    response = next(
        (item for item in responses if item.get("command") == "listClients"),
        responses[0] if responses else None,
    )
    try:
        reported = set(response['data']['clients'])
    except (KeyError, TypeError):
        logging.error(f"listClients response carries no usable client list: {response}")
        return

    cur_clients = userdata['cur_clients']
    # Both deltas are taken from one snapshot, before anything is mutated.
    tracked = set(cur_clients.list())
    for name in reported - tracked:
        cur_clients.add(name, userdata['args'])
    for name in tracked - reported:
        cur_clients.remove(name)
def ctrl_client_mod(client, userdata, message):
    """Dispatch a dynamic-security response message to its handler.

    The JSON payload of *message* is decoded and the ``command`` of every
    entry in its ``responses`` list is collected.  The first command that
    has a handler (``createClient``, ``deleteClient`` or ``listClients``) is
    called with the decoded payload and *userdata*; if none matches, nothing
    happens.  *client* is not used.
    """
    handlers = {
        "createClient": create_client,
        "deleteClient": delete_client,
        "listClients": list_clients,
    }
    payload = json.loads(message.payload)

    # Every response is examined before dispatching, so an entry without a
    # "command" key fails here rather than after a handler already ran.
    matched = []
    for response in payload['responses']:
        name = response["command"]
        if name in handlers:
            matched.append(name)

    if not matched:
        return
    handlers[matched[0]](payload, userdata)
def get_credentials(args):
    """Fetch the MQTT control password from the wallet service.

    POSTs the master password (environment variable
    ``WALLET_MASTER_PASSWORD``) and the site name ``mqtt_control`` as JSON to
    the wallet's ``get`` endpoint.

    Args:
        args: Parsed command-line namespace; ``args.wallet`` is the wallet
            base URL, with or without a trailing slash.

    Returns:
        The ``password`` field of the JSON reply, or ``None`` if the reply
        has no such field.

    Raises:
        SystemExit: With status 1 when the request fails, times out or is
            answered with a status other than 200.
    """
    # Normalise the base URL so that a missing trailing slash does not glue
    # "get" onto the host or port.
    url = args.wallet.rstrip("/") + "/get"
    data = {
        "master_password": os.getenv('WALLET_MASTER_PASSWORD'),
        "site": "mqtt_control"
    }
    try:
        # Without a timeout an unresponsive wallet would block start-up forever.
        response = requests.post(url, json=data, timeout=10)
    except requests.RequestException as err:
        logging.error(f"Error to get pwd from wallet: {err}")
        sys.exit(1)
    if response.status_code != 200:
        logging.error("Error to get pwd from wallet.")
        # sys.exit rather than exit(): the latter only exists when the site
        # module has been loaded.
        sys.exit(1)
    return response.json().get('password')
def main():
    """Run the MQTT dynamic-security controller until it is interrupted.

    Parses the command line, configures logging, fetches the broker password
    from the wallet, starts the receivers of the configured ``*_ase`` clients
    and then blocks in the MQTT subscription on ``--sub_topic``.

    Whatever ends the subscription (Ctrl-C, an error, or a normal return),
    the receiver of every tracked ``*_ase`` client is stopped before the
    function returns.  Errors are logged with their traceback and are not
    re-raised.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--host', default="mqtt")
    parser.add_argument('-t', '--sub_topic', default="$CONTROL/dynamic-security/v1/response")
    parser.add_argument('-T', '--pub_topic', default="$CONTROL/dynamic-security/v1")
    parser.add_argument('-q', '--qos', type=int,default=1)
    parser.add_argument('-u', '--username', default="admin")
    parser.add_argument('-w', '--wallet', default="http://localhost:5000/")
    parser.add_argument('-P', '--port', type=int, default=1883)
    parser.add_argument('-L', '--log_level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO')
    parser.add_argument('-s', '--dyn_sec_conf', default='./dynamic-security.json')
    parser.add_argument('-r', '--ase_receiver', default='./subscribe_ase_receiver.py')
    args = parser.parse_args()
    logging.basicConfig(
        format="- PID: %(process)d %(levelname)8s: %(message)s ",
        level=args.log_level
    )
    auth = {'username': args.username, 'password': get_credentials(args)}
    cur_clients = CurrentClients(args)
    userdata = {'args': args, 'cur_clients': cur_clients, 'auth': auth}
    try:
        # qos is passed explicitly so that the --qos option takes effect.
        subscribe.callback(ctrl_client_mod, hostname=args.host, port=args.port,
                           topics=args.sub_topic, qos=args.qos,
                           auth=auth, userdata=userdata)
    except KeyboardInterrupt:
        logging.info("Interrupted by user")
    except Exception:
        # Top-level boundary: record why the loop died instead of hiding it.
        logging.exception("MQTT control loop failed")
    finally:
        logging.info("Terminating: ....")
        for client in cur_clients.list():
            if not client.endswith("_ase"):
                continue
            try:
                cur_clients.stop_client(client)
            except Exception:
                # Best-effort shutdown: one failure must not leave the
                # remaining receivers running.
                logging.exception(f"Could not stop process for {client}")
# Script entry point: start the controller only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()