From c6d486d0bd6b5ae787361e4fa8826bb09c8b3788 Mon Sep 17 00:00:00 2001 From: alex Date: Tue, 19 Aug 2025 12:36:27 +0200 Subject: [PATCH] refactor old script --- src/utils/csv/loaders.py | 53 +++++++++++++++- .../parsers/by_type/hirpinia_hirpinia.py | 60 ++++-------------- .../parsers/by_type/isi_csv_log_vulink.py | 61 ++++--------------- src/utils/parsers/by_type/sisgeo_health.py | 61 ++++--------------- src/utils/parsers/by_type/sisgeo_readings.py | 60 ++++-------------- src/utils/parsers/by_type/sorotecpini_co.py | 61 ++++--------------- .../stazionetotale_integrity_monitor.py | 61 ++++--------------- .../by_type/stazionetotale_messpunktepini.py | 57 +++-------------- 8 files changed, 126 insertions(+), 348 deletions(-) diff --git a/src/utils/csv/loaders.py b/src/utils/csv/loaders.py index 5caebed..5e23328 100644 --- a/src/utils/csv/loaders.py +++ b/src/utils/csv/loaders.py @@ -1,6 +1,11 @@ +import asyncio +import tempfile +import os + from utils.database.loader_action import load_data, update_status, unlock from utils.database import WorkflowFlags -from utils.csv.data_preparation import make_pipe_sep_matrix, make_ain_din_matrix, make_channels_matrix, make_tlp_matrix, make_gd_matrix, make_musa_matrix +from utils.csv.data_preparation import make_pipe_sep_matrix, make_ain_din_matrix, make_channels_matrix, make_tlp_matrix, make_gd_matrix, make_musa_matrix, get_data + import logging @@ -90,4 +95,48 @@ async def get_next_csv_atomic(pool: object, table_name: str, status: int, next_s except Exception as e: # Rollback in caso di errore await conn.rollback() - raise e \ No newline at end of file + raise e + +async def main_old_script_loader(cfg: object, id: int, pool: object, script_name: str) -> None: + #async def main_loader(cfg: object, id: int, pool: object) -> None: + """ + This function retrieves CSV data, writes it to a temporary file, + executes an external Python script to process it, + and then updates the workflow status in the database. 
+ Args: + cfg (object): The configuration object. + id (int): The ID of the CSV record to process. + pool (object): The database connection pool. + script_name (str): The name of the script to execute (without the .py extension). + """ + filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) + # Creare un file temporaneo + with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file: + temp_file.write(ToolData) + temp_filename = temp_file.name + + try: + # Usa asyncio.subprocess per vero async + process = await asyncio.create_subprocess_exec( + 'python3', f'old_script/{script_name}.py', temp_filename, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + + result_stdout = stdout.decode('utf-8') + result_stderr = stderr.decode('utf-8') + + finally: + # Pulire il file temporaneo + os.unlink(temp_filename) + + if process.returncode != 0: + logger.error(f"Errore nell'esecuzione del programma {script_name}.py: {result_stderr}") + raise Exception(f"Errore nel programma: {result_stderr}") + else: + logger.info(f"Programma {script_name}.py eseguito con successo.") + logger.debug(f"Stdout: {result_stdout}") + await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool) + await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool) + await unlock(cfg, id, pool) \ No newline at end of file diff --git a/src/utils/parsers/by_type/hirpinia_hirpinia.py b/src/utils/parsers/by_type/hirpinia_hirpinia.py index 52470b5..8939a40 100644 --- a/src/utils/parsers/by_type/hirpinia_hirpinia.py +++ b/src/utils/parsers/by_type/hirpinia_hirpinia.py @@ -1,55 +1,17 @@ -import asyncio -import tempfile -import os +from utils.csv.loaders import main_old_script_loader as hirpinia_main_loader -from utils.database import WorkflowFlags -from utils.database.loader_action import update_status, unlock -from utils.csv.data_preparation import get_data - -import logging - -logger 
= logging.getLogger(__name__) async def main_loader(cfg: object, id: int, pool: object) -> None: - """ - Loads and processes CSV data specific to the 'hirpinia_hirpinia' type. - This function retrieves CSV data, writes it to a temporary file, - executes an external Python script ('hirpiniaLoadScript.py') to process it, - and then updates the workflow status in the database. + """ + Carica ed elabora i dati CSV specifici per il tipo 'hirpinia_hirpinia'. + + Questa funzione è un wrapper per `main_old_script_loader` e passa il nome + dello script di elaborazione come "hirpiniaLoadScript". + Args: - cfg (object): The configuration object. - id (int): The ID of the CSV record to process. - pool (object): The database connection pool. + cfg (object): L'oggetto di configurazione. + id (int): L'ID del record CSV da elaborare. + pool (object): Il pool di connessioni al database. """ - filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) - # Creare un file temporaneo - with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file: - temp_file.write(ToolData) - temp_filename = temp_file.name - - try: - # Usa asyncio.subprocess per vero async - process = await asyncio.create_subprocess_exec( - 'python3', 'old_script/hirpiniaLoadScript.py', temp_filename, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - stdout, stderr = await process.communicate() - - result_stdout = stdout.decode('utf-8') - result_stderr = stderr.decode('utf-8') - - finally: - # Pulire il file temporaneo - os.unlink(temp_filename) - - if process.returncode != 0: - logger.error(f"Errore nell'esecuzione del programma hirpiniaLoadScript.py: {result_stderr}") - raise Exception(f"Errore nel programma: {result_stderr}") - else: - logger.info("Programma hirpiniaLoadScript.py eseguito con successo.") - logger.debug(f"Stdout: {result_stdout}") - await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool) - await update_status(cfg, 
id, WorkflowFlags.DATA_ELABORATED, pool) - await unlock(cfg, id, pool) \ No newline at end of file + await hirpinia_main_loader(cfg, id, pool, "hirpiniaLoadScript") diff --git a/src/utils/parsers/by_type/isi_csv_log_vulink.py b/src/utils/parsers/by_type/isi_csv_log_vulink.py index 7b3099b..6771b83 100644 --- a/src/utils/parsers/by_type/isi_csv_log_vulink.py +++ b/src/utils/parsers/by_type/isi_csv_log_vulink.py @@ -1,56 +1,17 @@ -import asyncio -import tempfile -import os +from utils.csv.loaders import main_old_script_loader as vulink_main_loader -from utils.database import WorkflowFlags -from utils.database.loader_action import update_status, unlock -from utils.csv.data_preparation import get_data - -import logging - -logger = logging.getLogger(__name__) async def main_loader(cfg: object, id: int, pool: object) -> None: - """ - Loads and processes CSV data specific to the 'isi_csv_log_vulink' type. - This function retrieves CSV data, writes it to a temporary file, - executes an external Python script ('vulinkScript.py') to process it, - and then updates the workflow status in the database. + """ + Carica ed elabora i dati CSV specifici per il tipo 'isi_csv_log_vulink'. + + Questa funzione è un wrapper per `vulink_main_loader` e passa il nome + dello script di elaborazione come "vulinkScript". + Args: - cfg (object): The configuration object. - id (int): The ID of the CSV record to process. - pool (object): The database connection pool. + cfg (object): L'oggetto di configurazione. + id (int): L'ID del record CSV da elaborare. + pool (object): Il pool di connessioni al database. 
""" - - filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) - # Creare un file temporaneo - with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file: - temp_file.write(ToolData) - temp_filename = temp_file.name - - try: - # Usa asyncio.subprocess per vero async - process = await asyncio.create_subprocess_exec( - 'python3', 'old_script/vulinkScript.py', temp_filename, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - stdout, stderr = await process.communicate() - - result_stdout = stdout.decode('utf-8') - result_stderr = stderr.decode('utf-8') - - finally: - # Pulire il file temporaneo - os.unlink(temp_filename) - - if process.returncode != 0: - logger.error(f"Errore nell'esecuzione del programma vulinkScript.py: {result_stderr}") - raise Exception(f"Errore nel programma: {result_stderr}") - else: - logger.info("Programma vulinkScript.py eseguito con successo.") - logger.debug(f"Stdout: {result_stdout}") - await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool) - await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool) - await unlock(cfg, id, pool) \ No newline at end of file + await vulink_main_loader(cfg, id, pool, "vulinkScript") diff --git a/src/utils/parsers/by_type/sisgeo_health.py b/src/utils/parsers/by_type/sisgeo_health.py index ba3c2ca..00491b1 100644 --- a/src/utils/parsers/by_type/sisgeo_health.py +++ b/src/utils/parsers/by_type/sisgeo_health.py @@ -1,56 +1,17 @@ -import asyncio -import tempfile -import os +from utils.csv.loaders import main_old_script_loader as sisgeo_main_loader -from utils.database import WorkflowFlags -from utils.database.loader_action import update_status, unlock -from utils.csv.data_preparation import get_data - -import logging - -logger = logging.getLogger(__name__) async def main_loader(cfg: object, id: int, pool: object) -> None: - """ - Loads and processes CSV data specific to the 'sisgeo_health' type. 
- This function retrieves CSV data, writes it to a temporary file, - executes an external Python script ('sisgeoLoadScript.py') to process it, - and then updates the workflow status in the database. + """ + Carica ed elabora i dati CSV specifici per il tipo 'sisgeo_health'. + + Questa funzione è un wrapper per `main_old_script_loader` e passa il nome + dello script di elaborazione come "sisgeoLoadScript". + Args: - cfg (object): The configuration object. - id (int): The ID of the CSV record to process. - pool (object): The database connection pool. + cfg (object): L'oggetto di configurazione. + id (int): L'ID del record CSV da elaborare. + pool (object): Il pool di connessioni al database. """ - - filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) - # Creare un file temporaneo - with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file: - temp_file.write(ToolData) - temp_filename = temp_file.name - - try: - # Usa asyncio.subprocess per vero async - process = await asyncio.create_subprocess_exec( - 'python3', 'old_script/sisgeoLoadScript.py', temp_filename, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - stdout, stderr = await process.communicate() - - result_stdout = stdout.decode('utf-8') - result_stderr = stderr.decode('utf-8') - - finally: - # Pulire il file temporaneo - os.unlink(temp_filename) - - if process.returncode != 0: - logger.error(f"Errore nell'esecuzione del programma sisgeoLoadScript.py: {result_stderr}") - raise Exception(f"Errore nel programma: {result_stderr}") - else: - logger.info("Programma sisgeoLoadScript.py eseguito con successo.") - logger.debug(f"Stdout: {result_stdout}") - await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool) - await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool) - await unlock(cfg, id, pool) \ No newline at end of file + await sisgeo_main_loader(cfg, id, pool, "sisgeoLoadScript") diff --git 
a/src/utils/parsers/by_type/sisgeo_readings.py b/src/utils/parsers/by_type/sisgeo_readings.py index 6774a64..19d3b8f 100644 --- a/src/utils/parsers/by_type/sisgeo_readings.py +++ b/src/utils/parsers/by_type/sisgeo_readings.py @@ -1,55 +1,17 @@ -import asyncio -import tempfile -import os +from utils.csv.loaders import main_old_script_loader as sisgeo_main_loader -from utils.database import WorkflowFlags -from utils.database.loader_action import update_status, unlock -from utils.csv.data_preparation import get_data - -import logging - -logger = logging.getLogger(__name__) async def main_loader(cfg: object, id: int, pool: object) -> None: - """ - Loads and processes CSV data specific to the 'sisgeo_readings' type. - This function retrieves CSV data, writes it to a temporary file, - executes an external Python script ('sisgeoLoadScript.py') to process it, - and then updates the workflow status in the database. + """ + Carica ed elabora i dati CSV specifici per il tipo 'sisgeo_readings'. + + Questa funzione è un wrapper per `main_old_script_loader` e passa il nome + dello script di elaborazione come "sisgeoLoadScript". + Args: - cfg (object): The configuration object. - id (int): The ID of the CSV record to process. - pool (object): The database connection pool. + cfg (object): L'oggetto di configurazione. + id (int): L'ID del record CSV da elaborare. + pool (object): Il pool di connessioni al database. 
""" - filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) - # Creare un file temporaneo - with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file: - temp_file.write(ToolData) - temp_filename = temp_file.name - - try: - # Usa asyncio.subprocess per vero async - process = await asyncio.create_subprocess_exec( - 'python3', 'old_script/sisgeoLoadScript.py', temp_filename, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - stdout, stderr = await process.communicate() - - result_stdout = stdout.decode('utf-8') - result_stderr = stderr.decode('utf-8') - - finally: - # Pulire il file temporaneo - os.unlink(temp_filename) - - if process.returncode != 0: - logger.error(f"Errore nell'esecuzione del programma sisgeoLoadScript.py: {result_stderr}") - raise Exception(f"Errore nel programma: {result_stderr}") - else: - logger.info("Programma sisgeoLoadScript.py eseguito con successo.") - logger.debug(f"Stdout: {result_stdout}") - await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool) - await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool) - await unlock(cfg, id, pool) \ No newline at end of file + await sisgeo_main_loader(cfg, id, pool, "sisgeoLoadScript") diff --git a/src/utils/parsers/by_type/sorotecpini_co.py b/src/utils/parsers/by_type/sorotecpini_co.py index 8c7d2a1..97517ea 100644 --- a/src/utils/parsers/by_type/sorotecpini_co.py +++ b/src/utils/parsers/by_type/sorotecpini_co.py @@ -1,56 +1,17 @@ -import asyncio -import tempfile -import os +from utils.csv.loaders import main_old_script_loader as sorotecPini_main_loader -from utils.database import WorkflowFlags -from utils.database.loader_action import update_status, unlock -from utils.csv.data_preparation import get_data - -import logging - -logger = logging.getLogger(__name__) async def main_loader(cfg: object, id: int, pool: object) -> None: - """ - Loads and processes CSV data specific to the 'sorotecpini_co' 
type. - This function retrieves CSV data, writes it to a temporary file, - executes an external Python script ('sorotecPini.py') to process it, - and then updates the workflow status in the database. + """ + Carica ed elabora i dati CSV specifici per il tipo 'sorotecpini_co'. + + Questa funzione è un wrapper per `sorotecPini_main_loader` e passa il nome + dello script di elaborazione come "sorotecPini". + Args: - cfg (object): The configuration object. - id (int): The ID of the CSV record to process. - pool (object): The database connection pool. + cfg (object): L'oggetto di configurazione. + id (int): L'ID del record CSV da elaborare. + pool (object): Il pool di connessioni al database. """ - - filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) - # Creare un file temporaneo - with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file: - temp_file.write(ToolData) - temp_filename = temp_file.name - - try: - # Usa asyncio.subprocess per vero async - process = await asyncio.create_subprocess_exec( - 'python3', 'old_script/sorotecPini.py', temp_filename, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - stdout, stderr = await process.communicate() - - result_stdout = stdout.decode('utf-8') - result_stderr = stderr.decode('utf-8') - - finally: - # Pulire il file temporaneo - os.unlink(temp_filename) - - if process.returncode != 0: - logger.error(f"Errore nell'esecuzione del programma sorotecPini.py: {result_stderr}") - raise Exception(f"Errore nel programma: {result_stderr}") - else: - logger.info("Programma sorotecPini.py eseguito con successo.") - logger.debug(f"Stdout: {result_stdout}") - await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool) - await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool) - await unlock(cfg, id, pool) \ No newline at end of file + await sorotecPini_main_loader(cfg, id, pool, "sorotecPini") diff --git 
a/src/utils/parsers/by_type/stazionetotale_integrity_monitor.py b/src/utils/parsers/by_type/stazionetotale_integrity_monitor.py index a473537..49781ca 100644 --- a/src/utils/parsers/by_type/stazionetotale_integrity_monitor.py +++ b/src/utils/parsers/by_type/stazionetotale_integrity_monitor.py @@ -1,56 +1,17 @@ -import asyncio -import tempfile -import os +from utils.csv.loaders import main_old_script_loader as ts_pini_main_loader -from utils.database import WorkflowFlags -from utils.database.loader_action import update_status, unlock -from utils.csv.data_preparation import get_data - -import logging - -logger = logging.getLogger(__name__) async def main_loader(cfg: object, id: int, pool: object) -> None: - """ - Loads and processes CSV data specific to the 'stazionetotale_integrity_monitor' type. - This function retrieves CSV data, writes it to a temporary file, - executes an external Python script ('TS_PiniScript.py') to process it, - and then updates the workflow status in the database. + """ + Carica ed elabora i dati CSV specifici per il tipo 'stazionetotale_integrity_monitor'. + + Questa funzione è un wrapper per `main_old_script_loader` e passa il nome + dello script di elaborazione come "TS_PiniScript". + Args: - cfg (object): The configuration object. - id (int): The ID of the CSV record to process. - pool (object): The database connection pool. + cfg (object): L'oggetto di configurazione. + id (int): L'ID del record CSV da elaborare. + pool (object): Il pool di connessioni al database. 
""" - - filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) - # Creare un file temporaneo - with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file: - temp_file.write(ToolData) - temp_filename = temp_file.name - - try: - # Usa asyncio.subprocess per vero async - process = await asyncio.create_subprocess_exec( - 'python3', 'old_script/TS_PiniScript.py', temp_filename, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - stdout, stderr = await process.communicate() - - result_stdout = stdout.decode('utf-8') - result_stderr = stderr.decode('utf-8') - - finally: - # Pulire il file temporaneo - os.unlink(temp_filename) - - if process.returncode != 0: - logger.error(f"Errore nell'esecuzione del programma TS_PiniScript.py: {result_stderr}") - raise Exception(f"Errore nel programma: {result_stderr}") - else: - logger.info("Programma TS_PiniScript.py eseguito con successo.") - logger.debug(f"Stdout: {result_stdout}") - await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool) - await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool) - await unlock(cfg, id, pool) \ No newline at end of file + await ts_pini_main_loader(cfg, id, pool, "TS_PiniScript") diff --git a/src/utils/parsers/by_type/stazionetotale_messpunktepini.py b/src/utils/parsers/by_type/stazionetotale_messpunktepini.py index 83ced58..ddead36 100644 --- a/src/utils/parsers/by_type/stazionetotale_messpunktepini.py +++ b/src/utils/parsers/by_type/stazionetotale_messpunktepini.py @@ -1,56 +1,17 @@ -import asyncio -import tempfile -import os +from utils.csv.loaders import main_old_script_loader as ts_pini_main_loader -from utils.database import WorkflowFlags -from utils.database.loader_action import update_status, unlock -from utils.csv.data_preparation import get_data - -import logging - -logger = logging.getLogger(__name__) async def main_loader(cfg: object, id: int, pool: object) -> None: - """ - Loads and processes 
CSV data specific to the 'stazionetotale_messpunktepini' type. + Carica ed elabora i dati CSV specifici per il tipo 'stazionetotale_messpunktepini'. + + Questa funzione è un wrapper per `ts_pini_main_loader` e passa il nome + dello script di elaborazione come "TS_PiniScript". - This function retrieves CSV data, writes it to a temporary file, - executes an external Python script ('TS_PiniScript.py') to process it, - and then updates the workflow status in the database. Args: - cfg (object): The configuration object. - id (int): The ID of the CSV record to process. - pool (object): The database connection pool. + cfg (object): L'oggetto di configurazione. + id (int): L'ID del record CSV da elaborare. + pool (object): Il pool di connessioni al database. """ - filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) - # Creare un file temporaneo - with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file: - temp_file.write(ToolData) - temp_filename = temp_file.name - try: - # Usa asyncio.subprocess per vero async - process = await asyncio.create_subprocess_exec( - 'python3', 'old_script/TS_PiniScript.py', temp_filename, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - stdout, stderr = await process.communicate() - - result_stdout = stdout.decode('utf-8') - result_stderr = stderr.decode('utf-8') - - finally: - # Pulire il file temporaneo - os.unlink(temp_filename) - - if process.returncode != 0: - logger.error(f"Errore nell'esecuzione del programma TS_PiniScript.py: {result_stderr}") - raise Exception(f"Errore nel programma: {result_stderr}") - else: - logger.info("Programma TS_PiniScript.py eseguito con successo.") - logger.debug(f"Stdout: {result_stdout}") - await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool) - await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool) - await unlock(cfg, id, pool) \ No newline at end of file + await ts_pini_main_loader(cfg, id, pool, 
"TS_PiniScript")