diff --git a/dbddl/received.ddl b/dbddl/received.ddl index 59b9215..c79498e 100644 --- a/dbddl/received.ddl +++ b/dbddl/received.ddl @@ -3,10 +3,10 @@ DROP TABLE ase_lar.received; CREATE TABLE `received` ( `id` int NOT NULL AUTO_INCREMENT, `filename` varchar(100) COLLATE utf8mb4_general_ci NOT NULL, - `unit_name` varchar(10) COLLATE utf8mb4_general_ci NOT NULL, - `unit_type` varchar(20) COLLATE utf8mb4_general_ci NOT NULL, - `tool_name` varchar(10) COLLATE utf8mb4_general_ci NOT NULL, - `tool_type` varchar(20) COLLATE utf8mb4_general_ci NOT NULL, + `unit_name` varchar(30) COLLATE utf8mb4_general_ci NOT NULL, + `unit_type` varchar(30) COLLATE utf8mb4_general_ci NOT NULL, + `tool_name` varchar(30) COLLATE utf8mb4_general_ci NOT NULL, + `tool_type` varchar(30) COLLATE utf8mb4_general_ci NOT NULL, `tool_data` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `locked` int DEFAULT '0', `status` int DEFAULT '0', diff --git a/env/db.ini b/env/db.ini index 366d05f..dc5889f 100644 --- a/env/db.ini +++ b/env/db.ini @@ -7,7 +7,7 @@ user = root password = batt1l0 dbName = ase_lar - maxRetries = 5 + maxRetries = 10 [tables] userTableName = virtusers diff --git a/env/ftp.ini b/env/ftp.ini index a8574e9..5789825 100644 --- a/env/ftp.ini +++ b/env/ftp.ini @@ -20,12 +20,12 @@ logFilename = ./ftp_csv_rec.log [unit] - Types = G801|G201|G301|G802|D2W|GFLOW|CR1000X|TLP|GS1|HORTUS - Names = ID[0-9]{4}|IX[0-9]{4} + Types = G801|G201|G301|G802|D2W|GFLOW|CR1000X|TLP|GS1|HORTUS|RIFKL|HEALTH-|READINGS-|INTEGRITY MONITOR|MESSPUNKTEPINI_|HIRPINIA|CO_[0-9]{4}_[0-9]|ISI CSV LOG + Names = ID[0-9]{4}|IX[0-9]{4}|CHESA_ARCOIRIS_[0-9]*|TS_PS_PETITES_CROISETTES|CO_[0-9]{4}_[0-9] [tool] - Types = MUX|MUMS|MODB|IPTM|MUSA|LOC|GD|D2W|CR1000X|G301|NESA|GS1|G201|TLP|DSAS|HORTUS - Names = LOC[0-9]{4}|DT[0-9]{4}|GD[0-9]{4}|[0-9]{18}|measurement + Types = MUX|MUMS|MODB|IPTM|MUSA|LOC|GD|D2W|CR1000X|G301|NESA|GS1|G201|TLP|DSAS|HORTUS|RIFKL|HEALTH-|READINGS-|INTEGRITY MONITOR|MESSPUNKTEPINI_|HIRPINIA|CO_[0-9]{4}_[0-9]|VULINK + Names = LOC[0-9]{4}|DT[0-9]{4}|GD[0-9]{4}|[0-9]{18}|MEASUREMENTS_|CHESA_ARCOIRIS_[0-9]*|TS_PS_PETITES_CROISETTES|CO_[0-9]{4}_[0-9] [csv] Infos = IP|Subnet|Gateway diff --git a/load_orchestrator.py b/load_orchestrator.py index 4fa0d4b..288a8cb 100755 --- a/load_orchestrator.py +++ b/load_orchestrator.py @@ -50,8 +50,8 @@ async def get_next_csv_atomic(pool, table_name): SET locked = 1 WHERE id = %s """, (result[0],)) - await conn.commit() + await conn.commit() return result async def worker(worker_id: int, cfg: object, pool) -> None: @@ -149,7 +149,9 @@ async def main(): password=cfg.dbpass, db=cfg.dbname, minsize=1, - maxsize=cfg.max_threads*4 + maxsize=cfg.max_threads*4, + pool_recycle=3600, + autocommit=True ) # Avvia i worker diff --git a/utils/csv/data_preparation.py b/utils/csv/data_preparation.py index 34c88df..d44a021 100644 --- a/utils/csv/data_preparation.py +++ b/utils/csv/data_preparation.py @@ -47,6 +47,11 @@ async def make_pipe_sep_matrix(cfg: object, id: int, pool) -> list: if batlevel == '|': batlevel = temperature temperature, rilevazioni = rilevazioni.split(';',1) + ''' in alcune letture mancano temperatura e livello batteria''' + if temperature == '': + temperature = 0 + if batlevel == '': + batlevel = 0 valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;') # Toglie '|;' iniziali, toglie eventuali ';' finali, dividi per ';|;' for num_nodo, valori_nodo in enumerate(valori_nodi, start=1): valori = valori_nodo.split(';') diff --git a/utils/database/loader_action.py b/utils/database/loader_action.py index 26ac543..885c049 100644 --- a/utils/database/loader_action.py +++ b/utils/database/loader_action.py @@ -27,37 +27,39 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool : ON DUPLICATE KEY UPDATE `BatLevel` = IF({cfg.dbrawdata}.`BatLevel` != new_data.`BatLevel`, new_data.`BatLevel`, {cfg.dbrawdata}.`BatLevel`), `Temperature` = IF({cfg.dbrawdata}.`Temperature` != new_data.Temperature, new_data.Temperature, {cfg.dbrawdata}.`Temperature`), - `Val0` = IF({cfg.dbrawdata}.`Val0` != new_data.Val0, new_data.Val0, {cfg.dbrawdata}.`Val0`), - `Val1` = IF({cfg.dbrawdata}.`Val1` != new_data.Val1, new_data.Val1, {cfg.dbrawdata}.`Val1`), - `Val2` = IF({cfg.dbrawdata}.`Val2` != new_data.Val2, new_data.Val2, {cfg.dbrawdata}.`Val2`), - `Val3` = IF({cfg.dbrawdata}.`Val3` != new_data.Val3, new_data.Val3, {cfg.dbrawdata}.`Val3`), - `Val4` = IF({cfg.dbrawdata}.`Val4` != new_data.Val4, new_data.Val4, {cfg.dbrawdata}.`Val4`), - `Val5` = IF({cfg.dbrawdata}.`Val5` != new_data.Val5, new_data.Val5, {cfg.dbrawdata}.`Val5`), - `Val6` = IF({cfg.dbrawdata}.`Val6` != new_data.Val6, new_data.Val6, {cfg.dbrawdata}.`Val6`), - `Val7` = IF({cfg.dbrawdata}.`Val7` != new_data.Val7, new_data.Val7, {cfg.dbrawdata}.`Val7`), - `Val8` = IF({cfg.dbrawdata}.`Val8` != new_data.Val8, new_data.Val8, {cfg.dbrawdata}.`Val8`), - `Val9` = IF({cfg.dbrawdata}.`Val9` != new_data.Val9, new_data.Val9, {cfg.dbrawdata}.`Val9`), - `ValA` = IF({cfg.dbrawdata}.`ValA` != new_data.ValA, new_data.ValA, {cfg.dbrawdata}.`ValA`), - `ValB` = IF({cfg.dbrawdata}.`ValB` != new_data.ValB, new_data.ValB, {cfg.dbrawdata}.`ValB`), - `ValC` = IF({cfg.dbrawdata}.`ValC` != new_data.ValC, new_data.ValC, {cfg.dbrawdata}.`ValC`), - `ValD` = IF({cfg.dbrawdata}.`ValD` != new_data.ValD, new_data.ValD, {cfg.dbrawdata}.`ValD`), - `ValE` = IF({cfg.dbrawdata}.`ValE` != new_data.ValE, new_data.ValE, {cfg.dbrawdata}.`ValE`), - `ValF` = IF({cfg.dbrawdata}.`ValF` != new_data.ValF, new_data.ValF, {cfg.dbrawdata}.`ValF`), + `Val0` = IF({cfg.dbrawdata}.`Val0` != new_data.Val0 AND new_data.`Val0` IS NOT NULL, new_data.Val0, {cfg.dbrawdata}.`Val0`), + `Val1` = IF({cfg.dbrawdata}.`Val1` != new_data.Val1 AND new_data.`Val1` IS NOT NULL, new_data.Val1, {cfg.dbrawdata}.`Val1`), + `Val2` = IF({cfg.dbrawdata}.`Val2` != new_data.Val2 AND new_data.`Val2` IS NOT NULL, new_data.Val2, {cfg.dbrawdata}.`Val2`), + `Val3` = IF({cfg.dbrawdata}.`Val3` != new_data.Val3 AND new_data.`Val3` IS NOT NULL, new_data.Val3, {cfg.dbrawdata}.`Val3`), + `Val4` = IF({cfg.dbrawdata}.`Val4` != new_data.Val4 AND new_data.`Val4` IS NOT NULL, new_data.Val4, {cfg.dbrawdata}.`Val4`), + `Val5` = IF({cfg.dbrawdata}.`Val5` != new_data.Val5 AND new_data.`Val5` IS NOT NULL, new_data.Val5, {cfg.dbrawdata}.`Val5`), + `Val6` = IF({cfg.dbrawdata}.`Val6` != new_data.Val6 AND new_data.`Val6` IS NOT NULL, new_data.Val6, {cfg.dbrawdata}.`Val6`), + `Val7` = IF({cfg.dbrawdata}.`Val7` != new_data.Val7 AND new_data.`Val7` IS NOT NULL, new_data.Val7, {cfg.dbrawdata}.`Val7`), + `Val8` = IF({cfg.dbrawdata}.`Val8` != new_data.Val8 AND new_data.`Val8` IS NOT NULL, new_data.Val8, {cfg.dbrawdata}.`Val8`), + `Val9` = IF({cfg.dbrawdata}.`Val9` != new_data.Val9 AND new_data.`Val9` IS NOT NULL, new_data.Val9, {cfg.dbrawdata}.`Val9`), + `ValA` = IF({cfg.dbrawdata}.`ValA` != new_data.ValA AND new_data.`ValA` IS NOT NULL, new_data.ValA, {cfg.dbrawdata}.`ValA`), + `ValB` = IF({cfg.dbrawdata}.`ValB` != new_data.ValB AND new_data.`ValB` IS NOT NULL, new_data.ValB, {cfg.dbrawdata}.`ValB`), + `ValC` = IF({cfg.dbrawdata}.`ValC` != new_data.ValC AND new_data.`ValC` IS NOT NULL, new_data.ValC, {cfg.dbrawdata}.`ValC`), + `ValD` = IF({cfg.dbrawdata}.`ValD` != new_data.ValD AND new_data.`ValD` IS NOT NULL, new_data.ValD, {cfg.dbrawdata}.`ValD`), + `ValE` = IF({cfg.dbrawdata}.`ValE` != new_data.ValE AND new_data.`ValE` IS NOT NULL, new_data.ValE, {cfg.dbrawdata}.`ValE`), + `ValF` = IF({cfg.dbrawdata}.`ValF` != new_data.ValF AND new_data.`ValF` IS NOT NULL, new_data.ValF, {cfg.dbrawdata}.`ValF`), `BatLevelModule` = IF({cfg.dbrawdata}.`BatLevelModule` != new_data.BatLevelModule, new_data.BatLevelModule, {cfg.dbrawdata}.`BatLevelModule`), `TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, {cfg.dbrawdata}.`TemperatureModule`), `RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`), `Created_at` = NOW() ''' + rc = False async with pool.acquire() as conn: async with conn.cursor() as cur: - rc = False for attempt in range(cfg.max_retries): try: + logging.info(f"Loading data attempt {attempt + 1}.") await cur.executemany(sql_insert_RAWDATA, matrice_valori) await conn.commit() logging.info("Data loaded.") rc = True + break except Exception as e: await conn.rollback() logging.error(f"Error: {e}.") @@ -72,8 +74,7 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool : else: logging.error("Max retry attempts reached for deadlock") raise - finally: - return rc + return rc async def update_status(cfg: object, id: int, status: int, pool) -> None: async with pool.acquire() as conn: diff --git a/utils/ftp/file_management.py b/utils/ftp/file_management.py index ec5584a..a43e8e6 100644 --- a/utils/ftp/file_management.py +++ b/utils/ftp/file_management.py @@ -23,13 +23,13 @@ def on_file_received(self: object, file: str) -> None: path, filenameExt = os.path.split(file) filename, fileExtension = os.path.splitext(filenameExt) if (fileExtension.upper() in (cfg.fileext)): - with open(file, 'r') as csvfile: + with open(file, 'r', encoding='utf-8', errors='ignore') as csvfile: lines = csvfile.readlines() - unit_name = extract_value(cfg.units_name, filename, str(lines[0:9])) - unit_type = extract_value(cfg.units_type, filename, str(lines[0:9])) - tool_name = extract_value(cfg.tools_name, filename, str(lines[0:9])) - tool_type = extract_value(cfg.tools_type, filename, str(lines[0:9])) + unit_name = extract_value(cfg.units_name, filename, str(lines[0:10])) + unit_type = extract_value(cfg.units_type, filename, str(lines[0:10])) + tool_name = extract_value(cfg.tools_name, filename, str(lines[0:10])) + tool_type = extract_value(cfg.tools_type, filename, str(lines[0:10])) try: conn = connetti_db(cfg) diff --git a/utils/parsers/by_type/tlp_loc.py b/utils/parsers/by_type/tlp_loc.py new file mode 100644 index 0000000..70a298a --- /dev/null +++ b/utils/parsers/by_type/tlp_loc.py @@ -0,0 +1,4 @@ +from utils.csv.loaders import main_loader as analog_dig_main_loader + +async def main_loader(cfg: object, id: int, pool) -> None: + await analog_dig_main_loader(cfg, id, pool, "analogic_digital") \ No newline at end of file