Files
ASE/orchestrator.py
2025-05-01 00:58:07 +02:00

71 lines
2.2 KiB
Python
Executable File

#!.venv/bin/python
import mysql.connector
import logging
import importlib
import time
import threading
from utils.config import loader as setting
from utils.database.connection import connetti_db
# Module-level logger named after this module; logging.basicConfig() is called in main().
logger = logging.getLogger(__name__)
def elab_csv(cfg, threads):
    """Claim one pending record and start a parser thread for it.

    Selects a single row with ``locked = 0 and status = 0`` from
    ``{cfg.dbname}.{cfg.dbrectable}``, sets ``locked = 1`` on it, imports
    ``utils.parsers.<unit_type>_<tool_type>`` (both lower-cased) and runs that
    module's ``main_loader(cfg, record_id)`` in a new thread.

    Args:
        cfg: Configuration object. ``dbname`` and ``dbrectable`` are read
            here; the object is also passed to ``connetti_db`` and to the
            parser's ``main_loader``.
        threads: List to which the started ``threading.Thread`` is appended.

    Returns:
        True if a record was claimed and a worker thread was started.
        False if no pending record was found (after sleeping 20 seconds) or
        if a ``mysql.connector.Error`` was raised and logged.
    """
    try:
        with connetti_db(cfg) as conn:
            cur = conn.cursor()
            cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = 0 limit 1')
            # fetchone() returns None when the result set is empty; unpacking
            # it directly would raise TypeError and abort the caller's loop.
            row = cur.fetchone()
            if row and row[0]:
                record_id, _unit_name, unit_type, _tool_name, tool_type, _tool_data = row
                # Table/schema names cannot be bound as parameters, but the
                # record id can.
                # NOTE(review): no explicit commit is issued here; presumably
                # connetti_db or autocommit takes care of it - confirm.
                cur.execute(
                    f'update {cfg.dbname}.{cfg.dbrectable} set locked = 1 where id = %s',
                    (record_id,),
                )
                module_name = f'utils.parsers.{unit_type.lower()}_{tool_type.lower()}'
                parser_module = importlib.import_module(module_name)
                loader = getattr(parser_module, "main_loader")
                # Run the parser in its own thread
                worker = threading.Thread(target=loader, args=(cfg, record_id))
                threads.append(worker)
                worker.start()
                return True
    except mysql.connector.Error as e:
        print(f"Error: {e}")
        logger.error("%s", e)
        return False

    # Nothing to process: wait before the caller polls again. The wait happens
    # after the ``with`` block has exited rather than inside it.
    time.sleep(20)
    return False
def main():
    """Run the orchestrator loop until interrupted or a fatal error occurs.

    Loads the configuration, configures file logging from ``cfg.logfilename``,
    then repeatedly calls ``elab_csv`` to start worker threads while keeping
    the number of live workers within ``cfg.max_threads``.

    ``KeyboardInterrupt`` ends the loop with an informational log entry; any
    other exception is logged with its traceback and ends the loop as well.
    """
    # Load the configuration settings
    cfg = setting.Config()
    # Seconds to wait between checks when every worker slot is busy.
    slot_poll_interval = 0.5

    try:
        # Configure logging
        logging.basicConfig(
            format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
            filename=cfg.logfilename,
            level=logging.INFO,
        )
        # Always allow at least one worker so a setting of 0 cannot stall
        # the loop forever.
        max_workers = max(1, cfg.max_threads)
        threads = []
        while True:
            # Drop finished workers on every pass. Rebuilding the list avoids
            # removing items from it while iterating, which skips elements.
            threads[:] = [t for t in threads if t.is_alive()]
            if len(threads) >= max_workers:
                # All slots busy: wait briefly instead of spinning the CPU.
                time.sleep(slot_poll_interval)
                continue
            if elab_csv(cfg, threads):
                logger.info("Threads in execution: %d", len(threads))
    except KeyboardInterrupt:
        logger.info("Info: Shutdown requested...exiting")
    except Exception as e:
        # Top-level boundary: record the failure at error level with traceback.
        logger.exception(f"Error: {e}.")
# Script entry point: start the orchestrator only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()