#!.venv/bin/python
"""Poll the raw-data table and dispatch MATLAB elaborations as asyncio tasks.

The dispatcher repeatedly selects one record with ``locked = 0`` and
``status = DATA_LOADED`` from ``{cfg.dbname}.{cfg.dbrectable}`` and schedules
``run_matlab_elab`` for it. Concurrency is bounded by an ``asyncio.Semaphore``
sized from ``cfg.max_threads``.
"""

# Import necessary libraries
import mysql.connector
import logging
import importlib
import time
import asyncio
import subprocess

# Import custom modules for configuration and database connection
from utils.config import loader_ftp_csv as setting
from utils.database.connection import connetti_db
from utils.database.loader_action import get_matlab_cmd
from utils.database import DATA_LOADED

# Initialize the logger for this module
logger = logging.getLogger(__name__)

# Pause between two polls of the raw-data table. Awaiting this sleep is also
# what hands control back to the event loop so that scheduled tasks can run.
POLL_INTERVAL_SECONDS = 1.0


# Function to elaborate CSV data
async def run_matlab_elab(id: int, unit_name: str, unit_type: str, tool_name: str,
                          tool_type: str, semaphore: asyncio.Semaphore, cfg=None) -> bool:
    """Elaborate one loaded record through its MATLAB command.

    Args:
        id: Primary key of the record in the raw-data table.
        unit_name: Unit name of the record; passed to ``get_matlab_cmd``.
        unit_type: Unit type of the record (currently unused here).
        tool_name: Tool name of the record; passed to ``get_matlab_cmd``.
        tool_type: Tool type of the record (currently unused here).
        semaphore: Semaphore bounding the number of concurrent elaborations.
        cfg: Configuration object. When omitted, a fresh ``setting.Config()``
            is loaded, so existing six-argument callers keep working.

    Returns:
        ``False`` when ``get_matlab_cmd`` yields nothing for this unit/tool.
        It also returns ``False`` when a command is found, because the
        elaboration step is missing from the source (see the TODO below).
    """
    # The original body referenced a ``cfg`` name that only existed as a local
    # of ``main``; it is now an explicit, optional parameter.
    if cfg is None:
        cfg = setting.Config()

    async with semaphore:
        matlab_cmd = get_matlab_cmd(cfg, unit_name, tool_name)
        if not matlab_cmd:
            return False

        # If a record is found, lock it by updating the 'locked' field to 1
        # TODO(review): the original source ends right after the comment above.
        # The locking and the MATLAB invocation still have to be written; they
        # are not guessed at here.
        logger.warning(
            "MATLAB elaboration not implemented yet: record %s (%s/%s) left untouched.",
            id, unit_name, tool_name,
        )
        return False


def _fetch_next_record(cfg):
    """Return one unlocked ``DATA_LOADED`` record as a row tuple, or ``None``."""
    # Establish a database connection
    with connetti_db(cfg) as conn:
        cur = conn.cursor()
        # Select a single record from the raw data table that is not currently
        # locked and whose status is DATA_LOADED
        cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = {DATA_LOADED} limit 1')
        return cur.fetchone()


def _drop_finished(tasks):
    """Return the still-pending tasks, logging the failure of any finished one."""
    pending = {}
    for record_id, task in tasks.items():
        if not task.done():
            pending[record_id] = task
        elif not task.cancelled() and task.exception() is not None:
            # Reading the exception also avoids asyncio's
            # "Task exception was never retrieved" warning.
            logger.error("Error: elaboration of record %s failed: %s.", record_id, task.exception())
    return pending


async def main():
    """Run the polling loop until the program is interrupted."""
    # Load the configuration settings
    cfg = setting.Config()

    try:
        # Configure logging to write log messages to a file with a specific format
        logging.basicConfig(
            format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
            filename=cfg.logfilename,
            level=logging.INFO,
        )

        # Limit the number of concurrent executions to max_threads
        semaphore = asyncio.Semaphore(cfg.max_threads)
        # Record id -> task. The dispatcher does not lock the row itself, so
        # this map is what prevents the same record from being dispatched on
        # every poll while its task is still in flight.
        running_tasks = {}

        # Enter an infinite loop to continuously process records
        while True:
            try:
                # Remove completed tasks from the map
                running_tasks = _drop_finished(running_tasks)

                # fetchone() returns None when no row matches, so check before
                # unpacking instead of failing with a TypeError on every poll.
                record = _fetch_next_record(cfg)
                if record is not None:
                    record_id, unit_name, unit_type, tool_name, tool_type = record
                    if record_id and record_id not in running_tasks:
                        running_tasks[record_id] = asyncio.create_task(
                            run_matlab_elab(record_id, unit_name, unit_type,
                                            tool_name, tool_type, semaphore, cfg)
                        )
            except Exception as e:
                logger.error(f"Error: {e}.")

            # Without an await the loop never yields, so the tasks created
            # above would never get a chance to run.
            await asyncio.sleep(POLL_INTERVAL_SECONDS)

    except (KeyboardInterrupt, asyncio.CancelledError):
        # Ctrl+C reaches this coroutine as CancelledError when asyncio.run()
        # cancels the main task (Python 3.11+), or as KeyboardInterrupt on
        # older versions; either way shut down gracefully.
        logger.info("Info: Shutdown requested...exiting")
    except Exception as e:
        logger.error(f"Error: {e}.")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # On older Python versions the interrupt can surface outside main().
        logger.info("Info: Shutdown requested...exiting")