add send ftp

This commit is contained in:
2025-07-18 15:26:41 +02:00
parent f003ba68ed
commit c23027918c
8 changed files with 182 additions and 20 deletions

View File

@@ -10,8 +10,10 @@ CREATE TABLE `received` (
`tool_data` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`locked` int DEFAULT '0',
`status` int DEFAULT '0',
`matlab_timestamp` timestamp NULL DEFAULT NULL,
`inserted_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`loaded_at` datetime DEFAULT NULL,
`elaborated_at` timestamp NULL DEFAULT NULL,
`sent_at` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=694 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

View File

@@ -3,13 +3,18 @@
# Import necessary libraries
import logging
import asyncio
from datetime import datetime
# Import custom modules for configuration and database connection
from utils.config import loader_matlab_elab as setting
from utils.database import DATA_LOADED
from utils.database import DATA_LOADED, DATA_ELABORATED, DATA_SENT
from utils.database.matlab_query import get_matlab_command
from utils.csv.loaders import get_next_csv_atomic
from utils.orchestrator_utils import run_orchestrator, worker_context
from utils.database.loader_action import update_status, unlock
from utils.database.elab_query import get_data_as_csv
#from utils.ftp.elab_send import send_csv_to_customer
# Initialize the logger for this module
logger = logging.getLogger()
@@ -45,26 +50,44 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
if record:
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
matlab_info = await get_matlab_command(cfg, tool_name, unit_name, pool)
if matlab_info:
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{matlab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name} {tool_name}"
tool_elab_info = await get_matlab_command(cfg, tool_name, unit_name, pool)
if tool_elab_info:
if tool_elab_info['statustools'].lower() in cfg.elab_status:
logger.info(f"Elaborazione id {id} per {unit_name} {tool_name} ")
# matlab_error_filename = f'{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt'
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name} {tool_name}"
timestamp_matlab_elab = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
proc = await asyncio.create_subprocess_shell(
matlab_cmd,
cwd=cfg.matlab_func_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
proc = await asyncio.create_subprocess_shell(
matlab_cmd,
cwd=cfg.matlab_func_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
logger.error("Errore durante l'elaborazione")
logger.error(stderr.decode().strip())
with open(f"{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt", "w") as f:
f.write(stderr.decode().strip())
else:
logger.info(stdout.decode().strip())
await update_status(cfg, id, DATA_ELABORATED, pool)
if not tool_elab_info["duedate"] or tool_elab_info["duedate"] in ('0000-00-00 00:00:00', '') or tool_elab_info["duedate"] > timestamp_matlab_elab:
if tool_elab_info['ftp_send']:
if elab_csv := await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
print(elab_csv)
#if await send_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
#await update_status(cfg, id, DATA_SENT, pool)
await update_status(cfg, id, DATA_SENT, pool)
else:
logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: ftp put didn't executed because due date reached.")
if proc.returncode != 0:
logger.error("Errore durante l'elaborazione")
logger.error(stderr.decode().strip())
logger.info(stdout.decode().strip())
await asyncio.sleep(ELAB_PROCESSING_DELAY)
await unlock(cfg, id, pool)
await asyncio.sleep(ELAB_PROCESSING_DELAY)
else:
logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['statustools']}: MatLab calc by-passed.")
else:
logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP)

4
env/elab.ini vendored
View File

@@ -4,6 +4,10 @@
[threads]
max_num = 10
[tool]
# stati in minuscolo
elab_status = active|manual update
[matlab]
#runtime = /usr/local/MATLAB/MATLAB_Runtime/v93
#func_path = /usr/local/matlab_func/

View File

@@ -30,6 +30,9 @@ class Config:
self.dbrawdata = c.get("tables", "rawTableName")
self.dbnodes = c.get("tables", "nodesTableName")
# Tool
self.elab_status = [part for part in c.get("tool", "elab_status").split('|')]
# Matlab
self.matlab_runtime = c.get("matlab", "runtime")
self.matlab_func_path = c.get("matlab", "func_path")

View File

@@ -1,3 +1,4 @@
CSV_RECEIVED = 0
DATA_LOADED = 1
DATA_ELABORATED = 2
DATA_SENT = 3

View File

@@ -0,0 +1,60 @@
import csv
from io import StringIO
import logging
logger = logging.getLogger(__name__)
async def get_data_as_csv(cfg: dict, id_recv: int, unit: str, tool: str, matlab_timestamp: float, pool: object) -> str:
"""
Retrieves elaborated data from the database and formats it as a CSV string.
The query selects data from the `ElabDataView` based on `UnitName`, `ToolNameID`,
and a `updated_at` timestamp, then orders it. The first row of the CSV will be
the column headers.
Args:
cfg (dict): Configuration dictionary (not directly used in the query but passed for consistency).
id (int): The ID of the record being processed (used for logging).
pool (object): The database connection pool.
unit (str): The name of the unit to filter the data.
tool (str): The ID of the tool to filter the data.
matlab_timestamp (float): A timestamp used to filter data updated after this time.
Returns:
str: A string containing the elaborated data in CSV format.
"""
query = """
select * from (
select 'ToolNameID', 'EventDate', 'EventTime', 'NodeNum', 'NodeType', 'NodeDepth',
'XShift', 'YShift', 'ZShift' , 'X', 'Y', 'Z', 'HShift', 'HShiftDir', 'HShift_local',
'speed', 'speed_local', 'acceleration', 'acceleration_local', 'T_node', 'water_level', 'pressure', 'load_value', 'AlfaX', 'AlfaY', 'CalcErr'
union all
select ToolNameID, EventDate, EventTime, NodeNum, NodeType, NodeDepth,
XShift, YShift, ZShift , X, Y, Z, HShift, HShiftDir, HShift_local,
speed, speed_local, acceleration, acceleration_local, T_node, water_level, pressure, load_value, AlfaX, AlfaY, calcerr
from ElabDataView
where UnitName = %s and ToolNameID = %s and updated_at > %s
order by ToolNameID DESC, concat(EventDate, EventTime), convert(`NodeNum`, UNSIGNED INTEGER) DESC
) resulting_set
"""
async with pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await cur.execute(query, (unit, tool, matlab_timestamp))
results = await cur.fetchall()
logger.info(f"id {id_recv} - {unit} - {tool}: estratti i dati per invio CSV")
logger.info(f"Numero di righe estratte: {len(results)}")
# Creare CSV in memoria
output = StringIO()
writer = csv.writer(output, delimiter=";", lineterminator="\n", quoting=csv.QUOTE_MINIMAL)
for row in results:
writer.writerow(row)
csv_data = output.getvalue()
output.close()
return csv_data
except Exception as e:
logging.error(f"id {id_recv} - {unit} - {tool} - errore nel query creazione csv: {e}")
return None

View File

@@ -4,7 +4,7 @@ import asyncio
logger = logging.getLogger(__name__)
timestamp_cols = ["inserted_at", "loaded_at", "elaborated_at"]
timestamp_cols = ["inserted_at", "loaded_at", "elaborated_at", "sent_at"]
async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
@@ -109,7 +109,7 @@ async def update_status(cfg: object, id: int, status: int, pool: object) -> None
f"update {cfg.dbrectable} set status = {status}, {timestamp_cols[status]} = now() where id = {id}"
)
await conn.commit()
logging.info("Status updated.")
logging.info(f"Status updated id {id}.")
except Exception as e:
await conn.rollback()
logging.error(f"Error: {e}")

69
utils/ftp/elab_send.py Normal file
View File

@@ -0,0 +1,69 @@
import ftplib
from io import BytesIO
import logging
import aiomysql
logger = logging.getLogger(__name__)
async def send_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
"""
Sends elaborated CSV data to a customer via FTP.
Retrieves FTP connection details from the database based on the unit name,
then establishes an FTP connection and uploads the CSV data.
Args:
cfg (dict): Configuration dictionary (not directly used in this function but passed for consistency).
id (int): The ID of the record being processed (used for logging).
unit (str): The name of the unit associated with the data.
tool (str): The name of the tool associated with the data.
csv_data (str): The CSV data as a string to be sent.
pool (object): The database connection pool.
Returns:
bool: True if the CSV data was sent successfully, False otherwise.
"""
query = """
select ftp_addrs, ftp_user, ftp_passwd, ftp_parm, ftp_filename, ftp_target, duedate from units
where name = '%s'";'
"""
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
try:
await cur.execute(query, (unit,))
send_ftp_info = await cur.fetchone()
logger.info(f"id {id} - {unit} - {tool}: estratti i dati per invio via ftp")
except Exception as e:
logging.error(f"id {id} - {unit} - {tool} - errore nel query per invio ftp: {e}")
try:
# Converti in bytes
csv_bytes = csv_data.encode('utf-8')
csv_buffer = BytesIO(csv_bytes)
# Connessione FTP
with ftplib.FTP(send_ftp_info["ftp_addrs"]) as ftp:
ftp.login(send_ftp_info["ftp_user"], send_ftp_info["ftp_passwd"])
# Cambia directory
if send_ftp_info["ftp_target"] != "/":
ftp.cwd(send_ftp_info["ftp_target"])
# Invia il file
result = ftp.storbinary(f'STOR {send_ftp_info["ftp_filename"]}', csv_buffer)
if result.startswith('226'):
logging.info(f"File {send_ftp_info["ftp_filename"]} inviato con successo")
return True
else:
logging.error(f"Errore nell'invio: {result}")
return False
except ftplib.all_errors as e:
logging.error(f"Errore FTP: {e}")
return False
except Exception as e:
logging.error(f"Errore generico: {e}")
return False
finally:
csv_buffer.close()