ase receiver implemented

This commit is contained in:
2024-12-24 23:15:00 +01:00
parent 768b1d6db9
commit cf2b6356a9
4 changed files with 101 additions and 93 deletions

View File

@@ -35,7 +35,7 @@ class CurrentClients:
def start_client(self, client, args): def start_client(self, client, args):
process = subprocess.Popen( process = subprocess.Popen(
[f'{self.venv_path}/bin/python3', args.ase_receiver, client], [f'{self.venv_path}/bin/python3', args.ase_receiver, f'-c {client}'],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
text=True text=True

View File

@@ -1 +1,8 @@
WALLET_MASTER_PASSWORD=Ase#2024@wallet! WALLET_MASTER_PASSWORD=Ase#2024@wallet!
DB_NAME=postgres
DB_SCHEMA=mqtt_data
DB_TABLE=received
DB_USER=postgres
DB_PASSWORD=BatManu#171017@mqtt!
DB_HOST=localhost
DB_PORT=5432

View File

@@ -5,6 +5,5 @@ DB_USER=postgres
DB_PASSWORD=BatManu#171017@mqtt! DB_PASSWORD=BatManu#171017@mqtt!
DB_HOST=localhost DB_HOST=localhost
DB_PORT=5432 DB_PORT=5432
DB_SCHEMA_DATA=mqtt_data
DB_TABLE_DATA=received

View File

@@ -1,89 +1,27 @@
import paho.mqtt.subscribe as subscribe import paho.mqtt.subscribe as subscribe
import paho.mqtt.publish as publish
import subprocess
import argparse import argparse
import requests import requests
import psycopg2
import json import json
import sys
import os import os
import logging import logging
class CurrentClients: # Configurazione connessione PostgreSQL
def __init__(self, args): DB_CONFIG = {
with open(args.dyn_sec_conf, "r") as file: "dbname": os.getenv("DB_NAME"),
data = json.load(file) "dbschema": os.getenv("DB_SCHEMA"),
self.active_clients_list = [client["username"] for client in data["clients"]] "dbtable": os.getenv("DB_TABLE"),
self.active_clients_pids = {} "user": os.getenv("DB_USER"),
self.venv_path = sys.prefix "password": os.getenv("DB_PASSWORD"),
for username in self.active_clients_list: "host": os.getenv("DB_HOST"),
if username.endswith("_ase"): "port": os.getenv("DB_PORT")
self.start_client(username, args)
def list(self):
return self.active_clients_list
def pids(self):
return self.active_clients_pids
def add(self, client, args):
self.active_clients_list.append(client)
self.start_client(client, args) if client.endswith("_ase") else None
def remove(self, client):
self.active_clients_list.remove(client)
self.stop_client(client) if client.endswith("_ase") else None
def start_client(self, client, args):
process = subprocess.Popen(
[f'{self.venv_path}/bin/python3', args.ase_receiver, client],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
self.active_clients_pids[client] = process.pid
logging.info(f"Started process for {client}, PID: {process.pid}")
def stop_client(self, client):
logging.info(f"Terminating process for {client} (PID: {self.active_clients_pids[client]})")
os.kill(self.active_clients_pids[client], 9)
def get_client_list(args, auth):
publish.single(args.pub_topic, '{"commands":[{"command":"listClients"}]}', hostname=args.host, port=args.port, auth=auth)
def create_client(datas, userdata):
get_client_list(userdata['args'], userdata['auth'])
def delete_client(datas, userdata):
get_client_list(userdata['args'], userdata['auth'])
def list_clients(datas, userdata):
list_clients = datas['responses'][0]['data']['clients']
delta_clients_add = set(list_clients) - set(userdata['cur_clients'].list())
[userdata['cur_clients'].add(item, userdata['args']) for item in delta_clients_add]
delta_clients_del = set(userdata['cur_clients'].list()) - set(list_clients)
[userdata['cur_clients'].remove(item) for item in delta_clients_del]
def ctrl_client_mod(client, userdata, message):
command_functions = {
"createClient": create_client,
"deleteClient": delete_client,
"listClients": list_clients,
} }
datas = json.loads(message.payload)
target_commands = {"createClient", "deleteClient", "listClients"}
found_command = [item["command"] for item in datas['responses'] if item["command"] in target_commands]
if found_command:
command_functions[found_command[0]](datas, userdata)
def get_credentials(args): def get_credentials(args):
url = args.wallet + "get" url = args.wallet + "get"
data = { data = {
"master_password": os.getenv('WALLET_MASTER_PASSWORD'), "master_password": os.getenv('WALLET_MASTER_PASSWORD'),
"site": "mqtt_control" "site": f"{args.client}_site"
} }
response = requests.post(url, json=data) response = requests.post(url, json=data)
if response.status_code != 200: if response.status_code != 200:
@@ -92,19 +30,83 @@ def get_credentials(args):
return response.json().get('password') return response.json().get('password')
def get_db_connection():
return psycopg2.connect(
dbname=DB_CONFIG["dbname"],
user=DB_CONFIG["user"],
password=DB_CONFIG["password"],
host=DB_CONFIG["host"],
port=DB_CONFIG["port"]
)
# Inizializza il database
def init_db(args):
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} (
id bigserial NOT NULL,
main_topic text NOT NULL,
tt_data jsonb NULL,
created_at timestamp DEFAULT CURRENT_TIMESTAMP NULL
)
PARTITION BY LIST (main_topic);
""")
conn.commit()
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}_other PARTITION OF {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}
DEFAULT;
""")
conn.commit()
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}_{args.client.removesuffix("_ase")} PARTITION OF {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}
FOR VALUES IN ('{args.client.removesuffix("_ase")}')
""")
conn.commit()
except Exception as e:
logging.error(f"Errore durante l'inizializzazione del database: {e}")
exit(1)
finally:
conn.close()
logging.info("Database inizializzato.")
def create_nested_json(path, data):
keys = path.split('/')[1:]
nested_json = data
for key in reversed(keys):
nested_json = {key: nested_json}
return nested_json
def receive_data(client, userdata, message):
datastore = json.loads(message.payload)
json_data = create_nested_json(message.topic, datastore)
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"""
INSERT INTO {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}
(main_topic, tt_data)
VALUES
('{userdata['args'].client.removesuffix("_ase")}', '{json.dumps(json_data)}'::jsonb);
""")
conn.commit()
except Exception as e:
logging.error(f"Errore durante l'inserimento dei dati nel database: {e}")
finally:
conn.close()
def main(): def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', default="mqtt") parser.add_argument('-H', '--host', default="mqtt")
parser.add_argument('-t', '--sub_topic', default="$CONTROL/dynamic-security/v1/response") parser.add_argument('-q', '--qos', type=int,default=2)
parser.add_argument('-T', '--pub_topic', default="$CONTROL/dynamic-security/v1")
parser.add_argument('-q', '--qos', type=int,default=0)
parser.add_argument('-u', '--username', default="admin")
parser.add_argument('-w', '--wallet', default="http://localhost:5000/")
parser.add_argument('-P', '--port', type=int, default=1883) parser.add_argument('-P', '--port', type=int, default=1883)
parser.add_argument('-c', '--client')
parser.add_argument('-w', '--wallet', default="http://mqtt:5000/")
parser.add_argument('-L', '--log_level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO') parser.add_argument('-L', '--log_level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO')
parser.add_argument('-s', '--dyn_sec_conf', default='./dynamic-security.json')
parser.add_argument('-r', '--ase_receiver', default='./subscribe_ase_receiver.py')
args = parser.parse_args() args = parser.parse_args()
@@ -112,19 +114,19 @@ def main():
format=" - PID: %(process)d %(levelname)8s: %(message)s ", format=" - PID: %(process)d %(levelname)8s: %(message)s ",
level=args.log_level level=args.log_level
) )
init_db(args)
auth = {'username': args.username, 'password': get_credentials(args)} auth = {'username': args.client, 'password': get_credentials(args)}
cur_clients = CurrentClients(args) userdata = {'args': args}
userdata = {'args': args, 'cur_clients': cur_clients, 'auth': auth}
try: try:
subscribe.callback(ctrl_client_mod, hostname=args.host, port=args.port, topics=args.sub_topic, auth=auth, userdata=userdata) subscribe.callback(receive_data, hostname=args.host, port=args.port,
topics=f'{args.client.removesuffix("_ase")}/#',
qos=args.qos, clean_session=False,
auth=auth, client_id=f'{args.client.removesuffix("_ase")}_client_ase',
userdata=userdata)
except (KeyboardInterrupt, Exception) as e: except (KeyboardInterrupt, Exception) as e:
logging.info("Terminating: ....") logging.info(f"Terminating: ....{e}")
for client in cur_clients.list():
cur_clients.stop_client(client) if client.endswith("_ase") else None
if __name__ == "__main__": if __name__ == "__main__":
main() main()