add flag stop elab
This commit is contained in:
@@ -12,6 +12,7 @@ import asyncio
|
||||
from utils.config import loader_load_data as setting
|
||||
from utils.database import WorkflowFlags
|
||||
from utils.csv.loaders import get_next_csv_atomic
|
||||
from utils.database.action_query import check_flag_elab
|
||||
from utils.orchestrator_utils import run_orchestrator, worker_context
|
||||
|
||||
# Initialize the logger for this module
|
||||
@@ -42,21 +43,24 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
|
||||
while True:
|
||||
try:
|
||||
logger.info("Inizio elaborazione")
|
||||
if not await check_flag_elab():
|
||||
record = await get_next_csv_atomic(
|
||||
pool,
|
||||
cfg.dbrectable,
|
||||
WorkflowFlags.CSV_RECEIVED,
|
||||
WorkflowFlags.DATA_LOADED,
|
||||
)
|
||||
|
||||
record = await get_next_csv_atomic(
|
||||
pool,
|
||||
cfg.dbrectable,
|
||||
WorkflowFlags.CSV_RECEIVED,
|
||||
WorkflowFlags.DATA_LOADED,
|
||||
)
|
||||
|
||||
if record:
|
||||
success = await load_csv(record, cfg, pool)
|
||||
if not success:
|
||||
logger.error("Errore durante l'elaborazione")
|
||||
await asyncio.sleep(CSV_PROCESSING_DELAY)
|
||||
if record:
|
||||
success = await load_csv(record, cfg, pool)
|
||||
if not success:
|
||||
logger.error("Errore durante l'elaborazione")
|
||||
await asyncio.sleep(CSV_PROCESSING_DELAY)
|
||||
else:
|
||||
logger.info("Nessun record disponibile")
|
||||
await asyncio.sleep(NO_RECORD_SLEEP)
|
||||
else:
|
||||
logger.info("Nessun record disponibile")
|
||||
logger.info("Flag fermo elaborazione attivato")
|
||||
await asyncio.sleep(NO_RECORD_SLEEP)
|
||||
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import os
|
||||
from datetime import datetime
|
||||
import logging
|
||||
import re
|
||||
import mysql.connector
|
||||
|
||||
from utils.database.connection import connetti_db
|
||||
|
||||
from utils.csv.parser import extract_value
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -27,6 +27,9 @@ def on_file_received(self: object, file: str) -> None:
|
||||
cfg = self.cfg
|
||||
path, filenameExt = os.path.split(file)
|
||||
filename, fileExtension = os.path.splitext(filenameExt)
|
||||
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
new_filename = f"{filename}_{timestamp}{fileExtension}"
|
||||
os.rename(file, f"{path}/{new_filename}")
|
||||
if (fileExtension.upper() in (cfg.fileext)):
|
||||
with open(file, 'r', encoding='utf-8', errors='ignore') as csvfile:
|
||||
lines = csvfile.readlines()
|
||||
@@ -74,13 +77,15 @@ def on_file_received(self: object, file: str) -> None:
|
||||
tool_info = f'{{"Stazione": "{cfg.ts_pini_path_match.get(stazione)}"}}'
|
||||
|
||||
try:
|
||||
cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info) VALUES (%s,%s, %s, %s, %s, %s, %s, %s)", (self.username, filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), ''.join(lines), tool_info))
|
||||
cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info) VALUES (%s,%s, %s, %s, %s, %s, %s, %s)", (self.username, new_filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), ''.join(lines), tool_info))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'File {file} not loaded. Held in user path.')
|
||||
logger.error(f'File {new_filename} not loaded. Held in user path.')
|
||||
logger.error(f'{e}')
|
||||
"""
|
||||
else:
|
||||
os.remove(file)
|
||||
logger.info(f'File {file} loaded: removed.')
|
||||
logger.info(f'File {new_filename} removed.')
|
||||
"""
|
||||
@@ -132,4 +132,16 @@ async def get_elab_timestamp(id_recv: int, pool: object) -> float:
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"id {id_recv} - Errore nella query timestamp elaborazione: {e}")
|
||||
return None
|
||||
return None
|
||||
|
||||
async def check_flag_elab(pool: object) -> None:
|
||||
async with pool.acquire() as conn:
|
||||
async with conn.cursor() as cur:
|
||||
try:
|
||||
await cur.execute("SELECT ferma_elab from admin_panel")
|
||||
results = await cur.fetchone()
|
||||
return results[0]
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Errore nella query check flag stop elaborazioni: {e}")
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user