Compare commits
3 Commits
1bdddda3b8
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 689fe55522 | |||
| 4619255a32 | |||
| 927578ecd3 |
188
control_mqtt.py
188
control_mqtt.py
@@ -1,4 +1,4 @@
|
|||||||
import paho.mqtt.subscribe as subscribe
|
import paho.mqtt.client as mqtt
|
||||||
import paho.mqtt.publish as publish
|
import paho.mqtt.publish as publish
|
||||||
import subprocess
|
import subprocess
|
||||||
import argparse
|
import argparse
|
||||||
@@ -8,6 +8,10 @@ import sys
|
|||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
# Configurazione Logging
|
||||||
|
logging.basicConfig(level=logging.INFO, format='- PID: %(process)d %(levelname)8s: %(message)s')
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class CurrentClients:
|
class CurrentClients:
|
||||||
def __init__(self, args):
|
def __init__(self, args):
|
||||||
with open(args.dyn_sec_conf, "r") as file:
|
with open(args.dyn_sec_conf, "r") as file:
|
||||||
@@ -17,8 +21,11 @@ class CurrentClients:
|
|||||||
self.venv_path = sys.prefix
|
self.venv_path = sys.prefix
|
||||||
for username in self.active_clients_list:
|
for username in self.active_clients_list:
|
||||||
if username.endswith("_ase"):
|
if username.endswith("_ase"):
|
||||||
self.start_client(username, args)
|
try:
|
||||||
logging.info(f"Init start client for {username}")
|
self.start_client(username, args)
|
||||||
|
logger.info(f"Init start client for {username}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error starting client {username}: {e}")
|
||||||
|
|
||||||
def list(self):
|
def list(self):
|
||||||
return self.active_clients_list
|
return self.active_clients_list
|
||||||
@@ -28,76 +35,117 @@ class CurrentClients:
|
|||||||
|
|
||||||
def add(self, client, args):
|
def add(self, client, args):
|
||||||
self.active_clients_list.append(client)
|
self.active_clients_list.append(client)
|
||||||
self.start_client(client, args) if client.endswith("_ase") else None
|
try:
|
||||||
|
self.start_client(client, args) if client.endswith("_ase") else None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error adding client {client}: {e}")
|
||||||
|
|
||||||
def remove(self, client):
|
def remove(self, client):
|
||||||
self.active_clients_list.remove(client)
|
self.active_clients_list.remove(client)
|
||||||
self.stop_client(client) if client.endswith("_ase") else None
|
try:
|
||||||
|
self.stop_client(client) if client.endswith("_ase") else None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error removing client {client}: {e}")
|
||||||
|
|
||||||
def start_client(self, client, args):
|
def start_client(self, client, args):
|
||||||
process = subprocess.Popen(
|
try:
|
||||||
[f'{self.venv_path}/bin/python3', args.ase_receiver, f'--client={client}'],
|
process = subprocess.Popen(
|
||||||
stdout=subprocess.PIPE,
|
[f'{self.venv_path}/bin/python3', args.ase_receiver, f'--client={client}'],
|
||||||
stderr=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
text=True
|
stderr=subprocess.PIPE,
|
||||||
)
|
text=True
|
||||||
'''
|
)
|
||||||
for line in process.stdout:
|
self.active_clients_pids[client] = process.pid
|
||||||
logging.info(f"Subtask stdout: {line.strip()}")
|
logger.info(f"Started process for {client}, PID: {process.pid}")
|
||||||
|
except Exception as e:
|
||||||
for line in process.stderr:
|
logger.error(f"Error starting process for {client}: {e}")
|
||||||
logging.error(f"Subtask stderr: {line.strip()}")
|
|
||||||
'''
|
|
||||||
self.active_clients_pids[client] = process.pid
|
|
||||||
logging.info(f"Started process for {client}, PID: {process.pid}")
|
|
||||||
|
|
||||||
def stop_client(self, client):
|
def stop_client(self, client):
|
||||||
logging.info(f"Terminating process for {client} (PID: {self.active_clients_pids[client]})")
|
try:
|
||||||
os.kill(self.active_clients_pids[client], 9)
|
logger.info(f"Terminating process for {client} (PID: {self.active_clients_pids[client]})")
|
||||||
|
os.kill(self.active_clients_pids[client], 9)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error stopping client {client}: {e}")
|
||||||
|
|
||||||
def get_client_list(args, auth):
|
def get_client_list(args, auth):
|
||||||
publish.single(args.pub_topic, '{"commands":[{"command":"listClients"}]}', hostname=args.host, port=args.port, auth=auth)
|
try:
|
||||||
|
publish.single(args.pub_topic, '{"commands":[{"command":"listClients"}]}', hostname=args.host, port=args.port, auth=auth)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error publishing client list request: {e}")
|
||||||
|
|
||||||
def create_client(datas, userdata):
|
def create_client(datas, userdata):
|
||||||
get_client_list(userdata['args'], userdata['auth'])
|
try:
|
||||||
|
get_client_list(userdata['args'], userdata['auth'])
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error creating client: {e}")
|
||||||
|
|
||||||
def delete_client(datas, userdata):
|
def delete_client(datas, userdata):
|
||||||
get_client_list(userdata['args'], userdata['auth'])
|
try:
|
||||||
|
get_client_list(userdata['args'], userdata['auth'])
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error deleting client: {e}")
|
||||||
|
|
||||||
def list_clients(datas, userdata):
|
def list_clients(datas, userdata):
|
||||||
list_clients = datas['responses'][0]['data']['clients']
|
try:
|
||||||
|
list_clients = datas['responses'][0]['data']['clients']
|
||||||
|
|
||||||
delta_clients_add = set(list_clients) - set(userdata['cur_clients'].list())
|
delta_clients_add = set(list_clients) - set(userdata['cur_clients'].list())
|
||||||
[userdata['cur_clients'].add(item, userdata['args']) for item in delta_clients_add]
|
[userdata['cur_clients'].add(item, userdata['args']) for item in delta_clients_add]
|
||||||
|
|
||||||
delta_clients_del = set(userdata['cur_clients'].list()) - set(list_clients)
|
delta_clients_del = set(userdata['cur_clients'].list()) - set(list_clients)
|
||||||
[userdata['cur_clients'].remove(item) for item in delta_clients_del]
|
[userdata['cur_clients'].remove(item) for item in delta_clients_del]
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error listing clients: {e}")
|
||||||
|
|
||||||
def ctrl_client_mod(client, userdata, message):
|
def ctrl_client_mod(client, userdata, message):
|
||||||
command_functions = {
|
try:
|
||||||
"createClient": create_client,
|
command_functions = {
|
||||||
"deleteClient": delete_client,
|
"createClient": create_client,
|
||||||
"listClients": list_clients,
|
"deleteClient": delete_client,
|
||||||
}
|
"listClients": list_clients,
|
||||||
|
}
|
||||||
|
|
||||||
datas = json.loads(message.payload)
|
datas = json.loads(message.payload.decode('utf-8'))
|
||||||
target_commands = {"createClient", "deleteClient", "listClients"}
|
target_commands = {"createClient", "deleteClient", "listClients"}
|
||||||
found_command = [item["command"] for item in datas['responses'] if item["command"] in target_commands]
|
found_command = [item["command"] for item in datas['responses'] if item["command"] in target_commands]
|
||||||
if found_command:
|
if found_command:
|
||||||
command_functions[found_command[0]](datas, userdata)
|
command_functions[found_command[0]](datas, userdata)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error processing client command: {e}")
|
||||||
|
|
||||||
def get_credentials(args):
|
def get_credentials(args):
|
||||||
url = args.wallet + "get"
|
try:
|
||||||
data = {
|
url = args.wallet + "get"
|
||||||
"master_password": os.getenv('WALLET_MASTER_PASSWORD'),
|
data = {
|
||||||
"site": "mqtt_control"
|
"master_password": os.getenv('WALLET_MASTER_PASSWORD'),
|
||||||
}
|
"site": "mqtt_control"
|
||||||
response = requests.post(url, json=data)
|
}
|
||||||
if response.status_code != 200:
|
response = requests.post(url, json=data)
|
||||||
logging.error(f"Error to get pwd from wallet.")
|
if response.status_code != 200:
|
||||||
|
logger.error("Error to get pwd from wallet.")
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
return response.json().get('password')
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error getting credentials: {e}")
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
return response.json().get('password')
|
def on_connect(client, userdata, flags, rc, properties=None):
|
||||||
|
try:
|
||||||
|
if rc == 0:
|
||||||
|
logger.info("Connected successfully")
|
||||||
|
client.subscribe(userdata['args'].sub_topic, userdata['args'].qos)
|
||||||
|
else:
|
||||||
|
logger.error(f"Connection failed with code {rc}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error on connect: {e}")
|
||||||
|
|
||||||
|
def on_message(client, userdata, message):
|
||||||
|
try:
|
||||||
|
#logger.info(f"Received message on {message.topic}: {message.payload}")
|
||||||
|
ctrl_client_mod(client, userdata, message)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error handling message: {e}")
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
@@ -109,32 +157,34 @@ def main():
|
|||||||
parser.add_argument('-u', '--username', default="admin")
|
parser.add_argument('-u', '--username', default="admin")
|
||||||
parser.add_argument('-w', '--wallet', default="http://localhost:5000/")
|
parser.add_argument('-w', '--wallet', default="http://localhost:5000/")
|
||||||
parser.add_argument('-P', '--port', type=int, default=1883)
|
parser.add_argument('-P', '--port', type=int, default=1883)
|
||||||
parser.add_argument('-L', '--log_level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO')
|
|
||||||
parser.add_argument('-s', '--dyn_sec_conf', default='./dynamic-security.json')
|
parser.add_argument('-s', '--dyn_sec_conf', default='./dynamic-security.json')
|
||||||
parser.add_argument('-r', '--ase_receiver', default='./subscribe_ase_receiver.py')
|
parser.add_argument('-r', '--ase_receiver', default='./subscribe_ase_receiver.py')
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
logging.basicConfig(
|
|
||||||
format="- PID: %(process)d %(levelname)8s: %(message)s ",
|
|
||||||
level=args.log_level
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
auth = {'username': args.username, 'password': get_credentials(args)}
|
|
||||||
cur_clients = CurrentClients(args)
|
|
||||||
userdata = {'args': args, 'cur_clients': cur_clients, 'auth': auth}
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
subscribe.callback(ctrl_client_mod, hostname=args.host, port=args.port,
|
auth = {'username': args.username, 'password': get_credentials(args)}
|
||||||
topics=args.sub_topic,
|
cur_clients = CurrentClients(args)
|
||||||
auth=auth, userdata=userdata)
|
userdata = {'args': args, 'cur_clients': cur_clients, 'auth': auth}
|
||||||
|
|
||||||
|
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5)
|
||||||
|
client.username_pw_set(auth['username'], auth['password'])
|
||||||
|
|
||||||
|
client.on_connect = on_connect
|
||||||
|
client.on_message = on_message
|
||||||
|
|
||||||
|
client.user_data_set(userdata)
|
||||||
|
|
||||||
|
client.connect(args.host, args.port)
|
||||||
|
client.loop_forever()
|
||||||
except (KeyboardInterrupt, Exception) as e:
|
except (KeyboardInterrupt, Exception) as e:
|
||||||
logging.info("Terminating: ....")
|
logger.info("Terminating: ....")
|
||||||
|
logger.error(f"Error in main loop: {e}")
|
||||||
for client in cur_clients.list():
|
for client_name in cur_clients.list():
|
||||||
cur_clients.stop_client(client) if client.endswith("_ase") else None
|
try:
|
||||||
|
cur_clients.stop_client(client_name) if client_name.endswith("_ase") else None
|
||||||
|
except Exception as stop_e:
|
||||||
|
logger.error(f"Error stopping client {client_name}: {stop_e}")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|||||||
@@ -46,7 +46,11 @@ def init_db():
|
|||||||
id SERIAL PRIMARY KEY,
|
id SERIAL PRIMARY KEY,
|
||||||
site TEXT NOT NULL,
|
site TEXT NOT NULL,
|
||||||
username TEXT NOT NULL,
|
username TEXT NOT NULL,
|
||||||
password TEXT NOT NULL
|
password TEXT NOT NULL,
|
||||||
|
client_id TEXT NOT NULL,
|
||||||
|
topic TEXT NOT NULL,
|
||||||
|
created_at timestamptz DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
CONSTRAINT site_user_clientid_unique UNIQUE(site, username, client_id)
|
||||||
)
|
)
|
||||||
""")
|
""")
|
||||||
conn.commit()
|
conn.commit()
|
||||||
@@ -89,14 +93,14 @@ def authenticate(master_password):
|
|||||||
return auth_success
|
return auth_success
|
||||||
|
|
||||||
# Aggiungi una password al database
|
# Aggiungi una password al database
|
||||||
def add_password(site, username, password, cipher):
|
def add_password(site, username, password, client_id, topic, cipher):
|
||||||
conn = get_db_connection()
|
conn = get_db_connection()
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
encrypted_password = cipher.encrypt(password.encode()).decode()
|
encrypted_password = cipher.encrypt(password.encode()).decode()
|
||||||
try:
|
try:
|
||||||
cursor.execute(
|
cursor.execute(
|
||||||
f"INSERT INTO {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} (site, username, password) VALUES (%s, %s, %s)",
|
f"INSERT INTO {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} (site, username, password, client_id, topic) VALUES (%s, %s, %s, %s, %s)",
|
||||||
(site, username, encrypted_password))
|
(site, username, encrypted_password, client_id, topic))
|
||||||
conn.commit()
|
conn.commit()
|
||||||
logging.info(f"Password aggiunta per il sito: {site}.")
|
logging.info(f"Password aggiunta per il sito: {site}.")
|
||||||
except psycopg2.Error as e:
|
except psycopg2.Error as e:
|
||||||
@@ -109,18 +113,18 @@ def get_password(site, cipher):
|
|||||||
conn = get_db_connection()
|
conn = get_db_connection()
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
try:
|
try:
|
||||||
cursor.execute(f"SELECT username, password FROM {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} WHERE site = %s", (site,))
|
cursor.execute(f"SELECT username, password, client_id, topic FROM {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} WHERE site = %s", (site,))
|
||||||
row = cursor.fetchone()
|
row = cursor.fetchone()
|
||||||
if row:
|
if row:
|
||||||
username, encrypted_password = row
|
username, encrypted_password, client_id, topic = row
|
||||||
decrypted_password = cipher.decrypt(encrypted_password.encode()).decode()
|
decrypted_password = cipher.decrypt(encrypted_password.encode()).decode()
|
||||||
logging.info(f"Password recuperata per il sito: {site}.")
|
logging.info(f"Password recuperata per il sito: {site}.")
|
||||||
return username, decrypted_password
|
return username, decrypted_password, client_id, topic
|
||||||
logging.warning(f"Sito non trovato: {site}.")
|
logging.warning(f"Sito non trovato: {site}.")
|
||||||
return None, None
|
return None, None, None, None
|
||||||
except psycopg2.Error as e:
|
except psycopg2.Error as e:
|
||||||
logging.error(f"Errore durante il recupero della password: {e}")
|
logging.error(f"Errore durante il recupero della password: {e}")
|
||||||
return None, None
|
return None, None, None, None
|
||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
@@ -162,6 +166,8 @@ def add_password_api():
|
|||||||
site = request.json.get('site')
|
site = request.json.get('site')
|
||||||
username = request.json.get('username')
|
username = request.json.get('username')
|
||||||
password = request.json.get('password')
|
password = request.json.get('password')
|
||||||
|
client_id = request.json.get('client_id')
|
||||||
|
topic = request.json.get('topic')
|
||||||
|
|
||||||
if not authenticate(master_password):
|
if not authenticate(master_password):
|
||||||
logging.warning("Tentativo di aggiungere una password con master password errata.")
|
logging.warning("Tentativo di aggiungere una password con master password errata.")
|
||||||
@@ -169,7 +175,7 @@ def add_password_api():
|
|||||||
|
|
||||||
key = derive_key(master_password)
|
key = derive_key(master_password)
|
||||||
cipher = Fernet(key)
|
cipher = Fernet(key)
|
||||||
add_password(site, username, password, cipher)
|
add_password(site, username, password, client_id, topic, cipher)
|
||||||
return jsonify({"message": "Password aggiunta con successo"})
|
return jsonify({"message": "Password aggiunta con successo"})
|
||||||
|
|
||||||
# Endpoint per recuperare una password
|
# Endpoint per recuperare una password
|
||||||
@@ -184,12 +190,12 @@ def get_password_api():
|
|||||||
|
|
||||||
key = derive_key(master_password)
|
key = derive_key(master_password)
|
||||||
cipher = Fernet(key)
|
cipher = Fernet(key)
|
||||||
username, password = get_password(site, cipher)
|
username, password, client_id, topic = get_password(site, cipher)
|
||||||
|
|
||||||
if username is None:
|
if username is None:
|
||||||
return jsonify({"error": "Sito non trovato"}), 404
|
return jsonify({"error": "Sito non trovato"}), 404
|
||||||
|
|
||||||
return jsonify({"site": site, "username": username, "password": password})
|
return jsonify({"site": site, "username": username, "password": password, "client_id": client_id, "topic": topic})
|
||||||
|
|
||||||
# Endpoint per cancellare una password
|
# Endpoint per cancellare una password
|
||||||
@app.route('/delete', methods=['POST'])
|
@app.route('/delete', methods=['POST'])
|
||||||
|
|||||||
@@ -1,14 +1,15 @@
|
|||||||
import paho.mqtt.subscribe as subscribe
|
import paho.mqtt.client as mqtt
|
||||||
|
from paho.mqtt.properties import Properties
|
||||||
|
from paho.mqtt.packettypes import PacketTypes
|
||||||
import argparse
|
import argparse
|
||||||
import requests
|
import requests
|
||||||
import psycopg2
|
import psycopg2
|
||||||
import json
|
import json
|
||||||
import sys
|
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
# Configurazione Logging
|
# Configurazione Logging
|
||||||
logging.basicConfig(level=logging.INFO, format='- PID: %(process)d %(levelname)8s: %(message)s', stream=sys.stderr)
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - PID: %(process)d %(levelname)8s: %(message)s', filename="/var/log/ase_receiver.log")
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
|
|
||||||
# Configurazione connessione PostgreSQL
|
# Configurazione connessione PostgreSQL
|
||||||
@@ -25,15 +26,15 @@ DB_CONFIG = {
|
|||||||
def get_credentials(args):
|
def get_credentials(args):
|
||||||
url = args.wallet + "get"
|
url = args.wallet + "get"
|
||||||
data = {
|
data = {
|
||||||
"master_password": os.getenv('WALLET_MASTER_PASSWORD'),
|
"master_password": os.getenv('WALLET_MASTER_PASSWORD'),
|
||||||
"site": f"{args.client}_site"
|
"site": f"{args.client}_site"
|
||||||
}
|
}
|
||||||
response = requests.post(url, json=data)
|
response = requests.post(url, json=data)
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
logger.error(f"Error to get pwd from wallet.")
|
logger.error("Error to get pwd from wallet.")
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
return response.json().get('password')
|
return response.json().get('password'), response.json().get('client_id'), response.json().get('topic')
|
||||||
|
|
||||||
def get_db_connection():
|
def get_db_connection():
|
||||||
return psycopg2.connect(
|
return psycopg2.connect(
|
||||||
@@ -44,8 +45,7 @@ def get_db_connection():
|
|||||||
port=DB_CONFIG["port"]
|
port=DB_CONFIG["port"]
|
||||||
)
|
)
|
||||||
|
|
||||||
# Inizializza il database
|
def init_db(args, main_topic):
|
||||||
def init_db(args):
|
|
||||||
try:
|
try:
|
||||||
conn = get_db_connection()
|
conn = get_db_connection()
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
@@ -54,7 +54,8 @@ def init_db(args):
|
|||||||
id bigserial NOT NULL,
|
id bigserial NOT NULL,
|
||||||
main_topic text NOT NULL,
|
main_topic text NOT NULL,
|
||||||
tt_data jsonb NULL,
|
tt_data jsonb NULL,
|
||||||
created_at timestamp DEFAULT CURRENT_TIMESTAMP NULL
|
created_at timestamp DEFAULT CURRENT_TIMESTAMP NULL,
|
||||||
|
CONSTRAINT {DB_CONFIG['dbtable']}_pkey PRIMARY KEY (id, main_topic)
|
||||||
)
|
)
|
||||||
PARTITION BY LIST (main_topic);
|
PARTITION BY LIST (main_topic);
|
||||||
""")
|
""")
|
||||||
@@ -65,8 +66,8 @@ def init_db(args):
|
|||||||
""")
|
""")
|
||||||
conn.commit()
|
conn.commit()
|
||||||
cursor.execute(f"""
|
cursor.execute(f"""
|
||||||
CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}_{args.client.removesuffix("_ase")} PARTITION OF {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}
|
CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}_{main_topic} PARTITION OF {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}
|
||||||
FOR VALUES IN ('{args.client.removesuffix("_ase")}')
|
FOR VALUES IN ('{main_topic}')
|
||||||
""")
|
""")
|
||||||
conn.commit()
|
conn.commit()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -77,15 +78,22 @@ def init_db(args):
|
|||||||
logger.info("Database inizializzato.")
|
logger.info("Database inizializzato.")
|
||||||
|
|
||||||
def create_nested_json(path, data):
|
def create_nested_json(path, data):
|
||||||
keys = path.split('/')[1:]
|
main_topic = path.split('/')[0]
|
||||||
|
keys = path.split('/')[1:]
|
||||||
nested_json = data
|
nested_json = data
|
||||||
for key in reversed(keys):
|
for key in reversed(keys):
|
||||||
nested_json = {key: nested_json}
|
nested_json = {key: nested_json}
|
||||||
return nested_json
|
return main_topic, nested_json
|
||||||
|
|
||||||
def receive_data(client, userdata, message):
|
def on_connect(client, userdata, flags, rc, properties):
|
||||||
|
if rc == 0:
|
||||||
|
logger.info("Connesso al broker MQTT")
|
||||||
|
else:
|
||||||
|
logger.error(f"Errore di connessione, codice: {rc}")
|
||||||
|
|
||||||
|
def on_message(client, userdata, message):
|
||||||
datastore = json.loads(message.payload)
|
datastore = json.loads(message.payload)
|
||||||
json_data = create_nested_json(message.topic, datastore)
|
main_topic, json_data = create_nested_json(message.topic, datastore)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
conn = get_db_connection()
|
conn = get_db_connection()
|
||||||
@@ -94,7 +102,7 @@ def receive_data(client, userdata, message):
|
|||||||
INSERT INTO {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}
|
INSERT INTO {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}
|
||||||
(main_topic, tt_data)
|
(main_topic, tt_data)
|
||||||
VALUES
|
VALUES
|
||||||
('{userdata['args'].client.removesuffix("_ase")}', '{json.dumps(json_data)}'::jsonb);
|
('{main_topic}', '{json.dumps(json_data)}'::jsonb);
|
||||||
""")
|
""")
|
||||||
conn.commit()
|
conn.commit()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -102,31 +110,57 @@ def receive_data(client, userdata, message):
|
|||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
def on_disconnect(client, userdata, rc, properties=None):
|
||||||
|
if rc != 0:
|
||||||
|
logger.warning(f"Disconnesso dal broker con codice {rc}. Riconnessione...")
|
||||||
|
client.reconnect()
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
|
|
||||||
parser.add_argument('-H', '--host', default="mqtt")
|
parser.add_argument('-H', '--host', default="mqtt")
|
||||||
parser.add_argument('-q', '--qos', type=int,default=2)
|
parser.add_argument('-q', '--qos', type=int, default=1)
|
||||||
parser.add_argument('-P', '--port', type=int, default=1883)
|
parser.add_argument('-P', '--port', type=int, default=1883)
|
||||||
parser.add_argument('-c', '--client')
|
parser.add_argument('-c', '--client')
|
||||||
parser.add_argument('-w', '--wallet', default="http://mqtt:5000/")
|
parser.add_argument('-w', '--wallet', default="http://mqtt:5000/")
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
init_db(args)
|
|
||||||
|
|
||||||
auth = {'username': args.client, 'password': get_credentials(args)}
|
password, client_id, topic = get_credentials(args)
|
||||||
|
|
||||||
|
main_topic = topic.split('/')[0]
|
||||||
|
|
||||||
|
init_db(args, main_topic)
|
||||||
|
|
||||||
userdata = {'args': args}
|
userdata = {'args': args}
|
||||||
|
|
||||||
|
properties=Properties(PacketTypes.CONNECT)
|
||||||
|
properties.SessionExpiryInterval=3600
|
||||||
|
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id, protocol=mqtt.MQTTv5)
|
||||||
|
client.username_pw_set(username=args.client, password=password)
|
||||||
|
client.user_data_set(userdata)
|
||||||
|
#client.logger = logger
|
||||||
|
|
||||||
|
client.on_connect = on_connect
|
||||||
|
client.on_message = on_message
|
||||||
|
client.on_disconnect = on_disconnect
|
||||||
|
|
||||||
|
client.reconnect_delay_set(min_delay=1, max_delay=120)
|
||||||
|
client.connect(args.host, args.port, clean_start=False) #, properties=properties)
|
||||||
|
|
||||||
|
client.subscribe(topic, qos=args.qos)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
subscribe.callback(receive_data, hostname=args.host, port=args.port,
|
logger.info("Avvio del loop MQTT.")
|
||||||
topics=f'{args.client.removesuffix("_ase")}/#',
|
client.loop_forever()
|
||||||
qos=args.qos, clean_session=False,
|
except KeyboardInterrupt:
|
||||||
auth=auth, client_id=f'{args.client.removesuffix("_ase")}_client_ase',
|
logger.info("Terminazione manuale.")
|
||||||
userdata=userdata)
|
except Exception as e:
|
||||||
except (KeyboardInterrupt, Exception) as e:
|
logger.error(f"Errore durante il ciclo MQTT: {e}")
|
||||||
logger.info(f"Terminating: ....{e}")
|
finally:
|
||||||
|
client.disconnect()
|
||||||
|
logger.info("Disconnesso dal broker.")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
Reference in New Issue
Block a user