2025-05-03 15:40:58 +02:00
parent 138474aa0b
commit e9dc7c1192
5 changed files with 117 additions and 23 deletions

View File

@@ -14,12 +14,14 @@ from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed
# Configure logging (moved inside main function)
logger = logging.getLogger(__name__)
class DummySha256Authorizer(DummyAuthorizer):
"""Custom authorizer that uses SHA256 for password hashing and manages users from a database."""
def __init__(self, cfg):
def __init__(self: object, cfg: object) -> None:
"""Initializes the authorizer, adds the admin user, and loads users from the database.
Args:
@@ -47,7 +49,7 @@ class DummySha256Authorizer(DummyAuthorizer):
except Exception as e:
self.respond(f'551 Error creating virtual user path: {e}')
def validate_authentication(self, username, password, handler):
def validate_authentication(self: object, username: str, password: str, handler: object) -> None:
# Validate the user's password against the stored hash
hash = sha256(password.encode("UTF-8")).hexdigest()
try:
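For reference, the logic the annotated validate_authentication wraps is a plain SHA256 hexdigest comparison. A minimal standalone sketch of that check (the stored_users dict is a stand-in for the database lookup, not part of this commit):

from hashlib import sha256
from pyftpdlib.authorizers import AuthenticationFailed

# Hypothetical in-memory stand-in for the users table consulted by the authorizer.
stored_users = {"alice": sha256("s3cret".encode("UTF-8")).hexdigest()}

def check_password(username: str, password: str) -> None:
    # Hash the supplied password and compare it with the stored SHA256 hexdigest.
    candidate = sha256(password.encode("UTF-8")).hexdigest()
    if stored_users.get(username) != candidate:
        # pyftpdlib expects AuthenticationFailed to be raised on rejection.
        raise AuthenticationFailed("Authentication failed.")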
@@ -59,7 +61,7 @@ class DummySha256Authorizer(DummyAuthorizer):
class ASEHandler(FTPHandler):
"""Custom FTP handler that extends FTPHandler with custom commands and file handling."""
def __init__(self, conn, server, ioloop=None):
def __init__(self: object, conn: object, server: object, ioloop=None) -> None:
"""Initializes the handler, adds custom commands, and sets up command permissions.
Args:
@@ -87,26 +89,26 @@ class ASEHandler(FTPHandler):
help='Syntax: SITE <SP> LSTU (list virtual users).')}
)
def on_file_received(self, file):
def on_file_received(self: object, file: str) -> None:
return file_management.on_file_received(self, file)
def on_incomplete_file_received(self, file):
def on_incomplete_file_received(self: object, file: str) -> None:
"""Removes partially uploaded files.
Args:
file: The path to the incomplete file.
"""
os.remove(file)
def ftp_SITE_ADDU(self, line):
def ftp_SITE_ADDU(self: object, line: str) -> None:
return user_admin.ftp_SITE_ADDU(self, line)
def ftp_SITE_DISU(self, line):
def ftp_SITE_DISU(self: object, line: str) -> None:
return user_admin.ftp_SITE_DISU(self, line)
def ftp_SITE_ENAU(self, line):
def ftp_SITE_ENAU(self: object, line: str) -> None:
return user_admin.ftp_SITE_ENAU(self, line)
def ftp_SITE_LSTU(self, line):
def ftp_SITE_LSTU(self: object, line: str) -> None:
return user_admin.ftp_SITE_LSTU(self, line)
def main():
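The SITE handlers above are thin delegations to user_admin; what makes pyftpdlib accept them is the proto_cmds entry shown earlier in this hunk. A short sketch of registering one custom SITE command in the documented pyftpdlib style (the permission letter and reply text are illustrative assumptions, not taken from this commit):

from pyftpdlib.handlers import FTPHandler

# Copy the stock command table and register one extra SITE command.
proto_cmds = FTPHandler.proto_cmds.copy()
proto_cmds.update(
    {'SITE LSTU': dict(perm='M', auth=True, arg=None,
                       help='Syntax: SITE <SP> LSTU (list virtual users).')}
)

class CustomHandler(FTPHandler):
    proto_cmds = proto_cmds

    def ftp_SITE_LSTU(self, line):
        # pyftpdlib routes "SITE LSTU" to the method named ftp_SITE_LSTU.
        self.respond('200 LSTU command ok.')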
@@ -128,6 +130,7 @@ def main():
# Configure logging
logging.basicConfig(
format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
# Use cfg.logfilename directly without checking its existence
filename=cfg.logfilename,
level=logging.INFO,
)

View File

@@ -1,41 +1,57 @@
#!.venv/bin/python
# Import necessary libraries
import mysql.connector
import logging
import importlib
import time
import threading
# Import custom modules for configuration and database connection
from utils.config import loader as setting
from utils.database.connection import connetti_db
# Initialize the logger for this module
logger = logging.getLogger(__name__)
def elab_csv(cfg, threads):
# Process a single unprocessed CSV record and dispatch a parser thread for it
def elab_csv(cfg: object, threads: list) -> bool:
try:
# Establish a database connection
with connetti_db(cfg) as conn:
cur = conn.cursor()
# Select a single record from the raw data table that is not currently locked and has a status of 0
cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = 0 limit 1')
id, unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone()
if id:
# If a record is found, lock it by updating the 'locked' field to 1
cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 1 where id = {id}')
# Construct the module name based on unit and tool types for dynamic import
module_name = f'utils.parsers.{unit_type.lower()}_{tool_type.lower()}'
# Dynamically import the module
modulo = importlib.import_module(module_name)
# Get the 'main_loader' function from the imported module
funzione = getattr(modulo, "main_loader")
# Call the function
# Create a new thread to execute the 'main_loader' function with the configuration and record ID
thread = threading.Thread(target = funzione, args=(cfg, id))
# Add the thread to the list of active threads
threads.append(thread)
# Start the thread's execution
thread.start()
# Return True to indicate that a record was processed
return True
else:
# If no record is found, wait for 20 seconds before attempting to fetch again
time.sleep(20)
# Return False to indicate that no record was processed
return False
except mysql.connector.Error as e:
# Handle database connection errors
print(f"Error: {e}")
logger.error(f'{e}')
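The core of elab_csv is name-based dispatch: the parser module is derived from the record's unit and tool types and imported at runtime. A sketch of that pattern with an explicit fallback when no matching parser module exists (the skip-and-log behaviour is an assumption, not in this commit):

import importlib
import logging
import threading

logger = logging.getLogger(__name__)

def dispatch_parser(cfg, record_id, unit_type, tool_type, threads):
    # Resolve the parser module from the record's types,
    # e.g. unit_type="GW", tool_type="LOC" -> utils.parsers.gw_loc.
    module_name = f'utils.parsers.{unit_type.lower()}_{tool_type.lower()}'
    try:
        modulo = importlib.import_module(module_name)
        funzione = getattr(modulo, "main_loader")
    except (ModuleNotFoundError, AttributeError) as e:
        # Assumption: an unknown unit/tool combination is logged and skipped
        # rather than crashing the polling loop.
        logger.error(f"No parser available for {module_name}: {e}")
        return None
    # Run the parser in its own thread so the polling loop keeps serving records.
    thread = threading.Thread(target=funzione, args=(cfg, record_id))
    threads.append(thread)
    thread.start()
    return thread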
@@ -44,24 +60,32 @@ def main():
cfg = setting.Config()
try:
# Configure the connection to the PostgreSQL database
# Configure logging
# Configure logging to write log messages to a file with a specific format
logging.basicConfig(
format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
filename=cfg.logfilename,
level=logging.INFO,
)
# Initialize an empty list to keep track of active threads
threads = []
# Enter an infinite loop to continuously process records
while True:
# Check if the number of active threads exceeds the maximum allowed
while len(threads) > cfg.max_threads:
# Iterate through the list of threads
for thread in threads:
# If a thread is no longer alive (has finished execution)
if not thread.is_alive():
# Remove it from the list of active threads
threads.remove(thread)
# Attempt to process a CSV record
if elab_csv(cfg, threads):
# If a record was successfully processed, log the number of threads currently running
logger.info(f"Threads in execution: {len(threads)}")
pass
except KeyboardInterrupt:
# Handle a keyboard interrupt (e.g., Ctrl+C) to gracefully shut down the program
logger.info("Info: Shutdown requested...exiting")
except Exception as e:
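The throttling loop above removes finished threads from the list while iterating over it, which can skip entries. One equivalent way to prune finished workers, shown here only as a sketch and not as the committed code:

import time

def prune_threads(threads: list) -> list:
    # Keep only threads that are still running; rebuilding the list avoids
    # skipping entries, which can happen when removing items mid-iteration.
    return [t for t in threads if t.is_alive()]

# Usage inside the polling loop (cfg.max_threads comes from the configuration):
# while len(threads) > cfg.max_threads:
#     threads[:] = prune_threads(threads)
#     time.sleep(1)  # assumption: brief pause instead of a busy-wait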

View File

@@ -4,13 +4,20 @@ import mysql.connector
logger = logging.getLogger(__name__)
def connetti_db(cfg):
"""Establishes a connection to the MySQL database.
"""
Establishes a connection to a MySQL database.
Args:
cfg: The configuration object containing database connection details.
cfg: A configuration object containing database connection parameters.
It should have the following attributes:
- dbuser: The database username.
- dbpass: The database password.
- dbhost: The database host address.
- dbport: The database port number.
- dbname: The name of the database to connect to.
Returns:
A MySQL database connection object.
A MySQL connection object if the connection is successful, otherwise None.
"""
try:
conn = mysql.connector.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport, database=cfg.dbname)
@@ -18,6 +25,5 @@ def connetti_db(cfg):
logger.info("Connected")
return conn
except mysql.connector.Error as e:
logger.error(f'{e}')
exit(e.errno)
return None
logger.error(f"Database connection error: {e}")
raise # Re-raise the exception to be handled by the caller
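Because connetti_db now re-raises instead of terminating the process, each caller decides how to react. A minimal sketch of the calling side (the return-None-on-failure policy is an assumption):

import logging
import mysql.connector
from utils.database.connection import connetti_db

logger = logging.getLogger(__name__)

def open_connection(cfg):
    try:
        return connetti_db(cfg)
    except mysql.connector.Error as e:
        # The connection helper no longer calls exit(); log the failure and
        # let the caller retry or abort as appropriate.
        logger.error(f"Could not connect to the database: {e}")
        return None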

View File

@@ -6,7 +6,22 @@ import re
logger = logging.getLogger(__name__)
def get_data(cfg, id):
def get_data(cfg: object, id: int) -> tuple:
"""
Retrieves data for a specific tool from the database.
This function connects to the database using the provided configuration,
executes a query to retrieve the unit name, tool name ID, and tool data
associated with the given ID from the raw data table, and returns the results.
Args:
cfg: A configuration object containing database connection parameters
and table names (cfg.dbname, cfg.dbrectable).
id: The ID of the tool record to retrieve.
Returns:
A tuple containing the unit name, tool name ID, and tool data.
"""
with connetti_db(cfg) as conn:
cur = conn.cursor()
cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbname}.{cfg.dbrectable} where id = {id}')
@@ -15,7 +30,28 @@ def get_data(cfg, id):
conn.close()
return unit_name, tool_name, tool_data
def make_matrix(cfg, id):
def make_matrix(cfg: object, id: int) -> list:
"""
Processes raw tool data and transforms it into a matrix format for database insertion.
This function retrieves raw tool data using `get_data`, splits it into individual
readings (rows), and further parses each reading into individual values. It
handles data where multiple nodes might be reporting values within a single
reading. The resulting matrix is a list of lists, where each inner list
represents a row of data ready for insertion into the database. Missing
values are padded with `None` to ensure consistent row length.
Args:
cfg: A configuration object containing database connection parameters
and table names.
id: The ID of the tool record to process.
Returns:
A list of lists (matrix) representing the processed data. Each inner list
contains the following elements: UnitName, ToolNameID, NodeNum, EventDate,
EventTime, BatLevel, Temperature, followed by up to 16 additional
measurement values (Val0 to ValF), padded with None if necessary.
"""
UnitName, ToolNameID, ToolData = get_data(cfg, id)
righe = ToolData.splitlines()
matrice_valori = []
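The padding rule described in the new docstring (short readings are filled up with None so every row has the same width) can be sketched as below; the fixed width of 23 columns follows from the field list in the docstring (7 header fields plus Val0..ValF) and is otherwise an assumption:

ROW_WIDTH = 23  # UnitName..Temperature (7 fields) + Val0..ValF (16 values)

def pad_row(row: list) -> list:
    # Extend a parsed reading with None so every row matches the column
    # count expected by the database insert.
    return row + [None] * (ROW_WIDTH - len(row))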
@@ -29,7 +65,28 @@ def make_matrix(cfg, id):
return matrice_valori
def make_loc_matrix(cfg, id):
def make_loc_matrix(cfg: object, id: int) -> list:
"""
Processes raw location (LOC) tool data and transforms it into a matrix format for database insertion.
This function retrieves raw LOC tool data using `get_data`, splits it into
individual readings (rows), and parses each reading into individual values
specific to the LOC data format (timestamp, battery level, temperature, and
four additional values: ain1, ain2, din1, din2). The resulting matrix is a list
of lists, where each inner list represents a row of data ready for insertion
into the database. Missing values are padded with `None` to ensure consistent
row length. It uses a regular expression to filter lines that match the
expected LOC data format.
Args:
cfg: A configuration object containing database connection parameters
and table names.
id: The ID of the tool record to process.
Returns:
A list of lists (matrix) representing the processed LOC data. Each inner
list contains data fields similar to `make_matrix`, adjusted for LOC data.
"""
UnitName, ToolNameID, ToolData = get_data(cfg, id)
righe = ToolData.splitlines()
matrice_valori = []
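The docstring mentions a regular expression that keeps only lines in the expected LOC format. The exact pattern is not shown in this diff; a sketch under the assumption that a LOC reading is a comma-separated line of timestamp, battery, temperature, ain1, ain2, din1 and din2:

import re

# Assumption: a LOC reading looks like
# "2025-05-03 15:40:58,3.61,21.4,0.12,0.08,1,0"; the real pattern may differ.
LOC_LINE = re.compile(
    r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},'    # timestamp
    r'(-?\d+(\.\d+)?,){5}-?\d+(\.\d+)?$'        # battery, temperature, ain1, ain2, din1, din2
)

def filter_loc_lines(righe: list) -> list:
    # Keep only the lines that match the expected LOC record format.
    return [riga for riga in righe if LOC_LINE.match(riga)]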

View File

@@ -1,10 +1,14 @@
#!.venv/bin/python
# Import necessary modules
from utils.database.loader import load_data
from utils.parsers.data_preparation import make_matrix
import logging
logger = logging.getLogger(__name__)
def main_loader(cfg, id):
# Define the main function for loading data
def main_loader(cfg: object, id: int) -> None:
# Create a matrix of values from the data
matrice_valori = make_matrix(cfg, id)
# Load the data into the database
load_data(cfg, matrice_valori)
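load_data itself is not part of this diff; a sketch of what a bulk insert of the matrix could look like, assuming a destination table named by a hypothetical cfg.dbdatatable attribute and the 23-column row layout described above:

import logging
import mysql.connector
from utils.database.connection import connetti_db

logger = logging.getLogger(__name__)

def load_data_sketch(cfg, matrice_valori):
    # Hypothetical bulk insert; cfg.dbdatatable and the column count are
    # assumptions, not taken from this commit.
    placeholders = ", ".join(["%s"] * 23)
    sql = f"insert into {cfg.dbname}.{cfg.dbdatatable} values ({placeholders})"
    try:
        with connetti_db(cfg) as conn:
            cur = conn.cursor()
            cur.executemany(sql, matrice_valori)
            conn.commit()
    except mysql.connector.Error as e:
        logger.error(f"{e}")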