diff --git a/ftp_csv_receiver.ini b/ftp_csv_receiver.ini index 3b5c820..d0b9a43 100644 --- a/ftp_csv_receiver.ini +++ b/ftp_csv_receiver.ini @@ -19,7 +19,7 @@ [csvelab] logFilename = csvElab.log - max_threads = 5 + max_threads = 10 [db] hostname = 10.211.114.173 diff --git a/load_orchestrator.py b/load_orchestrator.py index 3057e9c..2224a2f 100755 --- a/load_orchestrator.py +++ b/load_orchestrator.py @@ -4,8 +4,7 @@ import mysql.connector import logging import importlib -import time -import threading +import asyncio # Import custom modules for configuration and database connection from utils.config import loader as setting @@ -16,14 +15,14 @@ from utils.database.loader_action import CSV_RECEIVED logger = logging.getLogger(__name__) # Function to elaborate CSV data -def load_csv(cfg: object, threads: list) -> bool: +async def load_csv(cfg: object) -> bool: try: # Establish a database connection with connetti_db(cfg) as conn: cur = conn.cursor() # Select a single record from the raw data table that is not currently locked and has a status of 0 - cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = {CSV_RECEIVED} limit 1') - id, unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone() + cur.execute(f'select id, unit_type, tool_type from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = {CSV_RECEIVED} limit 1') + id, unit_type, tool_type = cur.fetchone() if id: # If a record is found, lock it by updating the 'locked' field to 1 cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 1 where id = {id}') @@ -36,27 +35,20 @@ def load_csv(cfg: object, threads: list) -> bool: # Get the 'main_loader' function from the imported module funzione = getattr(modulo, "main_loader") - # Create a new thread to execute the 'main_loader' function with the configuration and record ID - thread = threading.Thread(target = funzione, args=(cfg, id)) - # Add the thread to the list of active threads - threads.append(thread) - # Start the thread's execution - thread.start() - # Return True to indicate that a record was processed + funzione(cfg, id) + return True else: # If no record is found, wait for 20 seconds before attempting to fetch again - time.sleep(20) + await asyncio.sleep(20) # Return False to indicate that no record was processed return False - except mysql.connector.Error as e: # Handle database connection errors - print(f"Error: {e}") logger.error(f'{e}') -def main(): +async def main(): # Load the configuration settings cfg = setting.Config() @@ -67,30 +59,25 @@ def main(): filename=cfg.logfilename, level=logging.INFO, ) - # Initialize an empty list to keep track of active threads - threads = [] + + # Limita il numero di esecuzioni concorrenti a max_threads + semaphore = asyncio.Semaphore(cfg.max_threads) + # Enter an infinite loop to continuously process records while True: - # Check if the number of active threads exceeds the maximum allowed - while len(threads) > cfg.max_threads: - # Iterate through the list of threads - for thread in threads: - # If a thread is no longer alive (has finished execution) - if not thread.is_alive(): - # Remove it from the list of active threads - threads.remove(thread) - # Attempt to process a CSV record - if load_csv(cfg, threads): - # If a record was successfully processed, log the number of threads currently running - logger.info(f"Threads in execution: {len(threads)}") - pass + async with semaphore: + try: + await asyncio.create_task(load_csv(cfg)) + + except Exception as e: + logger.error(f"Error: {e}.") except KeyboardInterrupt: # Handle a keyboard interrupt (e.g., Ctrl+C) to gracefully shut down the program logger.info("Info: Shutdown requested...exiting") except Exception as e: - logger.info(f"Error: {e}.") + logger.error(f"Error: {e}.") if __name__ == "__main__": - main() \ No newline at end of file + asyncio.run(main()) \ No newline at end of file diff --git a/utils/config/loader.py b/utils/config/loader.py index 457873c..9bac85e 100644 --- a/utils/config/loader.py +++ b/utils/config/loader.py @@ -32,7 +32,7 @@ class Config: self.dbuser = c.get("db", "user") self.dbpass = c.get("db", "password") self.dbname = c.get("db", "dbName") - self.dbschema = c.get("db", "dbSchema") + #self.dbschema = c.get("db", "dbSchema") self.dbusertable = c.get("db", "userTableName") self.dbrectable = c.get("db", "recTableName") self.dbrawdata = c.get("db", "rawTableName") diff --git a/utils/database/loader_action.py b/utils/database/loader_action.py index bb78700..d2b2962 100644 --- a/utils/database/loader_action.py +++ b/utils/database/loader_action.py @@ -8,6 +8,9 @@ CSV_RECEIVED = 0 DATA_LOADED = 1 DATA_ELABORATED = 2 +timestamp_cols = ['inserted_at', 'loaded_at', 'elaborated_at'] + + def load_data(cfg: object, matrice_valori: list) -> bool : sql_insert_RAWDATA = f''' INSERT IGNORE INTO {cfg.dbname}.{cfg.dbrawdata} ( @@ -42,8 +45,9 @@ def update_status(cfg: object, id: int, status: int) -> None: with connetti_db(cfg) as conn: cur = conn.cursor() try: - cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 0, status = {status} where id = {id}') + cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 0, status = {status}, {timestamp_cols[status]} = now() where id = {id}') conn.commit() + logging.info("Status updated.") except Exception as e: conn.rollback() logging.error(f'Error: {e}')