fix caricamenti
This commit is contained in:
@@ -12,8 +12,11 @@ CREATE TABLE `received` (
|
|||||||
`status` int DEFAULT '0',
|
`status` int DEFAULT '0',
|
||||||
`matlab_timestamp` timestamp NULL DEFAULT NULL,
|
`matlab_timestamp` timestamp NULL DEFAULT NULL,
|
||||||
`inserted_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
|
`inserted_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
`loaded_at` datetime DEFAULT NULL,
|
`loaded_at` timestamp DEFAULT NULL,
|
||||||
`elaborated_at` timestamp NULL DEFAULT NULL,
|
`elaborated_at` timestamp NULL DEFAULT NULL,
|
||||||
`sent_at` timestamp NULL DEFAULT NULL,
|
`sent_raw_at` timestamp NULL DEFAULT NULL,
|
||||||
|
`sent_elab_at` timestamp NULL DEFAULT NULL,
|
||||||
|
`last_update_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||||||
PRIMARY KEY (`id`)
|
PRIMARY KEY (`id`)
|
||||||
) ENGINE=InnoDB AUTO_INCREMENT=694 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
|
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
|
||||||
|
|
||||||
|
|||||||
@@ -3,18 +3,14 @@
|
|||||||
# Import necessary libraries
|
# Import necessary libraries
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
# Import custom modules for configuration and database connection
|
# Import custom modules for configuration and database connection
|
||||||
from utils.config import loader_matlab_elab as setting
|
from utils.config import loader_matlab_elab as setting
|
||||||
from utils.database import DATA_LOADED, DATA_ELABORATED, DATA_SENT
|
from utils.database import WorkflowFlags
|
||||||
from utils.database.matlab_query import get_matlab_command
|
from utils.database.matlab_query import get_matlab_command
|
||||||
from utils.csv.loaders import get_next_csv_atomic
|
from utils.csv.loaders import get_next_csv_atomic
|
||||||
from utils.orchestrator_utils import run_orchestrator, worker_context
|
from utils.orchestrator_utils import run_orchestrator, worker_context
|
||||||
from utils.database.loader_action import update_status, unlock
|
from utils.database.loader_action import update_status, unlock
|
||||||
from utils.database.elab_query import get_data_as_csv
|
|
||||||
#from utils.ftp.elab_send import send_csv_to_customer
|
|
||||||
|
|
||||||
|
|
||||||
# Initialize the logger for this module
|
# Initialize the logger for this module
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
@@ -46,7 +42,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
|
|||||||
try:
|
try:
|
||||||
logger.info("Inizio elaborazione")
|
logger.info("Inizio elaborazione")
|
||||||
|
|
||||||
record = await get_next_csv_atomic(pool, cfg.dbrectable, DATA_LOADED)
|
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED)
|
||||||
|
|
||||||
if record:
|
if record:
|
||||||
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
|
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
|
||||||
@@ -56,7 +52,6 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
|
|||||||
logger.info(f"Elaborazione id {id} per {unit_name} {tool_name} ")
|
logger.info(f"Elaborazione id {id} per {unit_name} {tool_name} ")
|
||||||
|
|
||||||
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name} {tool_name}"
|
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name} {tool_name}"
|
||||||
timestamp_matlab_elab = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
||||||
proc = await asyncio.create_subprocess_shell(
|
proc = await asyncio.create_subprocess_shell(
|
||||||
matlab_cmd,
|
matlab_cmd,
|
||||||
cwd=cfg.matlab_func_path,
|
cwd=cfg.matlab_func_path,
|
||||||
@@ -73,17 +68,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
|
|||||||
f.write(stderr.decode().strip())
|
f.write(stderr.decode().strip())
|
||||||
else:
|
else:
|
||||||
logger.info(stdout.decode().strip())
|
logger.info(stdout.decode().strip())
|
||||||
await update_status(cfg, id, DATA_ELABORATED, pool)
|
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
|
||||||
if tool_elab_info['ftp_send']:
|
|
||||||
if not tool_elab_info["duedate"] or tool_elab_info["duedate"] in ('0000-00-00 00:00:00', '') or tool_elab_info["duedate"] > timestamp_matlab_elab:
|
|
||||||
if elab_csv := await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
|
|
||||||
print(elab_csv)
|
|
||||||
#if await send_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
|
|
||||||
#await update_status(cfg, id, DATA_SENT, pool)
|
|
||||||
await update_status(cfg, id, DATA_SENT, pool)
|
|
||||||
else:
|
|
||||||
logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: ftp put didn't executed because due date reached.")
|
|
||||||
|
|
||||||
await unlock(cfg, id, pool)
|
await unlock(cfg, id, pool)
|
||||||
await asyncio.sleep(ELAB_PROCESSING_DELAY)
|
await asyncio.sleep(ELAB_PROCESSING_DELAY)
|
||||||
else:
|
else:
|
||||||
|
|||||||
2
env/elab.ini
vendored
2
env/elab.ini
vendored
@@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
[tool]
|
[tool]
|
||||||
# stati in minuscolo
|
# stati in minuscolo
|
||||||
elab_status = active|manual update
|
elab_status = active|manual upload
|
||||||
|
|
||||||
[matlab]
|
[matlab]
|
||||||
#runtime = /usr/local/MATLAB/MATLAB_Runtime/v93
|
#runtime = /usr/local/MATLAB/MATLAB_Runtime/v93
|
||||||
|
|||||||
1
env/ftp.ini
vendored
1
env/ftp.ini
vendored
@@ -2,6 +2,7 @@
|
|||||||
# python3 -c 'from hashlib import sha256;print(sha256("????password???".encode("UTF-8")).hexdigest())'
|
# python3 -c 'from hashlib import sha256;print(sha256("????password???".encode("UTF-8")).hexdigest())'
|
||||||
|
|
||||||
[ftpserver]
|
[ftpserver]
|
||||||
|
service_port = 2121
|
||||||
firstPort = 40000
|
firstPort = 40000
|
||||||
proxyAddr = 0.0.0.0
|
proxyAddr = 0.0.0.0
|
||||||
portRangeWidth = 500
|
portRangeWidth = 500
|
||||||
|
|||||||
5
env/send.ini
vendored
Normal file
5
env/send.ini
vendored
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
[logging]
|
||||||
|
logFilename = ./send_data.log
|
||||||
|
|
||||||
|
[threads]
|
||||||
|
max_num = 5
|
||||||
@@ -136,7 +136,7 @@ def main():
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Create and start the FTP server
|
# Create and start the FTP server
|
||||||
server = FTPServer(("0.0.0.0", 2121), handler)
|
server = FTPServer(("0.0.0.0", cfg.service_port), handler)
|
||||||
server.serve_forever()
|
server.serve_forever()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import asyncio
|
|||||||
|
|
||||||
# Import custom modules for configuration and database connection
|
# Import custom modules for configuration and database connection
|
||||||
from utils.config import loader_load_data as setting
|
from utils.config import loader_load_data as setting
|
||||||
from utils.database import CSV_RECEIVED
|
from utils.database import WorkflowFlags
|
||||||
from utils.csv.loaders import get_next_csv_atomic
|
from utils.csv.loaders import get_next_csv_atomic
|
||||||
from utils.orchestrator_utils import run_orchestrator, worker_context
|
from utils.orchestrator_utils import run_orchestrator, worker_context
|
||||||
|
|
||||||
@@ -33,14 +33,13 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
|
|||||||
# Imposta il context per questo worker
|
# Imposta il context per questo worker
|
||||||
worker_context.set(f"W{worker_id:02d}")
|
worker_context.set(f"W{worker_id:02d}")
|
||||||
|
|
||||||
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
|
|
||||||
logger.info("Avviato")
|
logger.info("Avviato")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
logger.info("Inizio elaborazione")
|
logger.info("Inizio elaborazione")
|
||||||
|
|
||||||
record = await get_next_csv_atomic(pool, cfg.dbrectable, CSV_RECEIVED)
|
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.CSV_RECEIVED, WorkflowFlags.DATA_LOADED)
|
||||||
|
|
||||||
if record:
|
if record:
|
||||||
success = await load_csv(record, cfg, pool)
|
success = await load_csv(record, cfg, pool)
|
||||||
@@ -52,7 +51,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
|
|||||||
await asyncio.sleep(NO_RECORD_SLEEP)
|
await asyncio.sleep(NO_RECORD_SLEEP)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
|
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=1)
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
@@ -67,6 +66,7 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
|
|||||||
Returns:
|
Returns:
|
||||||
True se l'elaborazione del CSV è avvenuta con successo, False altrimenti.
|
True se l'elaborazione del CSV è avvenuta con successo, False altrimenti.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
|
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
|
||||||
logger.debug("Inizio ricerca nuovo CSV da elaborare")
|
logger.debug("Inizio ricerca nuovo CSV da elaborare")
|
||||||
|
|
||||||
|
|||||||
68
send_orchestrator.py
Executable file
68
send_orchestrator.py
Executable file
@@ -0,0 +1,68 @@
|
|||||||
|
#!.venv/bin/python
|
||||||
|
|
||||||
|
# Import necessary libraries
|
||||||
|
import logging
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
# Import custom modules for configuration and database connection
|
||||||
|
from utils.config import loader_send_data as setting
|
||||||
|
from utils.database import WorkflowFlags
|
||||||
|
from utils.csv.loaders import get_next_csv_atomic
|
||||||
|
from utils.orchestrator_utils import run_orchestrator, worker_context
|
||||||
|
from utils.database.loader_action import update_status, unlock
|
||||||
|
from utils.database.elab_query import get_data_as_csv
|
||||||
|
#from utils.ftp.elab_send import send_csv_to_customer
|
||||||
|
|
||||||
|
|
||||||
|
# Initialize the logger for this module
|
||||||
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
# Delay tra un processamento CSV e il successivo (in secondi)
|
||||||
|
ELAB_PROCESSING_DELAY = 0.2
|
||||||
|
# Tempo di attesa se non ci sono record da elaborare
|
||||||
|
NO_RECORD_SLEEP = 60
|
||||||
|
|
||||||
|
async def worker(worker_id: int, cfg: object, pool: object) -> None:
|
||||||
|
|
||||||
|
# Imposta il context per questo worker
|
||||||
|
worker_context.set(f"W{worker_id:02d}")
|
||||||
|
|
||||||
|
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
|
||||||
|
logger.info("Avviato")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
logger.info("Inizio elaborazione")
|
||||||
|
|
||||||
|
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.CSV_RECEIVED, WorkflowFlags.SENT_RAW_DATA)
|
||||||
|
|
||||||
|
if record:
|
||||||
|
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
|
||||||
|
|
||||||
|
'''
|
||||||
|
if tool_elab_info['ftp_send']:
|
||||||
|
if not tool_elab_info["duedate"] or tool_elab_info["duedate"] in ('0000-00-00 00:00:00', '') or tool_elab_info["duedate"] > timestamp_matlab_elab:
|
||||||
|
if elab_csv := await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
|
||||||
|
print(elab_csv)
|
||||||
|
#if await send_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
|
||||||
|
#await update_status(cfg, id, , pool)
|
||||||
|
#await update_status(cfg, id, , pool)
|
||||||
|
else:
|
||||||
|
logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: ftp put didn't executed because due date reached.")
|
||||||
|
'''
|
||||||
|
|
||||||
|
else:
|
||||||
|
logger.info("Nessun record disponibile")
|
||||||
|
await asyncio.sleep(NO_RECORD_SLEEP)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
"""Funzione principale che avvia il send_orchestrator."""
|
||||||
|
await run_orchestrator(setting.Config, worker)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main())
|
||||||
@@ -10,6 +10,7 @@ class Config:
|
|||||||
c.read(["env/ftp.ini", "env/db.ini"])
|
c.read(["env/ftp.ini", "env/db.ini"])
|
||||||
|
|
||||||
# FTP setting
|
# FTP setting
|
||||||
|
self.service_port = c.getint("ftpserver", "service_port")
|
||||||
self.firstport = c.getint("ftpserver", "firstPort")
|
self.firstport = c.getint("ftpserver", "firstPort")
|
||||||
self.proxyaddr = c.get("ftpserver", "proxyAddr")
|
self.proxyaddr = c.get("ftpserver", "proxyAddr")
|
||||||
self.portrangewidth = c.getint("ftpserver", "portRangeWidth")
|
self.portrangewidth = c.getint("ftpserver", "portRangeWidth")
|
||||||
|
|||||||
31
utils/config/loader_send_data.py
Normal file
31
utils/config/loader_send_data.py
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
"""set configurations
|
||||||
|
|
||||||
|
"""
|
||||||
|
from configparser import ConfigParser
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
def __init__(self):
|
||||||
|
|
||||||
|
c = ConfigParser()
|
||||||
|
c.read(["env/send.ini", "env/db.ini"])
|
||||||
|
|
||||||
|
# LOG setting
|
||||||
|
self.logfilename = c.get("logging", "logFilename")
|
||||||
|
|
||||||
|
# Worker setting
|
||||||
|
self.max_threads = c.getint("threads", "max_num")
|
||||||
|
|
||||||
|
# DB setting
|
||||||
|
self.dbhost = c.get("db", "hostname")
|
||||||
|
self.dbport = c.getint("db", "port")
|
||||||
|
self.dbuser = c.get("db", "user")
|
||||||
|
self.dbpass = c.get("db", "password")
|
||||||
|
self.dbname = c.get("db", "dbName")
|
||||||
|
self.max_retries = c.getint("db", "maxRetries")
|
||||||
|
|
||||||
|
# Tables
|
||||||
|
self.dbusertable = c.get("tables", "userTableName")
|
||||||
|
self.dbrectable = c.get("tables", "recTableName")
|
||||||
|
self.dbrawdata = c.get("tables", "rawTableName")
|
||||||
|
self.dbrawdata = c.get("tables", "rawTableName")
|
||||||
|
self.dbnodes = c.get("tables", "nodesTableName")
|
||||||
@@ -40,7 +40,14 @@ async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list:
|
|||||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||||
righe = ToolData.splitlines()
|
righe = ToolData.splitlines()
|
||||||
matrice_valori = []
|
matrice_valori = []
|
||||||
for riga in [riga for riga in righe if ';|;' in riga]:
|
"""
|
||||||
|
Ciclo su tutte le righe del file CSV, escludendo quelle che:
|
||||||
|
non hanno il pattern ';|;' perché non sono dati ma è la header
|
||||||
|
che hanno il pattern 'No RX' perché sono letture non pervenute o in errore
|
||||||
|
che hanno il pattern '.-' perché sono letture con un numero errato - negativo dopo la virgola
|
||||||
|
che hanno il pattern 'File Creation' perché vuol dire che c'è stato un errore della centralina
|
||||||
|
"""
|
||||||
|
for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga]:
|
||||||
timestamp, batlevel, temperature, rilevazioni = riga.split(';',3)
|
timestamp, batlevel, temperature, rilevazioni = riga.split(';',3)
|
||||||
EventDate, EventTime = timestamp.split(' ')
|
EventDate, EventTime = timestamp.split(' ')
|
||||||
if batlevel == '|':
|
if batlevel == '|':
|
||||||
@@ -70,7 +77,7 @@ async def make_ain_din_matrix(cfg: object, id: int, pool: object) -> list:
|
|||||||
list: A list of lists, where each inner list represents a row in the matrix.
|
list: A list of lists, where each inner list represents a row in the matrix.
|
||||||
"""
|
"""
|
||||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||||
node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
|
node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
|
||||||
righe = ToolData.splitlines()
|
righe = ToolData.splitlines()
|
||||||
matrice_valori = []
|
matrice_valori = []
|
||||||
pattern = r'^(?:\d{4}\/\d{2}\/\d{2}|\d{2}\/\d{2}\/\d{4}) \d{2}:\d{2}:\d{2}(?:;\d+\.\d+){2}(?:;\d+){4}$'
|
pattern = r'^(?:\d{4}\/\d{2}\/\d{2}|\d{2}\/\d{2}\/\d{4}) \d{2}:\d{2}:\d{2}(?:;\d+\.\d+){2}(?:;\d+){4}$'
|
||||||
@@ -104,10 +111,10 @@ async def make_channels_matrix(cfg: object, id: int, pool: object) -> list:
|
|||||||
list: A list of lists, where each inner list represents a row in the matrix.
|
list: A list of lists, where each inner list represents a row in the matrix.
|
||||||
"""
|
"""
|
||||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||||
node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
|
node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
|
||||||
righe = ToolData.splitlines()
|
righe = ToolData.splitlines()
|
||||||
matrice_valori = []
|
matrice_valori = []
|
||||||
for riga in [riga for riga in righe if ';|;' in riga]:
|
for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga]:
|
||||||
timestamp, batlevel, temperature, rilevazioni = riga.replace(';|;',';').split(';',3)
|
timestamp, batlevel, temperature, rilevazioni = riga.replace(';|;',';').split(';',3)
|
||||||
EventDate, EventTime = timestamp.split(' ')
|
EventDate, EventTime = timestamp.split(' ')
|
||||||
valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|']
|
valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|']
|
||||||
@@ -132,10 +139,10 @@ async def make_musa_matrix(cfg: object, id: int, pool: object) -> list:
|
|||||||
list: A list of lists, where each inner list represents a row in the matrix.
|
list: A list of lists, where each inner list represents a row in the matrix.
|
||||||
"""
|
"""
|
||||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||||
node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
|
node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
|
||||||
righe = ToolData.splitlines()
|
righe = ToolData.splitlines()
|
||||||
matrice_valori = []
|
matrice_valori = []
|
||||||
for riga in [riga for riga in righe if ';|;' in riga]:
|
for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga]:
|
||||||
timestamp, batlevel, rilevazioni = riga.replace(';|;',';').split(';',2)
|
timestamp, batlevel, rilevazioni = riga.replace(';|;',';').split(';',2)
|
||||||
if timestamp == '':
|
if timestamp == '':
|
||||||
continue
|
continue
|
||||||
@@ -194,17 +201,21 @@ async def make_gd_matrix(cfg: object, id: int, pool: object) -> list:
|
|||||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||||
righe = ToolData.splitlines()
|
righe = ToolData.splitlines()
|
||||||
matrice_valori = []
|
matrice_valori = []
|
||||||
pattern = r'^-\d*dB$'
|
pattern = r';-?\d+dB$'
|
||||||
for riga in [riga for riga in righe if ';|;' in riga]:
|
for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga]:
|
||||||
timestamp, batlevel, temperature, rilevazioni = riga.split(';',3)
|
timestamp, rilevazioni = riga.split(';|;',1)
|
||||||
EventDate, EventTime = timestamp.split(' ')
|
EventDate, EventTime = timestamp.split(' ')
|
||||||
if batlevel == '|':
|
logger.info(f"GD id {id}: {pattern} {rilevazioni}")
|
||||||
batlevel = temperature
|
if re.search(pattern, rilevazioni):
|
||||||
temperature, rilevazioni = rilevazioni.split(';',1)
|
batlevel, temperature, rssi = rilevazioni.split(';')
|
||||||
if re.match(pattern, rilevazioni):
|
logger.info(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {rssi}")
|
||||||
valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;') # Toglie '|;' iniziali, toglie eventuali ';' finali, dividi per ';|;'
|
elif all(char == ';' for char in rilevazioni):
|
||||||
for num_nodo, valori_nodo in enumerate(valori_nodi, start=1):
|
pass
|
||||||
valori = valori_nodo.split(';')
|
elif ';|;' in rilevazioni:
|
||||||
matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori))))
|
unit_metrics, data = rilevazioni.split(';|;')
|
||||||
|
batlevel, temperature = unit_metrics.split(';')
|
||||||
|
logger.info(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {data}")
|
||||||
|
else:
|
||||||
|
logger.warning(f"GD id {id}: dati non trattati - {rilevazioni}")
|
||||||
|
|
||||||
return matrice_valori
|
return matrice_valori
|
||||||
@@ -1,5 +1,5 @@
|
|||||||
from utils.database.loader_action import load_data, update_status, unlock
|
from utils.database.loader_action import load_data, update_status, unlock
|
||||||
from utils.database import DATA_LOADED
|
from utils.database import WorkflowFlags
|
||||||
from utils.csv.data_preparation import make_pipe_sep_matrix, make_ain_din_matrix, make_channels_matrix, make_tlp_matrix, make_gd_matrix, make_musa_matrix
|
from utils.csv.data_preparation import make_pipe_sep_matrix, make_ain_din_matrix, make_channels_matrix, make_tlp_matrix, make_gd_matrix, make_musa_matrix
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
@@ -32,13 +32,13 @@ async def main_loader(cfg: object, id: int, pool: object, action: str) -> None:
|
|||||||
logger.info("matrice valori creata")
|
logger.info("matrice valori creata")
|
||||||
# Load the data into the database
|
# Load the data into the database
|
||||||
if await load_data(cfg, matrice_valori, pool):
|
if await load_data(cfg, matrice_valori, pool):
|
||||||
await update_status(cfg, id, DATA_LOADED, pool)
|
await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
|
||||||
await unlock(cfg, id, pool)
|
await unlock(cfg, id, pool)
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Action '{action}' non riconosciuta.")
|
logger.warning(f"Action '{action}' non riconosciuta.")
|
||||||
|
|
||||||
|
|
||||||
async def get_next_csv_atomic(pool, table_name, status):
|
async def get_next_csv_atomic(pool, table_name, status, next_status):
|
||||||
"""Preleva atomicamente il prossimo CSV da elaborare"""
|
"""Preleva atomicamente il prossimo CSV da elaborare"""
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
# IMPORTANTE: Disabilita autocommit per questa transazione
|
# IMPORTANTE: Disabilita autocommit per questa transazione
|
||||||
@@ -47,14 +47,17 @@ async def get_next_csv_atomic(pool, table_name, status):
|
|||||||
try:
|
try:
|
||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
# Usa SELECT FOR UPDATE per lock atomico
|
# Usa SELECT FOR UPDATE per lock atomico
|
||||||
|
|
||||||
await cur.execute(f"""
|
await cur.execute(f"""
|
||||||
SELECT id, unit_type, tool_type, unit_name, tool_name
|
SELECT id, unit_type, tool_type, unit_name, tool_name
|
||||||
FROM {table_name}
|
FROM {table_name}
|
||||||
WHERE locked = 0 AND status = %s
|
WHERE locked = 0
|
||||||
|
AND ((status & %s) > 0 OR %s = 0)
|
||||||
|
AND (status & %s) = 0
|
||||||
ORDER BY id
|
ORDER BY id
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
FOR UPDATE SKIP LOCKED
|
FOR UPDATE SKIP LOCKED
|
||||||
""", (status,))
|
""", (status, status, next_status))
|
||||||
|
|
||||||
result = await cur.fetchone()
|
result = await cur.fetchone()
|
||||||
if result:
|
if result:
|
||||||
|
|||||||
@@ -1,4 +1,18 @@
|
|||||||
CSV_RECEIVED = 0
|
class WorkflowFlags:
|
||||||
DATA_LOADED = 1
|
CSV_RECEIVED = 0 # 0000
|
||||||
DATA_ELABORATED = 2
|
DATA_LOADED = 1 # 0001
|
||||||
DATA_SENT = 3
|
DATA_ELABORATED = 2 # 0010
|
||||||
|
SENT_RAW_DATA = 4 # 0100
|
||||||
|
SENT_ELAB_DATA = 8 # 1000
|
||||||
|
|
||||||
|
|
||||||
|
# Mappatura flag -> colonna timestamp
|
||||||
|
FLAG_TO_TIMESTAMP = {
|
||||||
|
WorkflowFlags.CSV_RECEIVED: "inserted_at",
|
||||||
|
WorkflowFlags.DATA_LOADED: "loaded_at",
|
||||||
|
WorkflowFlags.DATA_ELABORATED: "elaborated_at",
|
||||||
|
WorkflowFlags.SENT_RAW_DATA: "sent_raw_at",
|
||||||
|
WorkflowFlags.SENT_ELAB_DATA: "sent_elab_at"
|
||||||
|
}
|
||||||
|
|
||||||
|
BATCH_SIZE = 1000
|
||||||
@@ -2,11 +2,10 @@
|
|||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
|
from utils.database import FLAG_TO_TIMESTAMP, BATCH_SIZE
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
timestamp_cols = ["inserted_at", "loaded_at", "elaborated_at", "sent_at"]
|
|
||||||
|
|
||||||
|
|
||||||
async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
|
async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
|
||||||
"""Carica una lista di record di dati grezzi nel database.
|
"""Carica una lista di record di dati grezzi nel database.
|
||||||
|
|
||||||
@@ -62,15 +61,23 @@ async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
|
|||||||
`RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`),
|
`RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`),
|
||||||
`Created_at` = NOW()
|
`Created_at` = NOW()
|
||||||
"""
|
"""
|
||||||
|
#logger.info(f"Query insert: {sql_insert_RAWDATA}.")
|
||||||
|
#logger.info(f"Matrice valori da inserire: {matrice_valori}.")
|
||||||
rc = False
|
rc = False
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
for attempt in range(cfg.max_retries):
|
for attempt in range(cfg.max_retries):
|
||||||
try:
|
try:
|
||||||
logging.info(f"Loading data attempt {attempt + 1}.")
|
logging.info(f"Loading data attempt {attempt + 1}.")
|
||||||
await cur.executemany(sql_insert_RAWDATA, matrice_valori)
|
|
||||||
await conn.commit()
|
for i in range(0, len(matrice_valori), BATCH_SIZE):
|
||||||
|
batch = matrice_valori[i:i + BATCH_SIZE]
|
||||||
|
|
||||||
|
await cur.executemany(sql_insert_RAWDATA, batch)
|
||||||
|
await conn.commit()
|
||||||
|
|
||||||
|
logging.info(f"Completed batch {i//BATCH_SIZE + 1}/{(len(matrice_valori)-1)//BATCH_SIZE + 1}")
|
||||||
|
|
||||||
logging.info("Data loaded.")
|
logging.info("Data loaded.")
|
||||||
rc = True
|
rc = True
|
||||||
break
|
break
|
||||||
@@ -93,7 +100,7 @@ async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
|
|||||||
return rc
|
return rc
|
||||||
|
|
||||||
|
|
||||||
async def update_status(cfg: object, id: int, status: int, pool: object) -> None:
|
async def update_status(cfg: object, id: int, status: str, pool: object) -> None:
|
||||||
"""Aggiorna lo stato di un record nella tabella dei record CSV.
|
"""Aggiorna lo stato di un record nella tabella dei record CSV.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -106,7 +113,11 @@ async def update_status(cfg: object, id: int, status: int, pool: object) -> None
|
|||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
try:
|
try:
|
||||||
await cur.execute(
|
await cur.execute(
|
||||||
f"update {cfg.dbrectable} set status = {status}, {timestamp_cols[status]} = now() where id = {id}"
|
f"""update {cfg.dbrectable} set
|
||||||
|
status = status | {status},
|
||||||
|
{FLAG_TO_TIMESTAMP[status]} = now()
|
||||||
|
where id = {id}
|
||||||
|
"""
|
||||||
)
|
)
|
||||||
await conn.commit()
|
await conn.commit()
|
||||||
logging.info(f"Status updated id {id}.")
|
logging.info(f"Status updated id {id}.")
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import logging
|
|||||||
import aiomysql
|
import aiomysql
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class FTPConnection:
|
class FTPConnection:
|
||||||
"""
|
"""
|
||||||
Manages an FTP or FTP_TLS connection, providing a context manager for automatic disconnection.
|
Manages an FTP or FTP_TLS connection, providing a context manager for automatic disconnection.
|
||||||
@@ -38,7 +39,11 @@ class FTPConnection:
|
|||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
self.ftp.quit()
|
self.ftp.quit()
|
||||||
|
|
||||||
async def send_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
|
async def send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
|
||||||
|
None
|
||||||
|
return True
|
||||||
|
|
||||||
|
async def send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
|
||||||
"""
|
"""
|
||||||
Sends elaborated CSV data to a customer via FTP.
|
Sends elaborated CSV data to a customer via FTP.
|
||||||
|
|
||||||
@@ -79,8 +84,6 @@ async def send_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_dat
|
|||||||
passive = ftp_parms.get('passive', True)
|
passive = ftp_parms.get('passive', True)
|
||||||
port = ftp_parms.get('port', 21)
|
port = ftp_parms.get('port', 21)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Connessione FTP
|
# Connessione FTP
|
||||||
with FTPConnection(host=send_ftp_info["ftp_addrs"], port=port, use_tls=use_tls, user=send_ftp_info["ftp_user"], passwd=send_ftp_info["ftp_passwd"], passive=passive) as ftp:
|
with FTPConnection(host=send_ftp_info["ftp_addrs"], port=port, use_tls=use_tls, user=send_ftp_info["ftp_user"], passwd=send_ftp_info["ftp_passwd"], passive=passive) as ftp:
|
||||||
|
|
||||||
Reference in New Issue
Block a user