add funcs docs

2025-08-19 12:01:15 +02:00
parent 2b976d06b3
commit b79f07b407
17 changed files with 285 additions and 41 deletions

View File

@@ -24,6 +24,17 @@ ELAB_PROCESSING_DELAY = 0.2
NO_RECORD_SLEEP = 30
async def worker(worker_id: int, cfg: object, pool: object) -> None:
"""Esegue il ciclo di lavoro per l'invio dei dati.
Il worker preleva un record dal database che indica dati pronti per
l'invio (sia raw che elaborati), li processa e attende prima di
iniziare un nuovo ciclo.
Args:
worker_id (int): L'ID univoco del worker.
cfg (object): L'oggetto di configurazione.
pool (object): Il pool di connessioni al database.
"""
# Imposta il context per questo worker
worker_context.set(f"W{worker_id:02d}")
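
A minimal sketch of the loop this docstring describes (get_next_record and process_record are hypothetical placeholders; NO_RECORD_SLEEP and ELAB_PROCESSING_DELAY are the constants above):

import asyncio

async def worker_loop(worker_id: int, cfg: object, pool: object) -> None:
    worker_context.set(f"W{worker_id:02d}")
    while True:
        record = await get_next_record(cfg, pool)    # hypothetical fetch helper
        if record is None:
            await asyncio.sleep(NO_RECORD_SLEEP)     # nothing to send: back off
            continue
        await process_record(record, cfg, pool)      # hypothetical send step
        await asyncio.sleep(ELAB_PROCESSING_DELAY)   # pause before the next cycle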

View File

@@ -5,6 +5,10 @@ from configparser import ConfigParser
class Config:
def __init__(self):
"""
Initializes the Config class by reading configuration files.
It loads settings from 'ftp.ini' and 'db.ini' for the FTP server, CSV handling, logging, and the database.
"""
c = ConfigParser()
c.read(["../env/ftp.ini", "../env/db.ini"])
@@ -71,4 +75,3 @@ class Config:
for item in c.get("ts_pini", "path_match").split('|')
for key, value in [item.split(':', 1)]
}
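
As a worked illustration of the path_match parsing above (the sample value is invented), each '|'-separated item is split on the first ':' only:

raw = "alpha:/data/a|beta:/data/b"   # invented sample for ts_pini.path_match
parsed = {key: value
          for item in raw.split('|')
          for key, value in [item.split(':', 1)]}
assert parsed == {'alpha': '/data/a', 'beta': '/data/b'}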

View File

@@ -5,6 +5,10 @@ from configparser import ConfigParser
class Config:
def __init__(self):
"""
Initializes the Config class by reading configuration files.
It loads settings from 'load.ini' and 'db.ini' for logging, worker, database, and table configurations.
"""
c = ConfigParser()
c.read(["../env/load.ini", "../env/db.ini"])
@@ -29,4 +33,3 @@ class Config:
self.dbrawdata = c.get("tables", "rawTableName")
self.dbnodes = c.get("tables", "nodesTableName")

View File

@@ -5,6 +5,10 @@ from configparser import ConfigParser
class Config:
def __init__(self):
"""
Initializes the Config class by reading configuration files.
It loads settings from 'elab.ini' and 'db.ini' for logging, worker, database, table, tool, and Matlab configurations.
"""
c = ConfigParser()
c.read(["../env/elab.ini", "../env/db.ini"])

View File

@@ -5,6 +5,10 @@ from configparser import ConfigParser
class Config:
def __init__(self):
"""
Initializes the Config class by reading configuration files.
It loads settings from 'send.ini' and 'db.ini' for logging, worker, database, and table configurations.
"""
c = ConfigParser()
c.read(["../env/send.ini", "../env/db.ini"])

View File

@@ -4,6 +4,9 @@
from configparser import ConfigParser
class Config:
"""
Handles configuration loading of the database settings used to load FTP users.
"""
def __init__(self):
c = ConfigParser()
@@ -16,5 +19,3 @@ class Config:
self.dbpass = c.get("db", "password")
self.dbname = c.get("db", "dbName")
self.max_retries = c.getint("db", "maxRetries")
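
These settings might feed a connection retry loop along these lines (a sketch: connect stands in for the actual driver call, and the dbhost/dbuser fields are assumed to exist on Config):

import time

def connect_with_retries(cfg, connect, delay: float = 2.0):
    # Try up to cfg.max_retries times before giving up.
    last_error = None
    for attempt in range(1, cfg.max_retries + 1):
        try:
            return connect(host=cfg.dbhost, user=cfg.dbuser,
                           password=cfg.dbpass, db=cfg.dbname)
        except Exception as exc:
            last_error = exc
            time.sleep(delay * attempt)  # simple linear backoff
    raise last_error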

View File

@@ -10,11 +10,16 @@ from utils.csv.parser import extract_value
logger = logging.getLogger(__name__)
def on_file_received(self: object, file: str) -> None:
"""Handles the event when a file is successfully received.
Args:
file: The path to the received file.
"""
"""
Processes a received file, extracts relevant information, and inserts it into the database.
If the file is empty, it is removed. Otherwise, it extracts unit and tool
information from the filename and the first few lines of the CSV, handles
aliases, and then inserts the data into the configured database table.
Args:
file (str): The path to the received file.
"""
if not os.stat(file).st_size:
os.remove(file)
logger.info(f'File {file} is empty: removed.')
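
The extraction step the docstring describes might look roughly like this sketch (the regex patterns and the five-line window are invented; extract_value is the parser imported above):

import os

def _extract_unit_and_tool(file: str) -> tuple:
    # Search the filename first, then the first few lines of the CSV.
    with open(file, errors='ignore') as f:
        head = ''.join(f.readline() for _ in range(5))
    filename = os.path.basename(file)
    unit = extract_value([r'unit[_-](\w+)'], filename, head)  # invented pattern
    tool = extract_value([r'tool[_-](\w+)'], filename, head)  # invented pattern
    return unit, tool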

View File

@@ -193,20 +193,16 @@ async def process_workflow_record(record: tuple, fase: int, cfg: dict, pool: obj
await unlock(cfg, id, pool)
def _should_process_old(tool_elab_info, timestamp_matlab_elab):
"""Checks whether the record can be processed, based on its due date."""
duedate = tool_elab_info.get("duedate")
if not duedate or duedate in ('0000-00-00 00:00:00', ''):
return True
# If timestamp_matlab_elab is None/null, use the current timestamp
comparison_timestamp = timestamp_matlab_elab if timestamp_matlab_elab is not None else datetime.now()
return duedate > comparison_timestamp
def _should_process(tool_elab_info, timestamp_matlab_elab):
"""Checks whether the record can be processed, based on its due date."""
def _should_process(tool_elab_info: dict, timestamp_matlab_elab: datetime) -> bool:
"""
Determines if a record should be processed based on its due date.
Args:
tool_elab_info (dict): A dictionary containing information about the tool and its due date.
timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
Returns:
bool: True if the record should be processed, False otherwise.
"""
duedate = tool_elab_info.get("duedate")
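
Assuming the driver returns duedate as a datetime, the behaviour pins down as:

from datetime import datetime

# No due date recorded: always process.
assert _should_process({"duedate": None}, None) is True
# Due date still in the future at the last elaboration: process.
assert _should_process({"duedate": datetime(2030, 1, 1)}, datetime(2025, 1, 1)) is True
# Due date already passed: skip.
assert _should_process({"duedate": datetime(2020, 1, 1)}, datetime(2025, 1, 1)) is False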
@@ -229,13 +225,26 @@ def _should_process(tool_elab_info, timestamp_matlab_elab):
async def _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool):
"""
Handles the routing of operations according to the workflow phase.
Returns:
bool: True if the operation succeeded, False otherwise
"""
async def _route_by_phase(fase: int, tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str,
timestamp_matlab_elab: datetime, pool: object) -> bool:
"""
Routes the processing of a workflow record based on the current phase.
This function acts as a dispatcher, calling the appropriate handler function
for sending elaborated data or raw data based on the `fase` (phase) parameter.
Args:
fase (int): The current phase of the workflow (e.g., WorkflowFlags.SENT_ELAB_DATA, WorkflowFlags.SENT_RAW_DATA).
tool_elab_info (dict): A dictionary containing information about the tool and its elaboration status.
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
pool (object): The database connection pool.
Returns:
bool: True if the data sending operation was successful or no action was needed, False otherwise.
"""
if fase == WorkflowFlags.SENT_ELAB_DATA:
return await _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name,
@@ -250,10 +259,27 @@ async def _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name,
return True
async def _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool):
"""Gestisce la fase di invio dati elaborati."""
async def _handle_elab_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str,
timestamp_matlab_elab: datetime, pool: object) -> bool:
"""
Handles the phase of sending elaborated data.
This function checks if elaborated data needs to be sent via FTP or API
based on the `tool_elab_info` and calls the appropriate sending function.
Args:
tool_elab_info (dict): A dictionary containing information about the tool and its elaboration status,
including flags for FTP and API sending.
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
pool (object): The database connection pool.
Returns:
bool: True if the data sending operation was successful or no action was needed, False otherwise.
"""
# FTP send for elaborated data
if tool_elab_info.get('ftp_send'):
return await _send_elab_data_ftp(cfg, id, unit_name, tool_name,
@@ -267,8 +293,26 @@ async def _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name, tool_name,
return True
async def _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, pool):
"""Gestisce la fase di invio dati raw."""
async def _handle_raw_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, pool: object) -> bool:
"""
Handles the phase of sending raw data.
This function checks if raw data needs to be sent via FTP or API
based on the `tool_elab_info` and calls the appropriate sending function.
Args:
tool_elab_info (dict): A dictionary containing information about the tool and its raw data sending status,
including flags for FTP and API sending.
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
pool (object): The database connection pool.
Returns:
bool: True if the data sending operation was successful or no action was needed, False otherwise.
"""
# FTP send for raw data
if tool_elab_info.get('ftp_send_raw'):
@@ -281,22 +325,38 @@ async def _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name, tool_name,
return True
def _should_send_elab_api(tool_elab_info):
def _should_send_elab_api(tool_elab_info: dict) -> bool:
"""Verifica se i dati elaborati devono essere inviati via API."""
return (tool_elab_info.get('inoltro_api') and
tool_elab_info.get('api_send') and
tool_elab_info.get('inoltro_api_url', '').strip())
def _should_send_raw_api(tool_elab_info):
def _should_send_raw_api(tool_elab_info: dict) -> bool:
"""Verifica se i dati raw devono essere inviati via API."""
return (tool_elab_info.get('inoltro_api_raw') and
tool_elab_info.get('api_send_raw') and
tool_elab_info.get('inoltro_api_url_raw', '').strip())
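
Both helpers require the two flags plus a non-blank URL; for example (values invented):

info = {'inoltro_api': 1, 'api_send': 1, 'inoltro_api_url': '   '}
assert not _should_send_elab_api(info)          # blank URL fails the check
info['inoltro_api_url'] = 'https://example.com/ingest'
assert _should_send_elab_api(info)              # all three conditions hold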
async def _send_elab_data_ftp(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
"""Invia dati elaborati via FTP."""
async def _send_elab_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object) -> bool:
"""
Sends elaborated data via FTP.
This function retrieves the elaborated CSV data and attempts to send it
to the customer via FTP. It logs success or failure.
Args:
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
pool (object): The database connection pool.
Returns:
bool: True if the FTP sending was successful, False otherwise.
"""
try:
elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool)
@@ -316,8 +376,24 @@ async def _send_elab_data_ftp(cfg, id, unit_name, tool_name, timestamp_matlab_el
return False
async def _send_elab_data_api(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
"""Invia dati elaborati via API."""
async def _send_elab_data_api(cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object) -> bool:
"""
Sends elaborated data via API.
This function retrieves the elaborated CSV data and attempts to send it
to the customer via an API. It logs success or failure.
Args:
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
pool (object): The database connection pool.
Returns:
bool: True if the API sending was successful, False otherwise.
"""
try:
elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool)
@@ -337,8 +413,23 @@ async def _send_elab_data_api(cfg, id, unit_name, tool_name, timestamp_matlab_el
return False
async def _send_raw_data_ftp(cfg, id, unit_name, tool_name, pool):
"""Invia dati raw via FTP."""
async def _send_raw_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, pool: object) -> bool:
"""
Sends raw data via FTP.
This function attempts to send raw CSV data to the customer via FTP.
It logs success or failure.
Args:
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
pool (object): The database connection pool.
Returns:
bool: True if the FTP sending was successful, False otherwise.
"""
try:
# if await ftp_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool):
if True: # Placeholder for testing
@@ -352,8 +443,23 @@ async def _send_raw_data_ftp(cfg, id, unit_name, tool_name, pool):
return False
async def _send_raw_data_api(cfg, id, unit_name, tool_name, pool):
"""Invia dati raw via API."""
async def _send_raw_data_api(cfg: dict, id: int, unit_name: str, tool_name: str, pool: object) -> bool:
"""
Sends raw data via API.
This function attempts to send raw CSV data to the customer via an API.
It logs success or failure.
Args:
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
pool (object): The database connection pool.
Returns:
bool: True if the API sending was successful, False otherwise.
"""
try:
# if await api_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool):
if True: # Placeholder for testing
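
When the placeholder is replaced, the API branch might resemble this sketch (assuming aiohttp; the endpoint, headers, and payload shape are invented):

import aiohttp

async def _post_csv(url: str, csv_text: str) -> bool:
    # POST the CSV body and treat any 2xx status as success.
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=csv_text.encode(),
                                headers={'Content-Type': 'text/csv'}) as resp:
            return 200 <= resp.status < 300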

View File

@@ -38,8 +38,24 @@ async def main_loader(cfg: object, id: int, pool: object, action: str) -> None:
logger.warning(f"Action '{action}' not recognized.")
async def get_next_csv_atomic(pool, table_name, status, next_status):
"""Preleva atomicamente il prossimo CSV da elaborare"""
async def get_next_csv_atomic(pool: object, table_name: str, status: int, next_status: int) -> tuple:
"""
Retrieves the next available CSV record for processing in an atomic manner.
This function acquires a database connection from the pool, begins a transaction,
and attempts to select and lock a single record from the specified table that
matches the given status and has not yet reached the next_status. It uses
`SELECT FOR UPDATE SKIP LOCKED` to ensure atomicity and prevent other workers
from processing the same record concurrently.
Args:
pool (object): The database connection pool.
table_name (str): The name of the table to query.
status (int): The current status flag that the record must have.
next_status (int): The status flag that the record should NOT have yet.
Returns:
tuple: The next available received record if found, otherwise None.
"""
async with pool.acquire() as conn:
# IMPORTANT: disable autocommit for this transaction
await conn.begin()
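
The query behind this pattern typically looks like the following sketch (aiomysql-style cursor; the id/status columns and the bitmask scheme are assumptions):

async def _claim_next(conn, table_name: str, status: int, next_status: int):
    async with conn.cursor() as cur:
        # Lock one eligible row; SKIP LOCKED lets concurrent workers pass over it.
        await cur.execute(
            f"SELECT id FROM {table_name} "
            "WHERE (status & %s) AND NOT (status & %s) "
            "LIMIT 1 FOR UPDATE SKIP LOCKED",
            (status, next_status))
        row = await cur.fetchone()
        if row is not None:
            # Flag the row so other workers no longer match it.
            await cur.execute(
                f"UPDATE {table_name} SET status = status | %s WHERE id = %s",
                (next_status, row[0]))
        return row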

View File

@@ -1,7 +1,23 @@
import re
def extract_value(patterns: list, primary_source: str, secondary_source: str = None, default='Not Defined') -> str:
def extract_value(patterns: list, primary_source: str, secondary_source: str = None, default: str = 'Not Defined') -> str:
"""
Extracts a value from a given source (or sources) based on a list of regex patterns.
It iterates through the provided patterns and attempts to find a match in the
primary source first, then in the secondary source if provided. The first
successful match is returned. If no match is found after checking all sources
with all patterns, a default value is returned.
Args:
patterns (list): A list of regular expression strings to search for.
primary_source (str): The main string to search within.
secondary_source (str, optional): An additional string to search within if no match is found in the primary source. Defaults to None.
default (str, optional): The value to return if no match is found. Defaults to 'Not Defined'.
Returns:
str: The first matched value, or the default value if no match is found.
"""
for source in [source for source in (primary_source, secondary_source) if source is not None]:
for pattern in patterns:
matches = re.findall(pattern, source, re.IGNORECASE)
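
Given the documented behaviour, usage looks like this (patterns and sources invented):

patterns = [r'unit[_-](\w+)', r'U(\d+)']
# Matched in the primary source:
assert extract_value(patterns, 'unit_alpha.csv') == 'alpha'
# No primary match, so the secondary source is searched:
assert extract_value(patterns, 'readings.csv', 'header U42') == '42'
# No match anywhere: the default is returned.
assert extract_value(patterns, 'x', 'y') == 'Not Defined'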

View File

@@ -11,7 +11,17 @@ import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'hirpinia_hirpinia' type.
This function retrieves CSV data, writes it to a temporary file,
executes an external Python script ('hirpiniaLoadScript.py') to process it,
and then updates the workflow status in the database.
Args:
cfg (object): The configuration object.
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Create a temporary file
with tempfile.NamedTemporaryFile(mode='w', prefix=filename, suffix='.csv', delete=False) as temp_file:
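
The pattern shared by these loaders (write a temp file, run an external script, clean up) might be factored like this sketch (the script path and argument order are assumptions):

import os
import subprocess
import tempfile

def run_external_script(script: str, filename: str, tool_data: str) -> int:
    # Write the CSV payload to a temp file the script can read.
    with tempfile.NamedTemporaryFile(mode='w', prefix=filename, suffix='.csv',
                                     delete=False) as temp_file:
        temp_file.write(tool_data)
        temp_path = temp_file.name
    try:
        # Hand the file to the external loader script.
        return subprocess.run(['python', script, temp_path]).returncode
    finally:
        os.remove(temp_path)  # always remove the temp file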

View File

@@ -11,6 +11,17 @@ import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'isi_csv_log_vulink' type.
This function retrieves CSV data, writes it to a temporary file,
executes an external Python script ('vulinkScript.py') to process it,
and then updates the workflow status in the database.
Args:
cfg (object): The configuration object.
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Create a temporary file

View File

@@ -11,6 +11,17 @@ import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'sisgeo_health' type.
This function retrieves CSV data, writes it to a temporary file,
executes an external Python script ('sisgeoLoadScript.py') to process it,
and then updates the workflow status in the database.
Args:
cfg (object): The configuration object.
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Create a temporary file

View File

@@ -11,7 +11,17 @@ import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'sisgeo_readings' type.
This function retrieves CSV data, writes it to a temporary file,
executes an external Python script ('sisgeoLoadScript.py') to process it,
and then updates the workflow status in the database.
Args:
cfg (object): The configuration object.
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Create a temporary file
with tempfile.NamedTemporaryFile(mode='w', prefix=filename, suffix='.csv', delete=False) as temp_file:

View File

@@ -11,6 +11,17 @@ import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'sorotecpini_co' type.
This function retrieves CSV data, writes it to a temporary file,
executes an external Python script ('sorotecPini.py') to process it,
and then updates the workflow status in the database.
Args:
cfg (object): The configuration object.
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Create a temporary file

View File

@@ -11,6 +11,17 @@ import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'stazionetotale_integrity_monitor' type.
This function retrieves CSV data, writes it to a temporary file,
executes an external Python script ('TS_PiniScript.py') to process it,
and then updates the workflow status in the database.
Args:
cfg (object): The configuration object.
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Create a temporary file

View File

@@ -12,6 +12,17 @@ logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'stazionetotale_messpunktepini' type.
This function retrieves CSV data, writes it to a temporary file,
executes an external Python script ('TS_PiniScript.py') to process it,
and then updates the workflow status in the database.
Args:
cfg (object): The configuration object.
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Create a temporary file
with tempfile.NamedTemporaryFile(mode='w', prefix=filename, suffix='.csv', delete=False) as temp_file: