Refactor old script

This commit is contained in:
2025-08-19 12:36:27 +02:00
parent b79f07b407
commit c6d486d0bd
8 changed files with 126 additions and 348 deletions

View File

@@ -1,6 +1,11 @@
import asyncio
import tempfile
import os
from utils.database.loader_action import load_data, update_status, unlock
from utils.database import WorkflowFlags
from utils.csv.data_preparation import make_pipe_sep_matrix, make_ain_din_matrix, make_channels_matrix, make_tlp_matrix, make_gd_matrix, make_musa_matrix
from utils.csv.data_preparation import make_pipe_sep_matrix, make_ain_din_matrix, make_channels_matrix, make_tlp_matrix, make_gd_matrix, make_musa_matrix, get_data
import logging
@@ -90,4 +95,48 @@ async def get_next_csv_atomic(pool: object, table_name: str, status: int, next_s
except Exception as e:
# Rollback in caso di errore
await conn.rollback()
raise e
raise e
async def main_old_script_loader(cfg: object, id: int, pool: object, script_name: str) -> None:
#async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
This function retrieves CSV data, writes it to a temporary file,
executes an external Python script to process it,
and then updates the workflow status in the database.
Args:
cfg (object): The configuration object.
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
script_name (str): The name of the script to execute (without the .py extension).
"""
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Creare un file temporaneo
with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file:
temp_file.write(ToolData)
temp_filename = temp_file.name
try:
# Usa asyncio.subprocess per vero async
process = await asyncio.create_subprocess_exec(
'python3', f'old_script/{script_name}.py', temp_filename,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
result_stdout = stdout.decode('utf-8')
result_stderr = stderr.decode('utf-8')
finally:
# Pulire il file temporaneo
os.unlink(temp_filename)
if process.returncode != 0:
logger.error(f"Errore nell'esecuzione del programma {script_name}.py: {result_stderr}")
raise Exception(f"Errore nel programma: {result_stderr}")
else:
logger.info(f"Programma {script_name}.py eseguito con successo.")
logger.debug(f"Stdout: {result_stdout}")
await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await unlock(cfg, id, pool)

View File

@@ -1,55 +1,17 @@
import asyncio
import tempfile
import os
from utils.csv.loaders import main_old_script_loader as hirpinia_main_loader
from utils.database import WorkflowFlags
from utils.database.loader_action import update_status, unlock
from utils.csv.data_preparation import get_data
import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """Load and process CSV data specific to the 'hirpinia_hirpinia' type.

    Thin wrapper around `main_old_script_loader` that supplies
    "hirpiniaLoadScript" as the name of the legacy processing script.

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await hirpinia_main_loader(cfg, id, pool, "hirpiniaLoadScript")

View File

@@ -1,56 +1,17 @@
import asyncio
import tempfile
import os
from utils.csv.loaders import main_old_script_loader as vulink_main_loader
from utils.database import WorkflowFlags
from utils.database.loader_action import update_status, unlock
from utils.csv.data_preparation import get_data
import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """Load and process CSV data specific to the 'isi_csv_log_vulink' type.

    Thin wrapper around `main_old_script_loader` that supplies
    "vulinkScript" as the name of the legacy processing script.

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await vulink_main_loader(cfg, id, pool, "vulinkScript")

View File

@@ -1,56 +1,17 @@
import asyncio
import tempfile
import os
from utils.csv.loaders import main_old_script_loader as sisgeo_main_loader
from utils.database import WorkflowFlags
from utils.database.loader_action import update_status, unlock
from utils.csv.data_preparation import get_data
import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """Load and process CSV data specific to the 'sisgeo_health' type.

    Thin wrapper around `main_old_script_loader` that supplies
    "sisgeoLoadScript" as the name of the legacy processing script.

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await sisgeo_main_loader(cfg, id, pool, "sisgeoLoadScript")

View File

@@ -1,55 +1,17 @@
import asyncio
import tempfile
import os
from utils.csv.loaders import main_old_script_loader as sisgeo_main_loader
from utils.database import WorkflowFlags
from utils.database.loader_action import update_status, unlock
from utils.csv.data_preparation import get_data
import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """Load and process CSV data specific to the 'sisgeo_readings' type.

    Thin wrapper around `main_old_script_loader` that supplies
    "sisgeoLoadScript" as the name of the legacy processing script.

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await sisgeo_main_loader(cfg, id, pool, "sisgeoLoadScript")

View File

@@ -1,56 +1,17 @@
import asyncio
import tempfile
import os
from utils.csv.loaders import main_old_script_loader as sorotecPini_main_loader
from utils.database import WorkflowFlags
from utils.database.loader_action import update_status, unlock
from utils.csv.data_preparation import get_data
import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """Load and process CSV data specific to the 'sorotecpini_co' type.

    Thin wrapper around `main_old_script_loader` that supplies
    "sorotecPini" as the name of the legacy processing script.

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await sorotecPini_main_loader(cfg, id, pool, "sorotecPini")

View File

@@ -1,56 +1,17 @@
import asyncio
import tempfile
import os
from utils.csv.loaders import main_old_script_loader as ts_pini_main_loader
from utils.database import WorkflowFlags
from utils.database.loader_action import update_status, unlock
from utils.csv.data_preparation import get_data
import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """Load and process CSV data specific to the
    'stazionetotale_integrity_monitor' type.

    Thin wrapper around `main_old_script_loader` that supplies
    "TS_PiniScript" as the name of the legacy processing script.

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await ts_pini_main_loader(cfg, id, pool, "TS_PiniScript")

View File

@@ -1,56 +1,17 @@
import asyncio
import tempfile
import os
from utils.csv.loaders import main_old_script_loader as ts_pini_main_loader
from utils.database import WorkflowFlags
from utils.database.loader_action import update_status, unlock
from utils.csv.data_preparation import get_data
import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """Load and process CSV data specific to the
    'stazionetotale_messpunktepini' type.

    Thin wrapper around `main_old_script_loader` that supplies
    "TS_PiniScript" as the name of the legacy processing script.

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await ts_pini_main_loader(cfg, id, pool, "TS_PiniScript")