diff --git a/load_orchestrator.py b/load_orchestrator.py index faec53d..d34faab 100755 --- a/load_orchestrator.py +++ b/load_orchestrator.py @@ -1,15 +1,14 @@ #!.venv/bin/python # Import necessary libraries -import mysql.connector import logging import importlib import asyncio import os +import aiomysql # Import custom modules for configuration and database connection from utils.config import loader_load_data as setting -from utils.database.connection import connetti_db from utils.database import CSV_RECEIVED # Initialize the logger for this module @@ -20,115 +19,91 @@ CSV_PROCESSING_DELAY = 0.1 # Tempo di attesa se non ci sono record da elaborare NO_RECORD_SLEEP = 20 -async def worker(worker_id: int, queue: asyncio.Queue, cfg: object) -> None: - """ - Worker asyncrono che preleva lavori dalla coda e li esegue. +async def get_next_csv_atomic(pool, table_name): + """Preleva atomicamente il prossimo CSV da elaborare""" + async with pool.acquire() as conn: + async with conn.cursor() as cur: + # Usa SELECT FOR UPDATE per lock atomico + await cur.execute(f""" + SELECT id, unit_type, tool_type, unit_name, tool_name + FROM {table_name} + WHERE locked = 0 AND status = %s + ORDER BY id + LIMIT 1 + FOR UPDATE SKIP LOCKED + """, (CSV_RECEIVED,)) - Args: - worker_id (int): ID univoco del worker. - queue (asyncio.Queue): Coda da cui prendere i lavori. - cfg (object): Configurazione caricata. - """ + result = await cur.fetchone() + if result: + await cur.execute(f""" + UPDATE {table_name} + SET locked = 1 + WHERE id = %s + """, (result[0],)) + await conn.commit() + + return result + +async def worker(worker_id: int, cfg: object, pool) -> None: debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG) logger.info(f"Worker {worker_id} - Avviato") while True: try: - # Preleva un "lavoro" dalla coda (in questo caso non ci sono parametri) - await queue.get() - logger.info(f"Worker {worker_id} - Inizio elaborazione") - record, success = await load_csv(cfg, worker_id) + record = await get_next_csv_atomic(pool, cfg.dbrectable) - if not record: - logger.debug(f"Worker {worker_id} - Nessun record trovato") - await asyncio.sleep(NO_RECORD_SLEEP) - if not success: - logger.error(f"Worker {worker_id} - Errore durante l'elaborazione") + if record: + success = await load_csv(record, cfg, worker_id, pool) + if not success: + logger.error(f"Worker {worker_id} - Errore durante l'elaborazione") await asyncio.sleep(CSV_PROCESSING_DELAY) else: - logger.debug(f"Worker {worker_id} - Elaborazione completata correttamente") - await asyncio.sleep(CSV_PROCESSING_DELAY) + await asyncio.sleep(NO_RECORD_SLEEP) - # Segnala che il lavoro è completato - queue.task_done() except Exception as e: logger.error(f"Worker {worker_id} - Errore durante l'esecuzione: {e}", exc_info=debug_mode) - queue.task_done() + await asyncio.sleep(1) - -async def load_csv(cfg: object, worker_id: int) -> tuple: - """ - Cerca e carica un file CSV da elaborare dal database. - - Args: - cfg (object): Oggetto configurazione contenente dati per DB e altro. - - Returns: - bool: True se è stato trovato ed elaborato un record, False altrimenti. - """ +async def load_csv(record: tuple, cfg: object, worker_id: int, pool) -> bool: debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG) logger.debug(f"Worker {worker_id} - Inizio ricerca nuovo CSV da elaborare") - try: - with connetti_db(cfg) as conn: - cur = conn.cursor() - logger.debug(f"Worker {worker_id} - Connessione al database stabilita") + id, unit_type, tool_type, unit_name, tool_name = record + logger.info(f"Worker {worker_id} - Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}") - query = f""" - SELECT id, unit_type, tool_type, unit_name, tool_name - FROM {cfg.dbname}.{cfg.dbrectable} - WHERE locked = 0 AND status = {CSV_RECEIVED} - LIMIT 1 - """ - logger.debug(f"Worker {worker_id} - Esecuzione query: {query}") - cur.execute(query) - result = cur.fetchone() + # Costruisce il nome del modulo da caricare dinamicamente + module_names = [f'utils.parsers.by_name.{unit_name.lower()}_{tool_name.lower()}', + f'utils.parsers.by_name.{unit_name.lower()}_{tool_type.lower()}', + f'utils.parsers.by_name.{unit_name.lower()}_all', + f'utils.parsers.by_type.{unit_type.lower()}_{tool_type.lower()}'] + modulo = None + for module_name in module_names: + try: + logger.debug(f"Worker {worker_id} - Caricamento dinamico del modulo: {module_name}") + modulo = importlib.import_module(module_name) + logger.debug(f"Worker {worker_id} - Funzione 'main_loader' caricata dal modulo {module_name}") + break + except (ImportError, AttributeError) as e: + logger.info(f"Worker {worker_id} - Modulo {module_name} non presente o non valido. {e}", exc_info=debug_mode) - if result: - id, unit_type, tool_type, unit_name, tool_name = result - logger.info(f"Worker {worker_id} - Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}") + if not modulo: + logger.error(f"Worker {worker_id} - Nessun modulo trovato {module_names}") + return False - lock_query = f"UPDATE {cfg.dbname}.{cfg.dbrectable} SET locked = 1 WHERE id = {id}" - logger.debug(f"Worker {worker_id} - Esecuzione lock del record: {lock_query}") - cur.execute(lock_query) - conn.commit() + # Ottiene la funzione 'main_loader' dal modulo - # Costruisce il nome del modulo da caricare dinamicamente - module_names = [f'utils.parsers.by_name.{unit_name.lower()}_{tool_name.lower()}', - f'utils.parsers.by_name.{unit_name.lower()}_{tool_type.lower()}', - f'utils.parsers.by_name.{unit_name.lower()}_all', - f'utils.parsers.by_type.{unit_type.lower()}_{tool_type.lower()}'] - modulo = None - for module_name in module_names: - try: - logger.debug(f"Worker {worker_id} - Caricamento dinamico del modulo: {module_name}") - modulo = importlib.import_module(module_name) - logger.debug(f"Worker {worker_id} - Funzione 'main_loader' caricata dal modulo {module_name}") - except (ImportError, AttributeError) as e: - logger.info(f"Worker {worker_id} - Modulo {module_name} non presente o non valido. {e}", exc_info=debug_mode) + funzione = getattr(modulo, "main_loader") - if not modulo: - logger.error(f"Worker {worker_id} - Nessun modulo trovato {module_names}") - return True, False + # Esegui la funzione - # Ottiene la funzione 'main_loader' dal modulo + logger.info(f"Worker {worker_id} - Elaborazione con modulo {modulo} per ID={id}") + await funzione(cfg, id, pool) + logger.info(f"Worker {worker_id} - Elaborazione completata per ID={id}") + return True - funzione = getattr(modulo, "main_loader") - - # Esegui la funzione - await funzione(cfg, id) - logger.info(f"Worker {worker_id} - Elaborazione completata per ID={id}") - return True, True - else: - logger.debug(f"Worker {worker_id} - Nessun record disponibile per l'elaborazione") - return False, False - - except mysql.connector.Error as e: - logger.error(f"Worker {worker_id} - Errore database: {e}", exc_info=debug_mode) - return False, False async def main(): @@ -150,45 +125,36 @@ async def main(): ) logger.info("Logging configurato correttamente") - # Crea una coda di lavoro illimitata - queue = asyncio.Queue(maxsize=cfg.max_threads * 2 or 20) - logger.debug("Coda di lavoro creata") # Numero massimo di worker concorrenti - num_workers = cfg.max_threads - logger.info(f"Avvio di {num_workers} worker concorrenti") + logger.info(f"Avvio di {cfg.max_threads} worker concorrenti") + + pool = await aiomysql.create_pool( + host=cfg.dbhost, + user=cfg.dbuser, + password=cfg.dbpass, + db=cfg.dbname, + minsize=1, + maxsize=cfg.max_threads*4 + ) # Avvia i worker workers = [ - asyncio.create_task(worker(i, queue, cfg)) for i in range(num_workers) + asyncio.create_task(worker(i, cfg, pool)) + for i in range(cfg.max_threads) ] logger.info("Sistema avviato correttamente. In attesa di nuovi task...") - # Ciclo infinito per aggiungere lavori alla coda - while True: - logger.debug("Aggiunta di un nuovo lavoro alla coda") - await queue.put(None) - - # Breve attesa prima di aggiungere un altro lavoro - await asyncio.sleep(0.5) + try: + await asyncio.gather(*workers, return_exceptions=debug_mode) + finally: + pool.close() + await pool.wait_closed() except KeyboardInterrupt: logger.info("Info: Shutdown richiesto... chiusura in corso") - # Attendi che tutti i lavori pendenti siano completati - logger.info("Attesa completamento dei task in coda...") - await queue.join() - - # Ferma i worker - logger.info("Chiusura dei worker in corso...") - for task in workers: - task.cancel() - - await asyncio.gather(*workers, return_exceptions=debug_mode) - - logger.info("Info: Tutti i task terminati. Uscita.") - except Exception as e: logger.error(f"Errore principale: {e}", exc_info=debug_mode) diff --git a/utils/csv/data_preparation.py b/utils/csv/data_preparation.py index dbdd7fc..49808e3 100644 --- a/utils/csv/data_preparation.py +++ b/utils/csv/data_preparation.py @@ -1,5 +1,4 @@ #!.venv/bin/python -from utils.database.connection import connetti_db from utils.database.nodes_query import get_nodes_type import utils.timestamp.date_check as date_check import logging @@ -9,31 +8,15 @@ from itertools import islice logger = logging.getLogger(__name__) -def get_data(cfg: object, id: int) -> tuple: - """ - Retrieves data for a specific tool from the database. +async def get_data(cfg: object, id: int, pool) -> tuple: + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbrectable} where id = {id}') + unit_name, tool_name, tool_data = await cur.fetchone() - This function connects to the database using the provided configuration, - executes a query to retrieve the unit name, tool name ID, and tool data - associated with the given ID from the raw data table, and returns the results. + return unit_name, tool_name, tool_data - Args: - cfg: A configuration object containing database connection parameters - and table names (cfg.dbname, cfg.dbrectable). - id: The ID of the tool record to retrieve. - - Returns: - A tuple containing the unit name, tool name ID, and tool data. - """ - with connetti_db(cfg) as conn: - cur = conn.cursor() - cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbname}.{cfg.dbrectable} where id = {id}') - unit_name, tool_name, tool_data = cur.fetchone() - cur.close() - conn.close() - return unit_name, tool_name, tool_data - -def make_pipe_sep_matrix(cfg: object, id: int) -> list: +async def make_pipe_sep_matrix(cfg: object, id: int, pool) -> list: """ Processes raw tool data and transforms it into a matrix format for database insertion. @@ -55,7 +38,7 @@ def make_pipe_sep_matrix(cfg: object, id: int) -> list: EventTime, BatLevel, Temperature, followed by up to 16 additional measurement values (Val0 to ValF), padded with None if necessary. """ - UnitName, ToolNameID, ToolData = get_data(cfg, id) + UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) righe = ToolData.splitlines() matrice_valori = [] for riga in [riga for riga in righe if ';|;' in riga]: @@ -68,7 +51,7 @@ def make_pipe_sep_matrix(cfg: object, id: int) -> list: return matrice_valori -def make_ain_din_matrix(cfg: object, id: int) -> list: +async def make_ain_din_matrix(cfg: object, id: int, pool) -> list: """ Processes raw location (LOC) tool data and transforms it into a matrix format for database insertion. @@ -90,7 +73,7 @@ def make_ain_din_matrix(cfg: object, id: int) -> list: A list of lists (matrix) representing the processed LOC data. Each inner list contains data fields similar to `make_matrix`, adjusted for LOC data. """ - UnitName, ToolNameID, ToolData = get_data(cfg, id) + UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName) righe = ToolData.splitlines() matrice_valori = [] @@ -113,8 +96,8 @@ def make_ain_din_matrix(cfg: object, id: int) -> list: return matrice_valori -def make_channels_matrix(cfg: object, id: int) -> list: - UnitName, ToolNameID, ToolData = get_data(cfg, id) +async def make_channels_matrix(cfg: object, id: int, pool) -> list: + UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName) righe = ToolData.splitlines() matrice_valori = [] diff --git a/utils/csv/loaders.py b/utils/csv/loaders.py index e1b6f8b..c2f4b49 100644 --- a/utils/csv/loaders.py +++ b/utils/csv/loaders.py @@ -6,7 +6,7 @@ import logging logger = logging.getLogger(__name__) -async def main_loader(cfg: object, id: int, action: str) -> None: +async def main_loader(cfg: object, id: int, pool, action: str) -> None: type_matrix_mapping = { "pipe_separator": make_pipe_sep_matrix, "analogic_digital": make_ain_din_matrix, @@ -15,11 +15,11 @@ async def main_loader(cfg: object, id: int, action: str) -> None: if action in type_matrix_mapping: function_to_call = type_matrix_mapping[action] # Create a matrix of values from the data - matrice_valori = function_to_call(cfg, id) + matrice_valori = await function_to_call(cfg, id, pool) logger.info("matrice valori creata") # Load the data into the database - if load_data(cfg, matrice_valori): - update_status(cfg, id, DATA_LOADED) + if await load_data(cfg, matrice_valori, pool): + await update_status(cfg, id, DATA_LOADED, pool) else: logger.warning(f"Action '{action}' non riconosciuta.") diff --git a/utils/database/loader_action.py b/utils/database/loader_action.py index 24f7c2f..630668f 100644 --- a/utils/database/loader_action.py +++ b/utils/database/loader_action.py @@ -1,5 +1,4 @@ #!.venv/bin/python -from utils.database.connection import connetti_db import logging logger = logging.getLogger(__name__) @@ -7,12 +6,12 @@ logger = logging.getLogger(__name__) timestamp_cols = ['inserted_at', 'loaded_at', 'elaborated_at'] -def load_data(cfg: object, matrice_valori: list) -> bool : +async def load_data(cfg: object, matrice_valori: list, pool) -> bool : if not matrice_valori: logger.info("Nulla da caricare.") return True sql_insert_RAWDATA = f''' - INSERT IGNORE INTO {cfg.dbname}.{cfg.dbrawdata} ( + INSERT INTO {cfg.dbrawdata} ( `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`, `Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`, `Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`, @@ -23,44 +22,67 @@ def load_data(cfg: object, matrice_valori: list) -> bool : %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s - ) + ) as new_data + ON DUPLICATE KEY UPDATE + `BatLevel` = IF({cfg.dbrawdata}.`BatLevel` != new_data.`BatLevel`, new_data.`BatLevel`, {cfg.dbrawdata}.`BatLevel`), + `Temperature` = IF({cfg.dbrawdata}.`Temperature` != new_data.Temperature, new_data.Temperature, {cfg.dbrawdata}.`Temperature`), + `Val0` = IF({cfg.dbrawdata}.`Val0` != new_data.Val0, new_data.Val0, {cfg.dbrawdata}.`Val0`), + `Val1` = IF({cfg.dbrawdata}.`Val1` != new_data.Val1, new_data.Val1, {cfg.dbrawdata}.`Val1`), + `Val2` = IF({cfg.dbrawdata}.`Val2` != new_data.Val2, new_data.Val2, {cfg.dbrawdata}.`Val2`), + `Val3` = IF({cfg.dbrawdata}.`Val3` != new_data.Val3, new_data.Val3, {cfg.dbrawdata}.`Val3`), + `Val4` = IF({cfg.dbrawdata}.`Val4` != new_data.Val4, new_data.Val4, {cfg.dbrawdata}.`Val4`), + `Val5` = IF({cfg.dbrawdata}.`Val5` != new_data.Val5, new_data.Val5, {cfg.dbrawdata}.`Val5`), + `Val6` = IF({cfg.dbrawdata}.`Val6` != new_data.Val6, new_data.Val6, {cfg.dbrawdata}.`Val6`), + `Val7` = IF({cfg.dbrawdata}.`Val7` != new_data.Val7, new_data.Val7, {cfg.dbrawdata}.`Val7`), + `Val8` = IF({cfg.dbrawdata}.`Val8` != new_data.Val8, new_data.Val8, {cfg.dbrawdata}.`Val8`), + `Val9` = IF({cfg.dbrawdata}.`Val9` != new_data.Val9, new_data.Val9, {cfg.dbrawdata}.`Val9`), + `ValA` = IF({cfg.dbrawdata}.`ValA` != new_data.ValA, new_data.ValA, {cfg.dbrawdata}.`ValA`), + `ValB` = IF({cfg.dbrawdata}.`ValB` != new_data.ValB, new_data.ValB, {cfg.dbrawdata}.`ValB`), + `ValC` = IF({cfg.dbrawdata}.`ValC` != new_data.ValC, new_data.ValC, {cfg.dbrawdata}.`ValC`), + `ValD` = IF({cfg.dbrawdata}.`ValD` != new_data.ValD, new_data.ValD, {cfg.dbrawdata}.`ValD`), + `ValE` = IF({cfg.dbrawdata}.`ValE` != new_data.ValE, new_data.ValE, {cfg.dbrawdata}.`ValE`), + `ValF` = IF({cfg.dbrawdata}.`ValF` != new_data.ValF, new_data.ValF, {cfg.dbrawdata}.`ValF`), + `BatLevelModule` = IF({cfg.dbrawdata}.`BatLevelModule` != new_data.BatLevelModule, new_data.BatLevelModule, {cfg.dbrawdata}.`BatLevelModule`), + `TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, {cfg.dbrawdata}.`TemperatureModule`), + `RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`), + `Created_at` = NOW() ''' - with connetti_db(cfg) as conn: - cur = conn.cursor() - try: - cur.executemany(sql_insert_RAWDATA, matrice_valori) - conn.commit() - logging.info("Data loaded.") - rc = True - except Exception as e: - conn.rollback() - logging.error(f"Error: {e}.") - rc = False - finally: - conn.close() - return rc -def update_status(cfg: object, id: int, status: int) -> None: - with connetti_db(cfg) as conn: - cur = conn.cursor() - try: - cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 0, status = {status}, {timestamp_cols[status]} = now() where id = {id}') - conn.commit() - logging.info("Status updated.") - except Exception as e: - conn.rollback() - logging.error(f'Error: {e}') + async with pool.acquire() as conn: + async with conn.cursor() as cur: + try: + await cur.executemany(sql_insert_RAWDATA, matrice_valori) + await conn.commit() + logging.info("Data loaded.") + rc = True + except Exception as e: + await conn.rollback() + logging.error(f"Error: {e}.") + rc = False + finally: + return rc -def get_matlab_cmd(cfg: object, unit: str, tool: str) -> tuple: - with connetti_db(cfg) as conn: - cur = conn.cursor() - try: - cur.execute(f'''select m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate - from matfuncs as m - inner join tools as t on t.matfunc = m.id - inner join units as u on u.id = t.unit_id - inner join statustools as s on t.statustool_id = s.id - where t.name = "{tool}" and u.name = "{unit}"''') - return cur.fetchone() - except Exception as e: - logging.error(f'Error: {e}') \ No newline at end of file +async def update_status(cfg: object, id: int, status: int, pool) -> None: + async with pool.acquire() as conn: + async with conn.cursor() as cur: + try: + await cur.execute(f'update {cfg.dbrectable} set locked = 0, status = {status}, {timestamp_cols[status]} = now() where id = {id}') + await conn.commit() + logging.info("Status updated.") + except Exception as e: + await conn.rollback() + logging.error(f'Error: {e}') + +async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool) -> tuple: + async with pool.acquire() as conn: + async with conn.cursor() as cur: + try: + await cur.execute(f'''select m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate + from matfuncs as m + inner join tools as t on t.matfunc = m.id + inner join units as u on u.id = t.unit_id + inner join statustools as s on t.statustool_id = s.id + where t.name = "{tool}" and u.name = "{unit}"''') + return cur.fetchone() + except Exception as e: + logging.error(f'Error: {e}') \ No newline at end of file diff --git a/utils/parsers/by_type/g201_g201.py b/utils/parsers/by_type/g201_g201.py index 2f43f9c..1c38e53 100644 --- a/utils/parsers/by_type/g201_g201.py +++ b/utils/parsers/by_type/g201_g201.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as channels_main_loader -async def main_loader(cfg: object, id: int) -> None: - await channels_main_loader(cfg, id, "channels") \ No newline at end of file +async def main_loader(cfg: object, id: int, pool) -> None: + await channels_main_loader(cfg, id, pool,"channels") \ No newline at end of file diff --git a/utils/parsers/by_type/g301_g301.py b/utils/parsers/by_type/g301_g301.py index 87d55d2..c6948eb 100644 --- a/utils/parsers/by_type/g301_g301.py +++ b/utils/parsers/by_type/g301_g301.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader -async def main_loader(cfg: object, id: int) -> None: - await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file +async def main_loader(cfg: object, id: int, pool) -> None: + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/by_type/g801_iptm.py b/utils/parsers/by_type/g801_iptm.py index 87d55d2..c6948eb 100644 --- a/utils/parsers/by_type/g801_iptm.py +++ b/utils/parsers/by_type/g801_iptm.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader -async def main_loader(cfg: object, id: int) -> None: - await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file +async def main_loader(cfg: object, id: int, pool) -> None: + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/by_type/g801_loc.py b/utils/parsers/by_type/g801_loc.py index 0233b83..6340601 100644 --- a/utils/parsers/by_type/g801_loc.py +++ b/utils/parsers/by_type/g801_loc.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as analog_dig_main_loader -async def main_loader(cfg: object, id: int) -> None: - await analog_dig_main_loader(cfg, id, "analogic_digital") +async def main_loader(cfg: object, id: int, pool) -> None: + await analog_dig_main_loader(cfg, id, pool, "analogic_digital") diff --git a/utils/parsers/by_type/g801_mums.py b/utils/parsers/by_type/g801_mums.py index c00ddf4..19d2f34 100644 --- a/utils/parsers/by_type/g801_mums.py +++ b/utils/parsers/by_type/g801_mums.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader -async def main_loader(cfg: object, id: int) -> None: - await pipe_sep_main_loader(cfg, id, "pipe_separator") +async def main_loader(cfg: object, id: int, pool) -> None: + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") diff --git a/utils/parsers/by_type/g801_mux.py b/utils/parsers/by_type/g801_mux.py index 2f43f9c..b56f111 100644 --- a/utils/parsers/by_type/g801_mux.py +++ b/utils/parsers/by_type/g801_mux.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as channels_main_loader -async def main_loader(cfg: object, id: int) -> None: - await channels_main_loader(cfg, id, "channels") \ No newline at end of file +async def main_loader(cfg: object, id: int, pool) -> None: + await channels_main_loader(cfg, id, pool, "channels") \ No newline at end of file diff --git a/utils/parsers/by_type/g802_dsas.py b/utils/parsers/by_type/g802_dsas.py index 87d55d2..c6948eb 100644 --- a/utils/parsers/by_type/g802_dsas.py +++ b/utils/parsers/by_type/g802_dsas.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader -async def main_loader(cfg: object, id: int) -> None: - await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file +async def main_loader(cfg: object, id: int, pool) -> None: + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/by_type/g802_loc.py b/utils/parsers/by_type/g802_loc.py index 389fd0f..70a298a 100644 --- a/utils/parsers/by_type/g802_loc.py +++ b/utils/parsers/by_type/g802_loc.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as analog_dig_main_loader -async def main_loader(cfg: object, id: int) -> None: - await analog_dig_main_loader(cfg, id, "analogic_digital") \ No newline at end of file +async def main_loader(cfg: object, id: int, pool) -> None: + await analog_dig_main_loader(cfg, id, pool, "analogic_digital") \ No newline at end of file diff --git a/utils/parsers/by_type/g802_modb.py b/utils/parsers/by_type/g802_modb.py index 87d55d2..c6948eb 100644 --- a/utils/parsers/by_type/g802_modb.py +++ b/utils/parsers/by_type/g802_modb.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader -async def main_loader(cfg: object, id: int) -> None: - await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file +async def main_loader(cfg: object, id: int, pool) -> None: + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/by_type/g802_mums.py b/utils/parsers/by_type/g802_mums.py index 87d55d2..c6948eb 100644 --- a/utils/parsers/by_type/g802_mums.py +++ b/utils/parsers/by_type/g802_mums.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader -async def main_loader(cfg: object, id: int) -> None: - await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file +async def main_loader(cfg: object, id: int, pool) -> None: + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/by_type/g802_mux.py b/utils/parsers/by_type/g802_mux.py index 2f43f9c..b56f111 100644 --- a/utils/parsers/by_type/g802_mux.py +++ b/utils/parsers/by_type/g802_mux.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as channels_main_loader -async def main_loader(cfg: object, id: int) -> None: - await channels_main_loader(cfg, id, "channels") \ No newline at end of file +async def main_loader(cfg: object, id: int, pool) -> None: + await channels_main_loader(cfg, id, pool, "channels") \ No newline at end of file diff --git a/utils/parsers/by_type/gs1_gs1.py b/utils/parsers/by_type/gs1_gs1.py index 1b94ee6..9814dea 100644 --- a/utils/parsers/by_type/gs1_gs1.py +++ b/utils/parsers/by_type/gs1_gs1.py @@ -1,4 +1,4 @@ from .tlp_tlp import main_loader as tlp_tlp_main_loader -async def main_loader(cfg: object, id: int) -> None: +async def main_loader(cfg: object, id: int, pool) -> None: await tlp_tlp_main_loader(cfg, id) \ No newline at end of file diff --git a/utils/parsers/by_type/hortus_hortus.py b/utils/parsers/by_type/hortus_hortus.py index 87d55d2..c6948eb 100644 --- a/utils/parsers/by_type/hortus_hortus.py +++ b/utils/parsers/by_type/hortus_hortus.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader -async def main_loader(cfg: object, id: int) -> None: - await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file +async def main_loader(cfg: object, id: int, pool) -> None: + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file