From 2b976d06b33d58d4a0760a0043c7065e79b74dd0 Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 11 Aug 2025 22:59:38 +0200 Subject: [PATCH] util ftp renamed connect --- env/send.ini | 2 +- src/ftp_csv_receiver.py | 2 +- src/send_orchestrator.py | 2 +- src/utils/{ftp => connect}/__init__.py | 0 src/utils/{ftp => connect}/file_management.py | 0 src/utils/{ftp => connect}/send_data.py | 54 ++++++++++++++----- src/utils/{ftp => connect}/user_admin.py | 0 src/utils/database/action_query.py | 2 +- 8 files changed, 45 insertions(+), 17 deletions(-) rename src/utils/{ftp => connect}/__init__.py (100%) rename src/utils/{ftp => connect}/file_management.py (100%) rename src/utils/{ftp => connect}/send_data.py (86%) rename src/utils/{ftp => connect}/user_admin.py (100%) diff --git a/env/send.ini b/env/send.ini index 2bbc6ca..ff45a2c 100644 --- a/env/send.ini +++ b/env/send.ini @@ -2,4 +2,4 @@ logFilename = ../logs/send_data.log [threads] - max_num = 5 + max_num = 30 diff --git a/src/ftp_csv_receiver.py b/src/ftp_csv_receiver.py index e46187a..907d798 100755 --- a/src/ftp_csv_receiver.py +++ b/src/ftp_csv_receiver.py @@ -8,7 +8,7 @@ from pathlib import Path from utils.config import loader_ftp_csv as setting from utils.database.connection import connetti_db -from utils.ftp import user_admin, file_management +from utils.connect import user_admin, file_management from pyftpdlib.handlers import FTPHandler from pyftpdlib.servers import FTPServer diff --git a/src/send_orchestrator.py b/src/send_orchestrator.py index 2924825..83fd0b8 100755 --- a/src/send_orchestrator.py +++ b/src/send_orchestrator.py @@ -9,7 +9,7 @@ from utils.config import loader_send_data as setting from utils.database import WorkflowFlags from utils.csv.loaders import get_next_csv_atomic from utils.orchestrator_utils import run_orchestrator, worker_context -from utils.database.action_query import process_workflow_record +from utils.connect.send_data import process_workflow_record from utils.general import alterna_valori #from utils.ftp.send_data import ftp_send_elab_csv_to_customer, api_send_elab_csv_to_customer, ftp_send_raw_csv_to_customer, api_send_raw_csv_to_customer diff --git a/src/utils/ftp/__init__.py b/src/utils/connect/__init__.py similarity index 100% rename from src/utils/ftp/__init__.py rename to src/utils/connect/__init__.py diff --git a/src/utils/ftp/file_management.py b/src/utils/connect/file_management.py similarity index 100% rename from src/utils/ftp/file_management.py rename to src/utils/connect/file_management.py diff --git a/src/utils/ftp/send_data.py b/src/utils/connect/send_data.py similarity index 86% rename from src/utils/ftp/send_data.py rename to src/utils/connect/send_data.py index cbb58cc..5f4c76e 100644 --- a/src/utils/ftp/send_data.py +++ b/src/utils/connect/send_data.py @@ -2,7 +2,7 @@ from ftplib import FTP, FTP_TLS, all_errors from io import BytesIO import logging import aiomysql -import datetime +from datetime import datetime from utils.database.loader_action import update_status, unlock from utils.database.action_query import get_data_as_csv, get_tool_info, get_elab_timestamp @@ -77,7 +77,7 @@ async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str send_ftp_info = await cur.fetchone() logger.info(f"id {id} - {unit} - {tool}: estratti i dati per invio via ftp") except Exception as e: - logger.error(f"id {id} - {unit} - {tool} - errore nel query per invio ftp: {e}") + logger.error(f"id {id} - {unit} - {tool} - errore nella query per invio ftp: {e}") try: # Converti in bytes @@ -166,19 +166,24 @@ async def process_workflow_record(record: tuple, fase: int, cfg: dict, pool: obj try: # Recupero informazioni principali tool_elab_info = await get_tool_info(fase, unit_name.upper(), tool_name.upper(), pool) - timestamp_matlab_elab = await get_elab_timestamp(id, pool) + if tool_elab_info: + timestamp_matlab_elab = await get_elab_timestamp(id, pool) - # Verifica se il processing può essere eseguito - if not _should_process(tool_elab_info, timestamp_matlab_elab): - logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: " - "elaborazione saltata - due date raggiunta.") - return + # Verifica se il processing può essere eseguito + if not _should_process(tool_elab_info, timestamp_matlab_elab): + logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: " + "invio dati non eseguito - due date raggiunta.") - # Routing basato sulla fase - success = await _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name, - timestamp_matlab_elab, pool) + await update_status(cfg, id, fase, pool) + return - if success: + # Routing basato sulla fase + success = await _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name, + timestamp_matlab_elab, pool) + + if success: + await update_status(cfg, id, fase, pool) + else: await update_status(cfg, id, fase, pool) except Exception as e: @@ -188,7 +193,7 @@ async def process_workflow_record(record: tuple, fase: int, cfg: dict, pool: obj await unlock(cfg, id, pool) -def _should_process(tool_elab_info, timestamp_matlab_elab): +def _should_process_old(tool_elab_info, timestamp_matlab_elab): """Verifica se il record può essere processato basandosi sulla due date.""" duedate = tool_elab_info.get("duedate") @@ -201,6 +206,29 @@ def _should_process(tool_elab_info, timestamp_matlab_elab): return duedate > comparison_timestamp +def _should_process(tool_elab_info, timestamp_matlab_elab): + """Verifica se il record può essere processato basandosi sulla due date.""" + duedate = tool_elab_info.get("duedate") + + # Se non c'è duedate o è vuota/nulla, può essere processato + if not duedate or duedate in ('0000-00-00 00:00:00', ''): + return True + + # Se timestamp_matlab_elab è None/null, usa il timestamp corrente + comparison_timestamp = timestamp_matlab_elab if timestamp_matlab_elab is not None else datetime.now() + + # Converti duedate in datetime se è una stringa + if isinstance(duedate, str): + duedate = datetime.strptime(duedate, '%Y-%m-%d %H:%M:%S') + + # Assicurati che comparison_timestamp sia datetime + if isinstance(comparison_timestamp, str): + comparison_timestamp = datetime.strptime(comparison_timestamp, '%Y-%m-%d %H:%M:%S') + + return duedate > comparison_timestamp + + + async def _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool): """ diff --git a/src/utils/ftp/user_admin.py b/src/utils/connect/user_admin.py similarity index 100% rename from src/utils/ftp/user_admin.py rename to src/utils/connect/user_admin.py diff --git a/src/utils/database/action_query.py b/src/utils/database/action_query.py index c4d1cd8..3db3cbf 100644 --- a/src/utils/database/action_query.py +++ b/src/utils/database/action_query.py @@ -58,7 +58,7 @@ async def get_tool_info(next_status: int, unit: str, tool: str, pool: object) -> result = await cur.fetchone() if not result: - logger.error(f"{unit} - {tool}: Tool info not found.") + logger.warning(f"{unit} - {tool}: Tool info not found.") return None else: return result