fix send ftp e api
This commit is contained in:
@@ -9,10 +9,10 @@ from utils.config import loader_send_data as setting
|
|||||||
from utils.database import WorkflowFlags
|
from utils.database import WorkflowFlags
|
||||||
from utils.csv.loaders import get_next_csv_atomic
|
from utils.csv.loaders import get_next_csv_atomic
|
||||||
from utils.orchestrator_utils import run_orchestrator, worker_context
|
from utils.orchestrator_utils import run_orchestrator, worker_context
|
||||||
from utils.database.loader_action import update_status, unlock
|
from utils.database.action_query import process_workflow_record
|
||||||
from utils.database.action_query import get_data_as_csv, get_tool_info, get_elab_timestamp
|
|
||||||
from utils.general import alterna_valori
|
from utils.general import alterna_valori
|
||||||
#from utils.ftp.elab_send import send_csv_to_customer
|
#from utils.ftp.send_data import ftp_send_elab_csv_to_customer, api_send_elab_csv_to_customer, ftp_send_raw_csv_to_customer, api_send_raw_csv_to_customer
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Initialize the logger for this module
|
# Initialize the logger for this module
|
||||||
@@ -41,22 +41,8 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
|
|||||||
record = await get_next_csv_atomic(pool, cfg.dbrectable, status, fase)
|
record = await get_next_csv_atomic(pool, cfg.dbrectable, status, fase)
|
||||||
|
|
||||||
if record:
|
if record:
|
||||||
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
|
await process_workflow_record(record, fase, cfg, pool)
|
||||||
tool_elab_info = await get_tool_info(fase, unit_name.upper(), tool_name.upper(), pool)
|
await asyncio.sleep(ELAB_PROCESSING_DELAY)
|
||||||
if fase == WorkflowFlags.SENT_ELAB_DATA and tool_elab_info['ftp_send']:
|
|
||||||
timestamp_matlab_elab = get_elab_timestamp(id, pool)
|
|
||||||
if not tool_elab_info["duedate"] or tool_elab_info["duedate"] in ('0000-00-00 00:00:00', '') or tool_elab_info["duedate"] > timestamp_matlab_elab:
|
|
||||||
if elab_csv := await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
|
|
||||||
print(elab_csv)
|
|
||||||
#if await send_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
|
|
||||||
if True:
|
|
||||||
await update_status(cfg, id, WorkflowFlags.SENT_ELAB_DATA, pool)
|
|
||||||
else:
|
|
||||||
logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: ftp put didn't executed because due date reached.")
|
|
||||||
elif fase == WorkflowFlags.SENT_RAW_DATA and tool_elab_info['ftp_send_raw']:
|
|
||||||
...
|
|
||||||
|
|
||||||
await unlock(cfg, id, pool)
|
|
||||||
else:
|
else:
|
||||||
logger.info("Nessun record disponibile")
|
logger.info("Nessun record disponibile")
|
||||||
await asyncio.sleep(NO_RECORD_SLEEP)
|
await asyncio.sleep(NO_RECORD_SLEEP)
|
||||||
|
|||||||
@@ -2,6 +2,11 @@ from ftplib import FTP, FTP_TLS, all_errors
|
|||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
import logging
|
import logging
|
||||||
import aiomysql
|
import aiomysql
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
from utils.database.loader_action import update_status, unlock
|
||||||
|
from utils.database.action_query import get_data_as_csv, get_tool_info, get_elab_timestamp
|
||||||
|
from utils.database import WorkflowFlags
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -39,11 +44,11 @@ class FTPConnection:
|
|||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
self.ftp.quit()
|
self.ftp.quit()
|
||||||
|
|
||||||
async def send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
|
async def ftp_send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, pool: object) -> bool:
|
||||||
None
|
None
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
|
async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
|
||||||
"""
|
"""
|
||||||
Sends elaborated CSV data to a customer via FTP.
|
Sends elaborated CSV data to a customer via FTP.
|
||||||
|
|
||||||
@@ -79,7 +84,7 @@ async def send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, cs
|
|||||||
csv_bytes = csv_data.encode('utf-8')
|
csv_bytes = csv_data.encode('utf-8')
|
||||||
csv_buffer = BytesIO(csv_bytes)
|
csv_buffer = BytesIO(csv_bytes)
|
||||||
|
|
||||||
ftp_parms = parse_ftp_parms(send_ftp_info["ftp_parm"])
|
ftp_parms = await parse_ftp_parms(send_ftp_info["ftp_parm"])
|
||||||
use_tls = 'ssl_version' in ftp_parms
|
use_tls = 'ssl_version' in ftp_parms
|
||||||
passive = ftp_parms.get('passive', True)
|
passive = ftp_parms.get('passive', True)
|
||||||
port = ftp_parms.get('port', 21)
|
port = ftp_parms.get('port', 21)
|
||||||
@@ -110,7 +115,7 @@ async def send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, cs
|
|||||||
finally:
|
finally:
|
||||||
csv_buffer.close()
|
csv_buffer.close()
|
||||||
|
|
||||||
def parse_ftp_parms(ftp_parms: str) -> dict:
|
async def parse_ftp_parms(ftp_parms: str) -> dict:
|
||||||
"""
|
"""
|
||||||
Parses a string of FTP parameters into a dictionary.
|
Parses a string of FTP parameters into a dictionary.
|
||||||
|
|
||||||
@@ -140,3 +145,195 @@ def parse_ftp_parms(ftp_parms: str) -> dict:
|
|||||||
result[key] = value
|
result[key] = value
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
async def process_workflow_record(record: tuple, fase: int, cfg: dict, pool: object):
|
||||||
|
"""
|
||||||
|
Elabora un singolo record del workflow in base alla fase specificata.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
record: Tupla contenente i dati del record
|
||||||
|
fase: Fase corrente del workflow
|
||||||
|
cfg: Configurazione
|
||||||
|
pool: Pool di connessioni al database
|
||||||
|
"""
|
||||||
|
# Estrazione e normalizzazione dei dati del record
|
||||||
|
id, unit_type, tool_type, unit_name, tool_name = [
|
||||||
|
x.lower().replace(" ", "_") if isinstance(x, str) else x
|
||||||
|
for x in record
|
||||||
|
]
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Recupero informazioni principali
|
||||||
|
tool_elab_info = await get_tool_info(fase, unit_name.upper(), tool_name.upper(), pool)
|
||||||
|
timestamp_matlab_elab = await get_elab_timestamp(id, pool)
|
||||||
|
|
||||||
|
# Verifica se il processing può essere eseguito
|
||||||
|
if not _should_process(tool_elab_info, timestamp_matlab_elab):
|
||||||
|
logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: "
|
||||||
|
"elaborazione saltata - due date raggiunta.")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Routing basato sulla fase
|
||||||
|
success = await _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name,
|
||||||
|
timestamp_matlab_elab, pool)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
await update_status(cfg, id, fase, pool)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Errore durante elaborazione id {id} - {unit_name} - {tool_name}: {e}")
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
await unlock(cfg, id, pool)
|
||||||
|
|
||||||
|
|
||||||
|
def _should_process(tool_elab_info, timestamp_matlab_elab):
|
||||||
|
"""Verifica se il record può essere processato basandosi sulla due date."""
|
||||||
|
duedate = tool_elab_info.get("duedate")
|
||||||
|
|
||||||
|
if not duedate or duedate in ('0000-00-00 00:00:00', ''):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Se timestamp_matlab_elab è None/null, usa il timestamp corrente
|
||||||
|
comparison_timestamp = timestamp_matlab_elab if timestamp_matlab_elab is not None else datetime.now()
|
||||||
|
|
||||||
|
return duedate > comparison_timestamp
|
||||||
|
|
||||||
|
|
||||||
|
async def _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name,
|
||||||
|
timestamp_matlab_elab, pool):
|
||||||
|
"""
|
||||||
|
Gestisce il routing delle operazioni in base alla fase del workflow.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True se l'operazione è riuscita, False altrimenti
|
||||||
|
"""
|
||||||
|
if fase == WorkflowFlags.SENT_ELAB_DATA:
|
||||||
|
return await _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name,
|
||||||
|
tool_name, timestamp_matlab_elab, pool)
|
||||||
|
|
||||||
|
elif fase == WorkflowFlags.SENT_RAW_DATA:
|
||||||
|
return await _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name,
|
||||||
|
tool_name, pool)
|
||||||
|
|
||||||
|
else:
|
||||||
|
logger.info(f"id {id} - {unit_name} - {tool_name}: nessuna azione da eseguire.")
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
async def _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name, tool_name,
|
||||||
|
timestamp_matlab_elab, pool):
|
||||||
|
"""Gestisce la fase di invio dati elaborati."""
|
||||||
|
|
||||||
|
# FTP send per dati elaborati
|
||||||
|
if tool_elab_info.get('ftp_send'):
|
||||||
|
return await _send_elab_data_ftp(cfg, id, unit_name, tool_name,
|
||||||
|
timestamp_matlab_elab, pool)
|
||||||
|
|
||||||
|
# API send per dati elaborati
|
||||||
|
elif _should_send_elab_api(tool_elab_info):
|
||||||
|
return await _send_elab_data_api(cfg, id, unit_name, tool_name,
|
||||||
|
timestamp_matlab_elab, pool)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
async def _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, pool):
|
||||||
|
"""Gestisce la fase di invio dati raw."""
|
||||||
|
|
||||||
|
# FTP send per dati raw
|
||||||
|
if tool_elab_info.get('ftp_send_raw'):
|
||||||
|
return await _send_raw_data_ftp(cfg, id, unit_name, tool_name, pool)
|
||||||
|
|
||||||
|
# API send per dati raw
|
||||||
|
elif _should_send_raw_api(tool_elab_info):
|
||||||
|
return await _send_raw_data_api(cfg, id, unit_name, tool_name, pool)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def _should_send_elab_api(tool_elab_info):
|
||||||
|
"""Verifica se i dati elaborati devono essere inviati via API."""
|
||||||
|
return (tool_elab_info.get('inoltro_api') and
|
||||||
|
tool_elab_info.get('api_send') and
|
||||||
|
tool_elab_info.get('inoltro_api_url', '').strip())
|
||||||
|
|
||||||
|
|
||||||
|
def _should_send_raw_api(tool_elab_info):
|
||||||
|
"""Verifica se i dati raw devono essere inviati via API."""
|
||||||
|
return (tool_elab_info.get('inoltro_api_raw') and
|
||||||
|
tool_elab_info.get('api_send_raw') and
|
||||||
|
tool_elab_info.get('inoltro_api_url_raw', '').strip())
|
||||||
|
|
||||||
|
|
||||||
|
async def _send_elab_data_ftp(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
|
||||||
|
"""Invia dati elaborati via FTP."""
|
||||||
|
try:
|
||||||
|
elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name,
|
||||||
|
timestamp_matlab_elab, pool)
|
||||||
|
if not elab_csv:
|
||||||
|
return False
|
||||||
|
|
||||||
|
print(elab_csv)
|
||||||
|
# if await send_elab_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
|
||||||
|
if True: # Placeholder per test
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP fallito.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Errore invio FTP elab data id {id}: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def _send_elab_data_api(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
|
||||||
|
"""Invia dati elaborati via API."""
|
||||||
|
try:
|
||||||
|
elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name,
|
||||||
|
timestamp_matlab_elab, pool)
|
||||||
|
if not elab_csv:
|
||||||
|
return False
|
||||||
|
|
||||||
|
print(elab_csv)
|
||||||
|
# if await send_elab_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
|
||||||
|
if True: # Placeholder per test
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
logger.error(f"id {id} - {unit_name} - {tool_name}: invio API fallito.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Errore invio API elab data id {id}: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def _send_raw_data_ftp(cfg, id, unit_name, tool_name, pool):
|
||||||
|
"""Invia dati raw via FTP."""
|
||||||
|
try:
|
||||||
|
# if await ftp_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool):
|
||||||
|
if True: # Placeholder per test
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP raw fallito.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Errore invio FTP raw data id {id}: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
async def _send_raw_data_api(cfg, id, unit_name, tool_name, pool):
|
||||||
|
"""Invia dati raw via API."""
|
||||||
|
try:
|
||||||
|
# if await api_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool):
|
||||||
|
if True: # Placeholder per test
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
logger.error(f"id {id} - {unit_name} - {tool_name}: invio API raw fallito.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Errore invio API raw data id {id}: {e}")
|
||||||
|
return False
|
||||||
Reference in New Issue
Block a user