2024-12-26 15:47:21 +01:00
parent 8b8766c609
commit ddd45d7276
10 changed files with 232 additions and 109 deletions

@@ -4,117 +4,148 @@ import sys
import os
import re
from datetime import datetime
import json
import psycopg2
from psycopg2.extras import execute_values
import logging
from sqlalchemy import create_engine, text
import psycopg2.sql
from utils.time import timestamp_fmt as ts
from utils.config import set_config as setting
def conn_db(cfg):
return psycopg2.connect(dbname=cfg.dbname, user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport)
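# Usage sketch (illustrative): cfg is assumed to expose dbname, dbuser, dbpass,
# dbhost and dbport, matching the attributes referenced above. A psycopg2 connection
# used as a context manager commits or rolls back on exit, but is not closed.
#
#   with conn_db(cfg) as conn, conn.cursor() as cur:
#       cur.execute('select 1')
#       print(cur.fetchone())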
def extract_value(patterns, source, default='Not Defined'):
ip = {}
for pattern in patterns:
pattern = f'r"{pattern}:\s*(\d{1,3}(?:\.\d{1,3}){3})"'
matches = re.search(pattern, source, re.IGNORECASE)
s_pattern = rf'{pattern}:\s*(\d{{1,3}}(?:\.\d{{1,3}}){{3}})'
matches = re.search(s_pattern, source, re.IGNORECASE)
if matches:
ip.update({pattern: matches.group(1)})
else:
ip.update({pattern: default})
return ip
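# Example of how the rf-string pattern matches (illustrative, made-up header text):
# the doubled braces keep {1,3} as a regex quantifier while {pattern} is interpolated.
#
#   header = 'IP: 192.168.1.10 Subnet: 255.255.255.0 Gateway: 192.168.1.1'
#   extract_value(['IP', 'Subnet', 'Gateway', 'Web port'], header)
#   -> {'IP': '192.168.1.10', 'Subnet': '255.255.255.0',
#       'Gateway': '192.168.1.1', 'Web port': 'Not Defined'}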
def write_db(engine, records):
with engine.connect() as conn:
conn.execute(text("""
INSERT INTO dataraw (unit_name, unit_type, tool_name, tool_type, ip_centralina, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb)
VALUES
""" + ",".join([
f"(:{i}_unit_name, :{i}_unit_type, :{i}_tool_name, :{i}_tool_type, :{i}_ip_centralina, :{i}_ip_gateway, :{i}_event_timestamp, :{i}_battery_level, :{i}_temperature, :{i}_nodes_jsonb)"
for i in range(len(records))
]) + """
ON CONFLICT ON CONSTRAINT dataraw_unique
DO UPDATE SET
unit_type = EXCLUDED.unit_type,
tool_type = EXCLUDED.tool_type,
ip_centralina = EXCLUDED.ip_centralina,
ip_gateway = EXCLUDED.ip_gateway,
battery_level = EXCLUDED.battery_level,
temperature = EXCLUDED.temperature,
nodes_jsonb = EXCLUDED.nodes_jsonb;
"""), {f"{i}_{key}": value for i, record in enumerate(records) for key, value in record.items()})
def write_db(records, cfg):
insert_values = [
(
record["unit_name"], record["unit_type"], record["tool_name"], record["tool_type"],
record["unit_ip"], record["unit_subnet"], record["unit_gateway"], record["event_timestamp"],
record["battery_level"], record["temperature"], record["nodes_jsonb"]
)
for record in records
]
conn.commit()
query = f"""
INSERT INTO {cfg.dbschema}.{cfg.dbdataraw} (
unit_name, unit_type, tool_name, tool_type, unit_ip, unit_subnet, unit_gateway,
event_timestamp, battery_level, temperature, nodes_jsonb
)
VALUES %s
ON CONFLICT ON CONSTRAINT dataraw_unique
DO UPDATE SET
unit_type = EXCLUDED.unit_type,
tool_type = EXCLUDED.tool_type,
unit_ip = EXCLUDED.unit_ip,
unit_subnet = EXCLUDED.unit_subnet,
unit_gateway = EXCLUDED.unit_gateway,
battery_level = EXCLUDED.battery_level,
temperature = EXCLUDED.temperature,
nodes_jsonb = EXCLUDED.nodes_jsonb;
"""
def elab_csv(engine, cfg):
with engine.connect() as conn:
cur = conn.cursor()
cur.execute(f'select unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbrectable} r')
try:
with conn_db(cfg) as conn:
conn.autocommit = True
with conn.cursor() as cur:
try:
execute_values(cur, query, insert_values)
cur.close()
conn.commit()
except psycopg2.Error as e:
logging.error(f'Records not inserted: {e}')
logging.info(f'Exit')
exit()
except Exception as e:
logging.error(f'Records not inserted: {e}')
exit()
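# execute_values expands the single "VALUES %s" placeholder into a multi-row VALUES
# list and sends the rows in pages (default page_size=100), so a 500-record batch is
# split across several statements unless page_size is raised. A minimal sketch of the
# same upsert pattern against a hypothetical table:
#
#   execute_values(
#       cur,
#       'INSERT INTO demo (a, b) VALUES %s ON CONFLICT (a) DO UPDATE SET b = EXCLUDED.b',
#       [(1, 'x'), (2, 'y')],
#       page_size=500,
#   )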
unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone()
data_list = str(tool_data).strip("('{\"").strip("\"}\',)").split('","')
# Extract the IP information from the header
infos = extract_value(cfg.csv_infos, data_list[:9])
def elab_csv(cfg):
try:
with conn_db(cfg) as conn:
cur = conn.cursor()
cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbschema}.{cfg.dbrectable} where locked = 0 and status = 0')
id, unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone()
cur.execute(f'update {cfg.dbschema}.{cfg.dbrectable} set locked = 1 where id = {id}')
data_list = str(tool_data).strip("('{\"").strip("\"}\',)").split('","')
# Extract the IP information from the header
infos = extract_value(cfg.csv_infos, str(data_list[:9]))
except Exception as e:
logging.error(f'{e}')
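# The id interpolated into the UPDATE above comes from the table itself; an
# equivalent, parameterized form of the lock step would be (sketch, schema and
# table names still taken from cfg):
#
#   cur.execute(
#       f'update {cfg.dbschema}.{cfg.dbrectable} set locked = 1 where id = %s',
#       (id,),
#   )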
records = []
# Process the data rows from row 8 onward
for line in data_list:
if ";|;" in line:
# Remove whitespace and newline characters
input_data = line.strip().replace('\\n', '')
# Pattern definitions
timestamp_pattern1 = r'(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2});'
timestamp_pattern2 = r'(\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2});'
# Split the string into sections using ";|;" as the separator
parts = input_data.split(';|;')
# Target timestamp format
output_format = "%Y-%m-%d %H:%M:%S"
# Check that there are at least three parts (timestamp, measurements and nodes)
if len(parts) < 3:
print(f"Invalid row: {input_data}")
continue
for line in list(set(data_list)):
if (match := re.search(timestamp_pattern1, line)):
timestamp = datetime.strptime(match.group(1), "%Y/%m/%d %H:%M:%S").strftime(output_format)
elif (match := re.search(timestamp_pattern2, line)):
timestamp = datetime.strptime(match.group(1), "%d/%m/%Y %H:%M:%S").strftime(output_format)
else:
continue
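# Both source layouts are normalised to output_format; for example (made-up values):
#
#   datetime.strptime('2024/12/26 15:47:21', '%Y/%m/%d %H:%M:%S').strftime(output_format)
#   -> '2024-12-26 15:47:21'
#   datetime.strptime('26/12/2024 15:47:21', '%d/%m/%Y %H:%M:%S').strftime(output_format)
#   -> '2024-12-26 15:47:21'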
# Extract the date/time and the first measurements
timestamp = parts[0]
measurements = parts[1]
line_without_timestamp = (line[match.end():]).strip('|;')
# Extract each node's values and format the data as JSON
nodes = parts[2:]
node_list = []
for i, node_data in enumerate(nodes, start=1):
node_dict = {"num": i}
# Split each node into values separated by ";"
node_values = node_data.split(';')
for j, value in enumerate(node_values, start=0):
# Set values to -9999 when "Dis." is found
node_dict['val' + str(j)] = -9999 if value == "Dis." else float(value)
node_list.append(node_dict)
match_values = re.findall(r'[-+]?\d*\.\d+|\d+', line_without_timestamp)
battery_level, temperature = match_values[0], match_values[1]
remainder = ";".join(line_without_timestamp.split(";")[2:]).strip('|;')
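# re.findall with r'[-+]?\d*\.\d+|\d+' returns the numeric tokens in order of
# appearance, so the first two are taken as battery level and temperature.
# For a made-up fragment:
#
#   re.findall(r'[-+]?\d*\.\d+|\d+', '12.6;23.4;|;0.001;Dis.')
#   -> ['12.6', '23.4', '0.001']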
# Prepare the data for insert/update
record = {
"unit_name": unit_name.upper(),
"unit_type": unit_type.upper(),
"tool_name": tool_name.upper(),
"tool_type": tool_type.upper(),
"ip_centralina": infos['IP'],
"ip_subnet": infos['Subnet'],
"ip_gateway": infos['Gateway'],
"Web_port": infos['Web port'],
"Ftp_port": infos['Ftp port'],
"event_timestamp": timestamp,
"battery_level": float(measurements.split(';')[0]),
"temperature": float(measurements.split(';')[1]),
"nodes_jsonb": json.dumps(node_list) # Converti la lista di dizionari in una stringa JSON
}
# Remove whitespace and newline characters
nodes = remainder.strip().replace('\\n', '').split(";|;")
records.append(record)
# Extract each node's values and format the data as JSON
node_list = []
for i, node_data in enumerate(nodes, start=1):
node_dict = {"num": i}
# Split each node into values separated by ";"
node_values = node_data.split(';')
for j, value in enumerate(node_values, start=0):
# Set values to -9999 for any non-numeric status marker ("Dis.", errors, etc.)
node_dict['val' + str(j)] = -9999 if value in ("Dis.", "Err1", "Err2", "---", "NotAv", "No RX", "DMUXe", "CH n. Error", "-") else float(value)
node_list.append(node_dict)
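# For a made-up remainder of two nodes, '0.123;Dis.;|;4.56;7.89', the loop above
# builds the structure below, which json.dumps then serialises into nodes_jsonb:
#
#   [{'num': 1, 'val0': 0.123, 'val1': -9999},
#    {'num': 2, 'val0': 4.56, 'val1': 7.89}]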
# Once 500 records have been accumulated, perform the batch insert
if len(records) >= 500:
print("raggiunti 500 record scrivo sul db")
write_db(engine, records)
records = []
# Prepare the data for insert/update
record = {
"unit_name": unit_name.upper(),
"unit_type": unit_type.upper(),
"tool_name": tool_name.upper(),
"tool_type": tool_type.upper(),
"unit_ip": infos['IP'],
"unit_subnet": infos['Subnet'],
"unit_gateway": infos['Gateway'],
"event_timestamp": timestamp,
"battery_level": float(battery_level),
"temperature": float(temperature),
"nodes_jsonb": json.dumps(node_list) # Converti la lista di dizionari in una stringa JSON
}
write_db(engine, records)
records.append(record)
# Once 500 records have been accumulated, perform the batch insert
if len(records) >= 500:
logging.info("Raggiunti 500 record scrivo sul DB")
write_db(records, cfg)
records = []
write_db(records, cfg)
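# The trailing write_db call flushes whatever is left of the last batch (fewer than
# 500 records); with execute_values an empty record list results in no statement
# being sent.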
def main():
@@ -123,14 +154,13 @@ def main():
try:
# Configure the connection to the PostgreSQL database
engine = create_engine(f'postgresql://{cfg.dbuser}:{cfg.dbpass}@{cfg.dbhost}:{cfg.dbport}/{cfg.dbschema}')
# Configure logging
logging.basicConfig(
format="%(asctime)s %(message)s",
filename=cfg.logfilename,
filename=cfg.elablog,
level=logging.INFO,
)
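# With this format string a log line looks like
# "2024-12-26 15:47:21,123 Reached 500 records, writing to the DB"
# (asctime uses the default "%Y-%m-%d %H:%M:%S,mmm" layout unless datefmt is given).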
elab_csv(engine, cfg)
elab_csv(cfg)
except KeyboardInterrupt:
logging.info(