#!/usr/bin/env python3 import sys import os import re from datetime import datetime import json import mysql.connector as mysql import logging from utils.time import timestamp_fmt as ts from utils.config import loader as setting def conn_db(cfg): return mysql.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport ) def extract_value(patterns, source, default='Not Defined'): ip = {} for pattern in patterns: s_pattern = rf'{pattern}:\s*(\d{{1,3}}(?:\.\d{{1,3}}){{3}})' matches = re.search(s_pattern, source, re.IGNORECASE) if matches: ip.update({pattern: matches.group(1)}) else: ip.update({pattern: default}) return ip def write_db(records, cfg): insert_values = [ ( record["unit_name"], record["unit_type"], record["tool_name"], record["tool_type"], record["unit_ip"], record["unit_subnet"], record["unit_gateway"], record["event_timestamp"], record["battery_level"], record["temperature"], record["nodes_jsonb"] ) for record in records ] query = f""" INSERT IGNORE INTO {cfg.dbname}.{cfg.dbdataraw} ( unit_name, unit_type, tool_name, tool_type, unit_ip, unit_subnet, unit_gateway, event_timestamp, battery_level, temperature, nodes_jsonb ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ try: with conn_db(cfg) as conn: conn.autocommit = True with conn.cursor() as cur: try: cur.executemany(query, insert_values) cur.close() conn.commit() except Exception as e: logging.error(f'Records not inserted: {e}') logging.info('Exit') exit() except Exception as e: logging.error(f'Records not inserted: {e}') exit() def elab_csv(cfg): try: with conn_db(cfg) as conn: cur = conn.cursor() cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = 0 limit 1') id, unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone() cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 1 where id = {id}') data_list = str(tool_data).strip("('{\"").strip("\"}\',)").split('","') # Estrarre le informazioni degli ip dalla header infos = extract_value(cfg.csv_infos, str(data_list[:9])) except Exception as e: logging.error(f'{e}') records = [] # Definizione dei pattern timestamp_pattern1 = r'(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2});' timestamp_pattern2 = r'(\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2});' # Formato desiderato per il timestamp output_format = "%Y-%m-%d %H:%M:%S" for line in list(set(data_list)): if (match := re.search(timestamp_pattern1, line)): timestamp = datetime.strptime(match.group(1), "%Y/%m/%d %H:%M:%S").strftime(output_format) elif (match := re.search(timestamp_pattern2, line)): timestamp = datetime.strptime(match.group(1), "%d/%m/%Y %H:%M:%S").strftime(output_format) else: continue line_without_timestamp = (line[match.end():]).strip('|;') match_values = re.findall(r'[-+]?\d*\.\d+|\d+', line_without_timestamp) battery_level, temperature = match_values[0], match_values[1] remainder = ";".join(line_without_timestamp.split(";")[2:]).strip('|;') # Rimuovi spazi bianchi o caratteri di nuova riga nodes = remainder.strip().replace('\\n', '').split(";|;") # Estrai i valori di ciascun nodo e formatta i dati come JSON node_list = [] for i, node_data in enumerate(nodes, start=1): node_dict = {"num": i} # Dividi ogni nodo in valori separati da ";" node_values = node_data.split(';') for j, value in enumerate(node_values, start=0): # Imposta i valori a -9999 se trovi "Dis." node_dict['val' + str(j)] = -9999 if (value == "Dis." or value == "Err1" or value == "Err2" or value == "---" or value == "NotAv" or value == "No RX" or value == "DMUXe" or value == "CH n. Error" or value == "-") else float(value) node_list.append(node_dict) # Prepara i dati per l'inserimento/aggiornamento record = { "unit_name": unit_name.upper(), "unit_type": unit_type.upper(), "tool_name": tool_name.upper(), "tool_type": tool_type.upper(), "unit_ip": infos['IP'], "unit_subnet": infos['Subnet'], "unit_gateway": infos['Gateway'], "event_timestamp": timestamp, "battery_level": float(battery_level), "temperature": float(temperature), "nodes_jsonb": json.dumps(node_list) # Converti la lista di dizionari in una stringa JSON } records.append(record) # Se abbiamo raggiunto 500 record, esegui l'inserimento in batch if len(records) >= 500: logging.info("Raggiunti 500 record scrivo sul DB") write_db(records, cfg) records = [] write_db(records, cfg) def main(): # Load the configuration settings cfg = setting.config() try: # Configura la connessione al database PostgreSQL # Configure logging logging.basicConfig( format="%(asctime)s %(message)s", filename=cfg.elablog, level=logging.INFO, ) elab_csv(cfg) except KeyboardInterrupt: logging.info( "Info: {}.".format("Shutdown requested...exiting") ) except Exception: print( "{} - PID {:>5} >> Error: {}.".format( ts.timestamp("log"), os.getpid(), sys.exc_info()[1] ) ) if __name__ == "__main__": main()