diff --git a/src/send_orchestrator.py b/src/send_orchestrator.py index 7ccfcd2..2924825 100755 --- a/src/send_orchestrator.py +++ b/src/send_orchestrator.py @@ -9,10 +9,10 @@ from utils.config import loader_send_data as setting from utils.database import WorkflowFlags from utils.csv.loaders import get_next_csv_atomic from utils.orchestrator_utils import run_orchestrator, worker_context -from utils.database.loader_action import update_status, unlock -from utils.database.action_query import get_data_as_csv, get_tool_info, get_elab_timestamp +from utils.database.action_query import process_workflow_record from utils.general import alterna_valori -#from utils.ftp.elab_send import send_csv_to_customer +#from utils.ftp.send_data import ftp_send_elab_csv_to_customer, api_send_elab_csv_to_customer, ftp_send_raw_csv_to_customer, api_send_raw_csv_to_customer + # Initialize the logger for this module @@ -41,22 +41,8 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None: record = await get_next_csv_atomic(pool, cfg.dbrectable, status, fase) if record: - id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record] - tool_elab_info = await get_tool_info(fase, unit_name.upper(), tool_name.upper(), pool) - if fase == WorkflowFlags.SENT_ELAB_DATA and tool_elab_info['ftp_send']: - timestamp_matlab_elab = get_elab_timestamp(id, pool) - if not tool_elab_info["duedate"] or tool_elab_info["duedate"] in ('0000-00-00 00:00:00', '') or tool_elab_info["duedate"] > timestamp_matlab_elab: - if elab_csv := await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool): - print(elab_csv) - #if await send_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool): - if True: - await update_status(cfg, id, WorkflowFlags.SENT_ELAB_DATA, pool) - else: - logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: ftp put didn't executed because due date reached.") - elif fase == WorkflowFlags.SENT_RAW_DATA and tool_elab_info['ftp_send_raw']: - ... - - await unlock(cfg, id, pool) + await process_workflow_record(record, fase, cfg, pool) + await asyncio.sleep(ELAB_PROCESSING_DELAY) else: logger.info("Nessun record disponibile") await asyncio.sleep(NO_RECORD_SLEEP) diff --git a/src/utils/ftp/send_data.py b/src/utils/ftp/send_data.py index 4b6bacb..cbb58cc 100644 --- a/src/utils/ftp/send_data.py +++ b/src/utils/ftp/send_data.py @@ -2,6 +2,11 @@ from ftplib import FTP, FTP_TLS, all_errors from io import BytesIO import logging import aiomysql +import datetime + +from utils.database.loader_action import update_status, unlock +from utils.database.action_query import get_data_as_csv, get_tool_info, get_elab_timestamp +from utils.database import WorkflowFlags logger = logging.getLogger(__name__) @@ -39,11 +44,11 @@ class FTPConnection: def __exit__(self, exc_type, exc_val, exc_tb): self.ftp.quit() -async def send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool: +async def ftp_send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, pool: object) -> bool: None return True -async def send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool: +async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool: """ Sends elaborated CSV data to a customer via FTP. @@ -79,7 +84,7 @@ async def send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, cs csv_bytes = csv_data.encode('utf-8') csv_buffer = BytesIO(csv_bytes) - ftp_parms = parse_ftp_parms(send_ftp_info["ftp_parm"]) + ftp_parms = await parse_ftp_parms(send_ftp_info["ftp_parm"]) use_tls = 'ssl_version' in ftp_parms passive = ftp_parms.get('passive', True) port = ftp_parms.get('port', 21) @@ -110,7 +115,7 @@ async def send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, cs finally: csv_buffer.close() -def parse_ftp_parms(ftp_parms: str) -> dict: +async def parse_ftp_parms(ftp_parms: str) -> dict: """ Parses a string of FTP parameters into a dictionary. @@ -140,3 +145,195 @@ def parse_ftp_parms(ftp_parms: str) -> dict: result[key] = value return result + + +async def process_workflow_record(record: tuple, fase: int, cfg: dict, pool: object): + """ + Elabora un singolo record del workflow in base alla fase specificata. + + Args: + record: Tupla contenente i dati del record + fase: Fase corrente del workflow + cfg: Configurazione + pool: Pool di connessioni al database + """ + # Estrazione e normalizzazione dei dati del record + id, unit_type, tool_type, unit_name, tool_name = [ + x.lower().replace(" ", "_") if isinstance(x, str) else x + for x in record + ] + + try: + # Recupero informazioni principali + tool_elab_info = await get_tool_info(fase, unit_name.upper(), tool_name.upper(), pool) + timestamp_matlab_elab = await get_elab_timestamp(id, pool) + + # Verifica se il processing può essere eseguito + if not _should_process(tool_elab_info, timestamp_matlab_elab): + logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: " + "elaborazione saltata - due date raggiunta.") + return + + # Routing basato sulla fase + success = await _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name, + timestamp_matlab_elab, pool) + + if success: + await update_status(cfg, id, fase, pool) + + except Exception as e: + logger.error(f"Errore durante elaborazione id {id} - {unit_name} - {tool_name}: {e}") + raise + finally: + await unlock(cfg, id, pool) + + +def _should_process(tool_elab_info, timestamp_matlab_elab): + """Verifica se il record può essere processato basandosi sulla due date.""" + duedate = tool_elab_info.get("duedate") + + if not duedate or duedate in ('0000-00-00 00:00:00', ''): + return True + + # Se timestamp_matlab_elab è None/null, usa il timestamp corrente + comparison_timestamp = timestamp_matlab_elab if timestamp_matlab_elab is not None else datetime.now() + + return duedate > comparison_timestamp + + +async def _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name, + timestamp_matlab_elab, pool): + """ + Gestisce il routing delle operazioni in base alla fase del workflow. + + Returns: + bool: True se l'operazione è riuscita, False altrimenti + """ + if fase == WorkflowFlags.SENT_ELAB_DATA: + return await _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name, + tool_name, timestamp_matlab_elab, pool) + + elif fase == WorkflowFlags.SENT_RAW_DATA: + return await _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name, + tool_name, pool) + + else: + logger.info(f"id {id} - {unit_name} - {tool_name}: nessuna azione da eseguire.") + return True + + +async def _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, + timestamp_matlab_elab, pool): + """Gestisce la fase di invio dati elaborati.""" + + # FTP send per dati elaborati + if tool_elab_info.get('ftp_send'): + return await _send_elab_data_ftp(cfg, id, unit_name, tool_name, + timestamp_matlab_elab, pool) + + # API send per dati elaborati + elif _should_send_elab_api(tool_elab_info): + return await _send_elab_data_api(cfg, id, unit_name, tool_name, + timestamp_matlab_elab, pool) + + return True + + +async def _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, pool): + """Gestisce la fase di invio dati raw.""" + + # FTP send per dati raw + if tool_elab_info.get('ftp_send_raw'): + return await _send_raw_data_ftp(cfg, id, unit_name, tool_name, pool) + + # API send per dati raw + elif _should_send_raw_api(tool_elab_info): + return await _send_raw_data_api(cfg, id, unit_name, tool_name, pool) + + return True + + +def _should_send_elab_api(tool_elab_info): + """Verifica se i dati elaborati devono essere inviati via API.""" + return (tool_elab_info.get('inoltro_api') and + tool_elab_info.get('api_send') and + tool_elab_info.get('inoltro_api_url', '').strip()) + + +def _should_send_raw_api(tool_elab_info): + """Verifica se i dati raw devono essere inviati via API.""" + return (tool_elab_info.get('inoltro_api_raw') and + tool_elab_info.get('api_send_raw') and + tool_elab_info.get('inoltro_api_url_raw', '').strip()) + + +async def _send_elab_data_ftp(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool): + """Invia dati elaborati via FTP.""" + try: + elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, + timestamp_matlab_elab, pool) + if not elab_csv: + return False + + print(elab_csv) + # if await send_elab_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool): + if True: # Placeholder per test + return True + else: + logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP fallito.") + return False + + except Exception as e: + logger.error(f"Errore invio FTP elab data id {id}: {e}") + return False + + +async def _send_elab_data_api(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool): + """Invia dati elaborati via API.""" + try: + elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, + timestamp_matlab_elab, pool) + if not elab_csv: + return False + + print(elab_csv) + # if await send_elab_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool): + if True: # Placeholder per test + return True + else: + logger.error(f"id {id} - {unit_name} - {tool_name}: invio API fallito.") + return False + + except Exception as e: + logger.error(f"Errore invio API elab data id {id}: {e}") + return False + + +async def _send_raw_data_ftp(cfg, id, unit_name, tool_name, pool): + """Invia dati raw via FTP.""" + try: + # if await ftp_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool): + if True: # Placeholder per test + return True + else: + logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP raw fallito.") + return False + + except Exception as e: + logger.error(f"Errore invio FTP raw data id {id}: {e}") + return False + + +async def _send_raw_data_api(cfg, id, unit_name, tool_name, pool): + """Invia dati raw via API.""" + try: + # if await api_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool): + if True: # Placeholder per test + return True + else: + logger.error(f"id {id} - {unit_name} - {tool_name}: invio API raw fallito.") + return False + + except Exception as e: + logger.error(f"Errore invio API raw data id {id}: {e}") + return False \ No newline at end of file