fix async

2025-05-27 23:50:25 +02:00
parent 670972bd45
commit c40378b654
17 changed files with 181 additions and 210 deletions

View File

@@ -1,15 +1,14 @@
 #!.venv/bin/python
 # Import necessary libraries
-import mysql.connector
 import logging
 import importlib
 import asyncio
 import os
+import aiomysql

 # Import custom modules for configuration and database connection
 from utils.config import loader_load_data as setting
-from utils.database.connection import connetti_db
 from utils.database import CSV_RECEIVED

 # Initialize the logger for this module
@@ -20,115 +19,91 @@ CSV_PROCESSING_DELAY = 0.1
 # Tempo di attesa se non ci sono record da elaborare
 NO_RECORD_SLEEP = 20

-async def worker(worker_id: int, queue: asyncio.Queue, cfg: object) -> None:
-    """
-    Worker asyncrono che preleva lavori dalla coda e li esegue.
-
-    Args:
-        worker_id (int): ID univoco del worker.
-        queue (asyncio.Queue): Coda da cui prendere i lavori.
-        cfg (object): Configurazione caricata.
-    """
+async def get_next_csv_atomic(pool, table_name):
+    """Preleva atomicamente il prossimo CSV da elaborare"""
+    async with pool.acquire() as conn:
+        async with conn.cursor() as cur:
+            # Usa SELECT FOR UPDATE per lock atomico
+            await cur.execute(f"""
+                SELECT id, unit_type, tool_type, unit_name, tool_name
+                FROM {table_name}
+                WHERE locked = 0 AND status = %s
+                ORDER BY id
+                LIMIT 1
+                FOR UPDATE SKIP LOCKED
+            """, (CSV_RECEIVED,))
+            result = await cur.fetchone()
+            if result:
+                await cur.execute(f"""
+                    UPDATE {table_name}
+                    SET locked = 1
+                    WHERE id = %s
+                """, (result[0],))
+                await conn.commit()
+            return result
+
+async def worker(worker_id: int, cfg: object, pool) -> None:
     debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
     logger.info(f"Worker {worker_id} - Avviato")
     while True:
         try:
-            # Preleva un "lavoro" dalla coda (in questo caso non ci sono parametri)
-            await queue.get()
             logger.info(f"Worker {worker_id} - Inizio elaborazione")
-            record, success = await load_csv(cfg, worker_id)
-            if not record:
-                logger.debug(f"Worker {worker_id} - Nessun record trovato")
-                await asyncio.sleep(NO_RECORD_SLEEP)
-            if not success:
-                logger.error(f"Worker {worker_id} - Errore durante l'elaborazione")
-                await asyncio.sleep(CSV_PROCESSING_DELAY)
-            else:
-                logger.debug(f"Worker {worker_id} - Elaborazione completata correttamente")
-                await asyncio.sleep(CSV_PROCESSING_DELAY)
-            # Segnala che il lavoro è completato
-            queue.task_done()
+            record = await get_next_csv_atomic(pool, cfg.dbrectable)
+            if record:
+                success = await load_csv(record, cfg, worker_id, pool)
+                if not success:
+                    logger.error(f"Worker {worker_id} - Errore durante l'elaborazione")
+                await asyncio.sleep(CSV_PROCESSING_DELAY)
+            else:
+                await asyncio.sleep(NO_RECORD_SLEEP)
         except Exception as e:
             logger.error(f"Worker {worker_id} - Errore durante l'esecuzione: {e}", exc_info=debug_mode)
-            queue.task_done()
+            await asyncio.sleep(1)

-async def load_csv(cfg: object, worker_id: int) -> tuple:
-    """
-    Cerca e carica un file CSV da elaborare dal database.
-    Args:
-        cfg (object): Oggetto configurazione contenente dati per DB e altro.
-    Returns:
-        bool: True se è stato trovato ed elaborato un record, False altrimenti.
-    """
+async def load_csv(record: tuple, cfg: object, worker_id: int, pool) -> bool:
     debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
     logger.debug(f"Worker {worker_id} - Inizio ricerca nuovo CSV da elaborare")
-    try:
-        with connetti_db(cfg) as conn:
-            cur = conn.cursor()
-            logger.debug(f"Worker {worker_id} - Connessione al database stabilita")
-            query = f"""
-                SELECT id, unit_type, tool_type, unit_name, tool_name
-                FROM {cfg.dbname}.{cfg.dbrectable}
-                WHERE locked = 0 AND status = {CSV_RECEIVED}
-                LIMIT 1
-            """
-            logger.debug(f"Worker {worker_id} - Esecuzione query: {query}")
-            cur.execute(query)
-            result = cur.fetchone()
-            if result:
-                id, unit_type, tool_type, unit_name, tool_name = result
-                logger.info(f"Worker {worker_id} - Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}")
-                lock_query = f"UPDATE {cfg.dbname}.{cfg.dbrectable} SET locked = 1 WHERE id = {id}"
-                logger.debug(f"Worker {worker_id} - Esecuzione lock del record: {lock_query}")
-                cur.execute(lock_query)
-                conn.commit()
-                # Costruisce il nome del modulo da caricare dinamicamente
-                module_names = [f'utils.parsers.by_name.{unit_name.lower()}_{tool_name.lower()}',
-                                f'utils.parsers.by_name.{unit_name.lower()}_{tool_type.lower()}',
-                                f'utils.parsers.by_name.{unit_name.lower()}_all',
-                                f'utils.parsers.by_type.{unit_type.lower()}_{tool_type.lower()}']
-                modulo = None
-                for module_name in module_names:
-                    try:
-                        logger.debug(f"Worker {worker_id} - Caricamento dinamico del modulo: {module_name}")
-                        modulo = importlib.import_module(module_name)
-                        logger.debug(f"Worker {worker_id} - Funzione 'main_loader' caricata dal modulo {module_name}")
-                    except (ImportError, AttributeError) as e:
-                        logger.info(f"Worker {worker_id} - Modulo {module_name} non presente o non valido. {e}", exc_info=debug_mode)
-                if not modulo:
-                    logger.error(f"Worker {worker_id} - Nessun modulo trovato {module_names}")
-                    return True, False
-                # Ottiene la funzione 'main_loader' dal modulo
-                funzione = getattr(modulo, "main_loader")
-                # Esegui la funzione
-                await funzione(cfg, id)
-                logger.info(f"Worker {worker_id} - Elaborazione completata per ID={id}")
-                return True, True
-            else:
-                logger.debug(f"Worker {worker_id} - Nessun record disponibile per l'elaborazione")
-                return False, False
-    except mysql.connector.Error as e:
-        logger.error(f"Worker {worker_id} - Errore database: {e}", exc_info=debug_mode)
-        return False, False
+    id, unit_type, tool_type, unit_name, tool_name = record
+    logger.info(f"Worker {worker_id} - Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}")
+    # Costruisce il nome del modulo da caricare dinamicamente
+    module_names = [f'utils.parsers.by_name.{unit_name.lower()}_{tool_name.lower()}',
+                    f'utils.parsers.by_name.{unit_name.lower()}_{tool_type.lower()}',
+                    f'utils.parsers.by_name.{unit_name.lower()}_all',
+                    f'utils.parsers.by_type.{unit_type.lower()}_{tool_type.lower()}']
+    modulo = None
+    for module_name in module_names:
+        try:
+            logger.debug(f"Worker {worker_id} - Caricamento dinamico del modulo: {module_name}")
+            modulo = importlib.import_module(module_name)
+            logger.debug(f"Worker {worker_id} - Funzione 'main_loader' caricata dal modulo {module_name}")
+            break
+        except (ImportError, AttributeError) as e:
+            logger.info(f"Worker {worker_id} - Modulo {module_name} non presente o non valido. {e}", exc_info=debug_mode)
+    if not modulo:
+        logger.error(f"Worker {worker_id} - Nessun modulo trovato {module_names}")
+        return False
+    # Ottiene la funzione 'main_loader' dal modulo
+    funzione = getattr(modulo, "main_loader")
+    # Esegui la funzione
+    logger.info(f"Worker {worker_id} - Elaborazione con modulo {modulo} per ID={id}")
+    await funzione(cfg, id, pool)
+    logger.info(f"Worker {worker_id} - Elaborazione completata per ID={id}")
+    return True

 async def main():
@@ -150,45 +125,36 @@ async def main():
         )
         logger.info("Logging configurato correttamente")

-        # Crea una coda di lavoro illimitata
-        queue = asyncio.Queue(maxsize=cfg.max_threads * 2 or 20)
-        logger.debug("Coda di lavoro creata")
-
         # Numero massimo di worker concorrenti
-        num_workers = cfg.max_threads
-        logger.info(f"Avvio di {num_workers} worker concorrenti")
+        logger.info(f"Avvio di {cfg.max_threads} worker concorrenti")
+
+        pool = await aiomysql.create_pool(
+            host=cfg.dbhost,
+            user=cfg.dbuser,
+            password=cfg.dbpass,
+            db=cfg.dbname,
+            minsize=1,
+            maxsize=cfg.max_threads*4
+        )

         # Avvia i worker
         workers = [
-            asyncio.create_task(worker(i, queue, cfg)) for i in range(num_workers)
+            asyncio.create_task(worker(i, cfg, pool))
+            for i in range(cfg.max_threads)
         ]
         logger.info("Sistema avviato correttamente. In attesa di nuovi task...")

-        # Ciclo infinito per aggiungere lavori alla coda
-        while True:
-            logger.debug("Aggiunta di un nuovo lavoro alla coda")
-            await queue.put(None)
-            # Breve attesa prima di aggiungere un altro lavoro
-            await asyncio.sleep(0.5)
+        try:
+            await asyncio.gather(*workers, return_exceptions=debug_mode)
+        finally:
+            pool.close()
+            await pool.wait_closed()

     except KeyboardInterrupt:
         logger.info("Info: Shutdown richiesto... chiusura in corso")
-        # Attendi che tutti i lavori pendenti siano completati
-        logger.info("Attesa completamento dei task in coda...")
-        await queue.join()
-        # Ferma i worker
-        logger.info("Chiusura dei worker in corso...")
-        for task in workers:
-            task.cancel()
-        await asyncio.gather(*workers, return_exceptions=debug_mode)
-        logger.info("Info: Tutti i task terminati. Uscita.")
     except Exception as e:
         logger.error(f"Errore principale: {e}", exc_info=debug_mode)

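The queue-based dispatch is gone: each worker now claims a row directly with SELECT ... FOR UPDATE SKIP LOCKED, so two workers can never pick up the same CSV. That clause needs MySQL 8.0 or newer on InnoDB tables. Below is a minimal standalone sketch of the same claim pattern with aiomysql; the jobs table, its columns and the connection settings are illustrative placeholders, not the project's real schema or configuration.

import asyncio
import aiomysql

async def claim_one(pool):
    # Claim-pattern sketch: lock one unclaimed row, mark it taken, commit.
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await conn.begin()  # aiomysql does not autocommit by default
            await cur.execute(
                "SELECT id FROM jobs "
                "WHERE locked = 0 "
                "ORDER BY id LIMIT 1 "
                "FOR UPDATE SKIP LOCKED"  # rows locked by other workers are skipped, not waited on
            )
            row = await cur.fetchone()
            if row:
                await cur.execute("UPDATE jobs SET locked = 1 WHERE id = %s", (row[0],))
            await conn.commit()  # releases the row lock
            return row

async def main():
    # Placeholder credentials, for the sketch only.
    pool = await aiomysql.create_pool(host="127.0.0.1", user="user", password="pass", db="demo")
    print(await claim_one(pool))
    pool.close()
    await pool.wait_closed()

asyncio.run(main())

The commit's get_next_csv_atomic follows the same shape, with the status filter and table name taken from the configuration.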
View File

@@ -1,5 +1,4 @@
 #!.venv/bin/python
-from utils.database.connection import connetti_db
 from utils.database.nodes_query import get_nodes_type
 import utils.timestamp.date_check as date_check
 import logging
@@ -9,31 +8,15 @@ from itertools import islice
 logger = logging.getLogger(__name__)

-def get_data(cfg: object, id: int) -> tuple:
-    """
-    Retrieves data for a specific tool from the database.
-
-    This function connects to the database using the provided configuration,
-    executes a query to retrieve the unit name, tool name ID, and tool data
-    associated with the given ID from the raw data table, and returns the results.
-
-    Args:
-        cfg: A configuration object containing database connection parameters
-            and table names (cfg.dbname, cfg.dbrectable).
-        id: The ID of the tool record to retrieve.
-
-    Returns:
-        A tuple containing the unit name, tool name ID, and tool data.
-    """
-    with connetti_db(cfg) as conn:
-        cur = conn.cursor()
-        cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbname}.{cfg.dbrectable} where id = {id}')
-        unit_name, tool_name, tool_data = cur.fetchone()
-        cur.close()
-        conn.close()
-    return unit_name, tool_name, tool_data
-
-def make_pipe_sep_matrix(cfg: object, id: int) -> list:
+async def get_data(cfg: object, id: int, pool) -> tuple:
+    async with pool.acquire() as conn:
+        async with conn.cursor() as cur:
+            await cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbrectable} where id = {id}')
+            unit_name, tool_name, tool_data = await cur.fetchone()
+            return unit_name, tool_name, tool_data
+
+async def make_pipe_sep_matrix(cfg: object, id: int, pool) -> list:
     """
     Processes raw tool data and transforms it into a matrix format for database insertion.
@@ -55,7 +38,7 @@ def make_pipe_sep_matrix(cfg: object, id: int) -> list:
         EventTime, BatLevel, Temperature, followed by up to 16 additional
         measurement values (Val0 to ValF), padded with None if necessary.
     """
-    UnitName, ToolNameID, ToolData = get_data(cfg, id)
+    UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
     righe = ToolData.splitlines()
     matrice_valori = []
     for riga in [riga for riga in righe if ';|;' in riga]:
@@ -68,7 +51,7 @@ def make_pipe_sep_matrix(cfg: object, id: int) -> list:
     return matrice_valori

-def make_ain_din_matrix(cfg: object, id: int) -> list:
+async def make_ain_din_matrix(cfg: object, id: int, pool) -> list:
     """
     Processes raw location (LOC) tool data and transforms it into a matrix format for database insertion.
@@ -90,7 +73,7 @@ def make_ain_din_matrix(cfg: object, id: int) -> list:
         A list of lists (matrix) representing the processed LOC data. Each inner
         list contains data fields similar to `make_matrix`, adjusted for LOC data.
     """
-    UnitName, ToolNameID, ToolData = get_data(cfg, id)
+    UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
     node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
     righe = ToolData.splitlines()
     matrice_valori = []
@@ -113,8 +96,8 @@ def make_ain_din_matrix(cfg: object, id: int) -> list:
     return matrice_valori

-def make_channels_matrix(cfg: object, id: int) -> list:
-    UnitName, ToolNameID, ToolData = get_data(cfg, id)
+async def make_channels_matrix(cfg: object, id: int, pool) -> list:
+    UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
     node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
     righe = ToolData.splitlines()
     matrice_valori = []

View File

@@ -6,7 +6,7 @@ import logging
 logger = logging.getLogger(__name__)

-async def main_loader(cfg: object, id: int, action: str) -> None:
+async def main_loader(cfg: object, id: int, pool, action: str) -> None:
     type_matrix_mapping = {
         "pipe_separator": make_pipe_sep_matrix,
         "analogic_digital": make_ain_din_matrix,
@@ -15,11 +15,11 @@ async def main_loader(cfg: object, id: int, action: str) -> None:
     if action in type_matrix_mapping:
         function_to_call = type_matrix_mapping[action]
         # Create a matrix of values from the data
-        matrice_valori = function_to_call(cfg, id)
+        matrice_valori = await function_to_call(cfg, id, pool)
         logger.info("matrice valori creata")
         # Load the data into the database
-        if load_data(cfg, matrice_valori):
-            update_status(cfg, id, DATA_LOADED)
+        if await load_data(cfg, matrice_valori, pool):
+            await update_status(cfg, id, DATA_LOADED, pool)
     else:
         logger.warning(f"Action '{action}' non riconosciuta.")

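Every caller now has to thread the pool through main_loader. A hypothetical call site, assuming a cfg object and an aiomysql pool created as in the main script above; the record id and action are made up for illustration:

from utils.csv.loaders import main_loader

async def handle_record(cfg, pool):
    # Dispatches through type_matrix_mapping to make_pipe_sep_matrix,
    # then loads the resulting matrix and updates the record status.
    await main_loader(cfg, 42, pool, "pipe_separator")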
View File

@@ -1,5 +1,4 @@
 #!.venv/bin/python
-from utils.database.connection import connetti_db
 import logging

 logger = logging.getLogger(__name__)
@@ -7,12 +6,12 @@ logger = logging.getLogger(__name__)
 timestamp_cols = ['inserted_at', 'loaded_at', 'elaborated_at']

-def load_data(cfg: object, matrice_valori: list) -> bool :
+async def load_data(cfg: object, matrice_valori: list, pool) -> bool :
     if not matrice_valori:
         logger.info("Nulla da caricare.")
         return True
     sql_insert_RAWDATA = f'''
-        INSERT IGNORE INTO {cfg.dbname}.{cfg.dbrawdata} (
+        INSERT INTO {cfg.dbrawdata} (
             `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`,
             `Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`,
             `Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`,
@@ -23,44 +22,67 @@ def load_data(cfg: object, matrice_valori: list) -> bool :
             %s, %s, %s, %s, %s, %s, %s, %s,
             %s, %s, %s, %s, %s, %s, %s, %s,
             %s, %s, %s
-        )
+        ) as new_data
+        ON DUPLICATE KEY UPDATE
+            `BatLevel` = IF({cfg.dbrawdata}.`BatLevel` != new_data.`BatLevel`, new_data.`BatLevel`, {cfg.dbrawdata}.`BatLevel`),
+            `Temperature` = IF({cfg.dbrawdata}.`Temperature` != new_data.Temperature, new_data.Temperature, {cfg.dbrawdata}.`Temperature`),
+            `Val0` = IF({cfg.dbrawdata}.`Val0` != new_data.Val0, new_data.Val0, {cfg.dbrawdata}.`Val0`),
+            `Val1` = IF({cfg.dbrawdata}.`Val1` != new_data.Val1, new_data.Val1, {cfg.dbrawdata}.`Val1`),
+            `Val2` = IF({cfg.dbrawdata}.`Val2` != new_data.Val2, new_data.Val2, {cfg.dbrawdata}.`Val2`),
+            `Val3` = IF({cfg.dbrawdata}.`Val3` != new_data.Val3, new_data.Val3, {cfg.dbrawdata}.`Val3`),
+            `Val4` = IF({cfg.dbrawdata}.`Val4` != new_data.Val4, new_data.Val4, {cfg.dbrawdata}.`Val4`),
+            `Val5` = IF({cfg.dbrawdata}.`Val5` != new_data.Val5, new_data.Val5, {cfg.dbrawdata}.`Val5`),
+            `Val6` = IF({cfg.dbrawdata}.`Val6` != new_data.Val6, new_data.Val6, {cfg.dbrawdata}.`Val6`),
+            `Val7` = IF({cfg.dbrawdata}.`Val7` != new_data.Val7, new_data.Val7, {cfg.dbrawdata}.`Val7`),
+            `Val8` = IF({cfg.dbrawdata}.`Val8` != new_data.Val8, new_data.Val8, {cfg.dbrawdata}.`Val8`),
+            `Val9` = IF({cfg.dbrawdata}.`Val9` != new_data.Val9, new_data.Val9, {cfg.dbrawdata}.`Val9`),
+            `ValA` = IF({cfg.dbrawdata}.`ValA` != new_data.ValA, new_data.ValA, {cfg.dbrawdata}.`ValA`),
+            `ValB` = IF({cfg.dbrawdata}.`ValB` != new_data.ValB, new_data.ValB, {cfg.dbrawdata}.`ValB`),
+            `ValC` = IF({cfg.dbrawdata}.`ValC` != new_data.ValC, new_data.ValC, {cfg.dbrawdata}.`ValC`),
+            `ValD` = IF({cfg.dbrawdata}.`ValD` != new_data.ValD, new_data.ValD, {cfg.dbrawdata}.`ValD`),
+            `ValE` = IF({cfg.dbrawdata}.`ValE` != new_data.ValE, new_data.ValE, {cfg.dbrawdata}.`ValE`),
+            `ValF` = IF({cfg.dbrawdata}.`ValF` != new_data.ValF, new_data.ValF, {cfg.dbrawdata}.`ValF`),
+            `BatLevelModule` = IF({cfg.dbrawdata}.`BatLevelModule` != new_data.BatLevelModule, new_data.BatLevelModule, {cfg.dbrawdata}.`BatLevelModule`),
+            `TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, {cfg.dbrawdata}.`TemperatureModule`),
+            `RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`),
+            `Created_at` = NOW()
     '''
-    with connetti_db(cfg) as conn:
-        cur = conn.cursor()
-        try:
-            cur.executemany(sql_insert_RAWDATA, matrice_valori)
-            conn.commit()
-            logging.info("Data loaded.")
-            rc = True
-        except Exception as e:
-            conn.rollback()
-            logging.error(f"Error: {e}.")
-            rc = False
-        finally:
-            conn.close()
-    return rc
+    async with pool.acquire() as conn:
+        async with conn.cursor() as cur:
+            try:
+                await cur.executemany(sql_insert_RAWDATA, matrice_valori)
+                await conn.commit()
+                logging.info("Data loaded.")
+                rc = True
+            except Exception as e:
+                await conn.rollback()
+                logging.error(f"Error: {e}.")
+                rc = False
+            finally:
+                return rc

-def update_status(cfg: object, id: int, status: int) -> None:
-    with connetti_db(cfg) as conn:
-        cur = conn.cursor()
-        try:
-            cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 0, status = {status}, {timestamp_cols[status]} = now() where id = {id}')
-            conn.commit()
-            logging.info("Status updated.")
-        except Exception as e:
-            conn.rollback()
-            logging.error(f'Error: {e}')
+async def update_status(cfg: object, id: int, status: int, pool) -> None:
+    async with pool.acquire() as conn:
+        async with conn.cursor() as cur:
+            try:
+                await cur.execute(f'update {cfg.dbrectable} set locked = 0, status = {status}, {timestamp_cols[status]} = now() where id = {id}')
+                await conn.commit()
+                logging.info("Status updated.")
+            except Exception as e:
+                await conn.rollback()
+                logging.error(f'Error: {e}')

-def get_matlab_cmd(cfg: object, unit: str, tool: str) -> tuple:
-    with connetti_db(cfg) as conn:
-        cur = conn.cursor()
-        try:
-            cur.execute(f'''select m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate
-                from matfuncs as m
-                inner join tools as t on t.matfunc = m.id
-                inner join units as u on u.id = t.unit_id
-                inner join statustools as s on t.statustool_id = s.id
-                where t.name = "{tool}" and u.name = "{unit}"''')
-            return cur.fetchone()
-        except Exception as e:
-            logging.error(f'Error: {e}')
+async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool) -> tuple:
+    async with pool.acquire() as conn:
+        async with conn.cursor() as cur:
+            try:
+                await cur.execute(f'''select m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate
-                    from matfuncs as m
+                    from matfuncs as m
+                    inner join tools as t on t.matfunc = m.id
+                    inner join units as u on u.id = t.unit_id
+                    inner join statustools as s on t.statustool_id = s.id
+                    where t.name = "{tool}" and u.name = "{unit}"''')
+                return cur.fetchone()
+            except Exception as e:
+                logging.error(f'Error: {e}')

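The INSERT IGNORE becomes a plain INSERT with a row alias and ON DUPLICATE KEY UPDATE, so existing rows are updated instead of silently skipped. Note that the VALUES (...) AS new_data alias syntax requires MySQL 8.0.19 or newer; older servers would need the deprecated VALUES() function instead. A minimal sketch of the pattern follows, assuming a readings table with a unique key over its key columns; the table and column names are placeholders, not the project's cfg.dbrawdata schema.

UPSERT_SQL = """
    INSERT INTO readings (unit, node, event_ts, val0)
    VALUES (%s, %s, %s, %s) AS new_data  -- row alias, MySQL >= 8.0.19
    ON DUPLICATE KEY UPDATE
        val0 = new_data.val0,            -- take the incoming value on conflict
        created_at = NOW()
"""

async def upsert_rows(pool, rows):
    # rows is a list of (unit, node, event_ts, val0) tuples.
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.executemany(UPSERT_SQL, rows)
            await conn.commit()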
View File

@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as channels_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    await channels_main_loader(cfg, id, "channels")
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await channels_main_loader(cfg, id, pool,"channels")

View File

@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as pipe_sep_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    await pipe_sep_main_loader(cfg, id, "pipe_separator")
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as pipe_sep_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    await pipe_sep_main_loader(cfg, id, "pipe_separator")
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as analog_dig_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    await analog_dig_main_loader(cfg, id, "analogic_digital")
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await analog_dig_main_loader(cfg, id, pool, "analogic_digital")

View File

@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as pipe_sep_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    await pipe_sep_main_loader(cfg, id, "pipe_separator")
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as channels_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    await channels_main_loader(cfg, id, "channels")
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await channels_main_loader(cfg, id, pool, "channels")

View File

@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as pipe_sep_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    await pipe_sep_main_loader(cfg, id, "pipe_separator")
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as analog_dig_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    await analog_dig_main_loader(cfg, id, "analogic_digital")
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await analog_dig_main_loader(cfg, id, pool, "analogic_digital")

View File

@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as pipe_sep_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    await pipe_sep_main_loader(cfg, id, "pipe_separator")
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as pipe_sep_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    await pipe_sep_main_loader(cfg, id, "pipe_separator")
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as channels_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    await channels_main_loader(cfg, id, "channels")
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await channels_main_loader(cfg, id, pool, "channels")

View File

@@ -1,4 +1,4 @@
 from .tlp_tlp import main_loader as tlp_tlp_main_loader

-async def main_loader(cfg: object, id: int) -> None:
+async def main_loader(cfg: object, id: int, pool) -> None:
     await tlp_tlp_main_loader(cfg, id)

View File

@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as pipe_sep_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    await pipe_sep_main_loader(cfg, id, "pipe_separator")
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")