#!.venv/bin/python
"""Poll the raw-data table for received CSV records and dispatch each to its parser.

Each polling step picks one unlocked record whose status is ``CSV_RECEIVED``,
marks it as locked and hands it to the ``main_loader`` function of the parser
module selected by the record's unit and tool types.
"""

# Import necessary libraries
import mysql.connector
import logging
import importlib
import asyncio

# Import custom modules for configuration and database connection
from utils.config import loader as setting
from utils.database.connection import connetti_db
from utils.database.loader_action import CSV_RECEIVED

# Initialize the logger for this module
logger = logging.getLogger(__name__)

# Seconds to wait before polling again when no record could be processed
IDLE_WAIT_SECONDS = 20


# Function to elaborate CSV data
async def load_csv(cfg: object) -> bool:
    """Process at most one pending record from the raw-data table.

    Args:
        cfg: Configuration object; ``cfg.dbname`` and ``cfg.dbrectable`` are
            read here, and the whole object is passed on to ``connetti_db``
            and to the parser's ``main_loader``.

    Returns:
        True when a record was found, locked and handed to its parser.
        False when no record was available or a database error occurred; in
        both cases the coroutine first waits ``IDLE_WAIT_SECONDS`` so the
        caller's polling loop does not spin.

    Raises:
        Any non-database exception raised while importing or running the
        parser (e.g. ``ImportError``, ``AttributeError``) propagates to the
        caller.
    """
    try:
        # Establish a database connection
        with connetti_db(cfg) as conn:
            cur = conn.cursor()
            # Select a single record from the raw data table that is not
            # currently locked and still has the "CSV received" status
            cur.execute(f'select id, unit_type, tool_type from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = {CSV_RECEIVED} limit 1')
            # fetchone() returns None when the query matched nothing, so the
            # row must be checked before it is unpacked.
            row = cur.fetchone()
            if row is not None and row[0]:
                record_id, unit_type, tool_type = row
                # Lock the record by updating the 'locked' field to 1
                # NOTE(review): no commit is issued here and the select/update
                # pair is not atomic -- confirm that connetti_db autocommits
                # and that only one worker polls this table.
                cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 1 where id = {record_id}')
                # Construct the module name based on unit and tool types for dynamic import
                module_name = f'utils.parsers.{unit_type.lower()}_{tool_type.lower()}'
                # Dynamically import the module
                modulo = importlib.import_module(module_name)
                # Get the 'main_loader' function from the imported module
                funzione = getattr(modulo, "main_loader")
                # NOTE(review): this is a plain synchronous call, so the event
                # loop is blocked while the parser runs.
                funzione(cfg, record_id)
                return True
    except mysql.connector.Error as e:
        # Handle database connection errors
        logger.error(f'{e}')

    # Nothing was processed (empty queue or database error): wait before the
    # next attempt. The connection has already been released at this point.
    await asyncio.sleep(IDLE_WAIT_SECONDS)
    # Return False to indicate that no record was processed
    return False


async def main():
    """Configure logging and poll for records until the program is stopped.

    Errors raised by a single polling step are logged and the loop continues;
    errors raised outside the loop (e.g. while configuring logging) are logged
    and end the coroutine.
    """
    # Load the configuration settings
    cfg = setting.Config()

    try:
        # Configure logging to write log messages to a file with a specific format
        logging.basicConfig(
            format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
            filename=cfg.logfilename,
            level=logging.INFO,
        )

        # Limit the number of concurrent executions to max_threads
        # NOTE(review): each task is awaited before the next one is created,
        # so records are currently processed one at a time regardless of
        # max_threads.
        semaphore = asyncio.Semaphore(cfg.max_threads)

        # Enter an infinite loop to continuously process records
        while True:
            async with semaphore:
                try:
                    await asyncio.create_task(load_csv(cfg))
                except Exception as e:
                    logger.error(f"Error: {e}.")

    except KeyboardInterrupt:
        # Handle a keyboard interrupt (e.g., Ctrl+C) to gracefully shut down the program
        logger.info("Info: Shutdown requested...exiting")

    except Exception as e:
        logger.error(f"Error: {e}.")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() may deliver Ctrl+C to main() as a task cancellation
        # and raise KeyboardInterrupt only here, so the shutdown is logged at
        # this level as well.
        logger.info("Info: Shutdown requested...exiting")