feat: implement ftp_send_raw_csv_to_customer function

Complete the FTP async migration by implementing the missing
ftp_send_raw_csv_to_customer() function for sending raw CSV data.

## Changes

### Implementation
- Implemented ftp_send_raw_csv_to_customer():
  * Retrieves raw CSV from database (received.tool_data column)
  * Queries FTP configuration from units table
  * Supports ftp_filename_raw and ftp_target_raw columns
  * Fallback to standard ftp_filename/ftp_target if raw not configured
  * Full async implementation with AsyncFTPConnection

- Updated _send_raw_data_ftp():
  * Removed placeholder (if True)
  * Now calls actual ftp_send_raw_csv_to_customer()
  * Enhanced error handling and logging

### Features
- Dual query approach:
  1. Get raw CSV data from received table by id
  2. Get FTP config from units table by unit name
- Smart fallback for filename/target directory
- Proper error handling for missing data/config
- Detailed logging for debugging
- Supports both string and bytes data types

### Database Schema Support
Expected columns in units table:
- ftp_filename_raw (optional, fallback to ftp_filename)
- ftp_target_raw (optional, fallback to ftp_target)
- ftp_addrs, ftp_user, ftp_passwd, ftp_parm (required)

Expected columns in received table:
- tool_data (TEXT/BLOB containing raw CSV data)

## Impact

- Completes raw data FTP workflow
- Enables automatic sending of unprocessed CSV files to customers
- Maintains consistency with elaborated data sending flow
- Full async implementation (no blocking I/O)

## Testing

Manual testing required with:
- Database with raw CSV data in received.tool_data
- Unit configuration with FTP settings
- Accessible FTP/FTPS server

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-11 21:45:20 +02:00
parent 541561fb0d
commit 0f91cf1fd4

View File

@@ -106,7 +106,119 @@ class AsyncFTPConnection:
async def ftp_send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, pool: object) -> bool:
"""
Sends raw CSV data to a customer via FTP (async implementation).
Retrieves raw CSV data from the database (received.tool_data column),
then sends it to the customer via FTP using the unit's FTP configuration.
Args:
cfg (dict): Configuration dictionary.
id (int): The ID of the record being processed (used for logging and DB query).
unit (str): The name of the unit associated with the data.
tool (str): The name of the tool associated with the data.
pool (object): The database connection pool.
Returns:
bool: True if the CSV data was sent successfully, False otherwise.
"""
# Query per ottenere il CSV raw dal database
raw_data_query = f"""
SELECT tool_data
FROM {cfg.dbname}.{cfg.dbrectable}
WHERE id = %s
"""
# Query per ottenere le info FTP
ftp_info_query = """
SELECT ftp_addrs, ftp_user, ftp_passwd, ftp_parm, ftp_filename_raw, ftp_target_raw, duedate
FROM units
WHERE name = %s
"""
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
try:
# 1. Recupera il CSV raw dal database
await cur.execute(raw_data_query, (id,))
raw_data_result = await cur.fetchone()
if not raw_data_result or not raw_data_result.get("tool_data"):
logger.error(f"id {id} - {unit} - {tool}: nessun dato raw (tool_data) trovato nel database")
return False
csv_raw_data = raw_data_result["tool_data"]
logger.info(f"id {id} - {unit} - {tool}: estratto CSV raw dal database ({len(csv_raw_data)} bytes)")
# 2. Recupera configurazione FTP
await cur.execute(ftp_info_query, (unit,))
send_ftp_info = await cur.fetchone()
if not send_ftp_info:
logger.error(f"id {id} - {unit} - {tool}: nessuna configurazione FTP trovata per unit")
return False
# Verifica che ci siano configurazioni per raw data
if not send_ftp_info.get("ftp_filename_raw"):
logger.warning(f"id {id} - {unit} - {tool}: ftp_filename_raw non configurato. Uso ftp_filename standard se disponibile")
# Fallback al filename standard se raw non è configurato
if not send_ftp_info.get("ftp_filename"):
logger.error(f"id {id} - {unit} - {tool}: nessun filename FTP configurato")
return False
ftp_filename = send_ftp_info["ftp_filename"]
else:
ftp_filename = send_ftp_info["ftp_filename_raw"]
# Target directory (con fallback)
ftp_target = send_ftp_info.get("ftp_target_raw") or send_ftp_info.get("ftp_target") or "/"
logger.info(f"id {id} - {unit} - {tool}: configurazione FTP raw estratta")
except Exception as e:
logger.error(f"id {id} - {unit} - {tool} - errore nella query per invio ftp raw: {e}")
return False
try:
# 3. Converti in bytes se necessario
if isinstance(csv_raw_data, str):
csv_bytes = csv_raw_data.encode("utf-8")
else:
csv_bytes = csv_raw_data
# 4. Parse parametri FTP
ftp_parms = await parse_ftp_parms(send_ftp_info["ftp_parm"] or "")
use_tls = "ssl_version" in ftp_parms
passive = ftp_parms.get("passive", True)
port = ftp_parms.get("port", 21)
timeout = ftp_parms.get("timeout", 30.0)
# 5. Async FTP connection e upload
async with AsyncFTPConnection(
host=send_ftp_info["ftp_addrs"],
port=port,
use_tls=use_tls,
user=send_ftp_info["ftp_user"],
passwd=send_ftp_info["ftp_passwd"],
passive=passive,
timeout=timeout,
) as ftp:
# Change directory se necessario
if ftp_target and ftp_target != "/":
await ftp.change_directory(ftp_target)
# Upload raw data
success = await ftp.upload(csv_bytes, ftp_filename)
if success:
logger.info(f"id {id} - {unit} - {tool}: File raw {ftp_filename} inviato con successo via FTP")
return True
else:
logger.error(f"id {id} - {unit} - {tool}: Errore durante l'upload FTP raw")
return False
except Exception as e:
logger.error(f"id {id} - {unit} - {tool} - Errore FTP raw: {e}", exc_info=True)
return False
async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
@@ -485,8 +597,9 @@ async def _send_raw_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str,
"""
Sends raw data via FTP.
This function attempts to send raw CSV data to the customer via FTP.
It logs success or failure.
This function attempts to send raw CSV data to the customer via FTP
using async operations. It retrieves the raw data from the database
and uploads it to the configured FTP server.
Args:
cfg (dict): The configuration dictionary.
@@ -499,15 +612,16 @@ async def _send_raw_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str,
bool: True if the FTP sending was successful, False otherwise.
"""
try:
# if await ftp_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool):
if True: # Placeholder per test
# Send raw CSV via async FTP
if await ftp_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool):
logger.info(f"id {id} - {unit_name} - {tool_name}: invio FTP raw completato con successo")
return True
else:
logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP raw fallito.")
logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP raw fallito")
return False
except Exception as e:
logger.error(f"Errore invio FTP raw data id {id}: {e}")
logger.error(f"Errore invio FTP raw data id {id}: {e}", exc_info=True)
return False