From dc6ccc1744d37a4ebd35257078cf6fc187c5343a Mon Sep 17 00:00:00 2001
From: alex
Date: Sat, 26 Apr 2025 16:39:39 +0200
Subject: [PATCH] prove

---
 CsvLoader.py                  |  22 +-
 FtpCsvReceiver.py             | 157 ++++++----
 ftpcsvreceiver.ini            |  13 +-
 prova_no_pd.py                |  53 ++++
 prova_pd.py                   |  62 ++++
 test/test_ftp_csv_receicer.py | 546 ++++++++++++++++++++++++++++++++++
 transform_file.py             |  31 +-
 utils/datefmt/__init__.py     |   0
 utils/datefmt/date_check.py   |  24 ++
 9 files changed, 802 insertions(+), 106 deletions(-)
 create mode 100644 prova_no_pd.py
 create mode 100644 prova_pd.py
 create mode 100644 test/test_ftp_csv_receicer.py
 create mode 100644 utils/datefmt/__init__.py
 create mode 100644 utils/datefmt/date_check.py

diff --git a/CsvLoader.py b/CsvLoader.py
index 3d0087f..0c646d1 100755
--- a/CsvLoader.py
+++ b/CsvLoader.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python3
+#!.venv/bin/python

 import sys
 import os
@@ -6,11 +6,11 @@ import pika
 import logging
 import csv
 import re
-import mariadb
+import mysql.connector as mysql
 import shutil

-from utils.timefmt import timestamp_fmt as ts
-from utils.timefmt import date_refmt as df
+from utils.time import timestamp_fmt as ts
+from utils.time import date_refmt as df

 from utils.config import set_config as setting

@@ -34,23 +34,19 @@ class sqlraw:

     def write_db(self):
         try:
-            conn = mariadb.connect(**self.config, database=self.dbname)
-        except mariadb.Error as err:
+            conn = mysql.connect(**self.config, database=self.dbname)
+        except Exception as err:
             logging.error(
-                "PID {:>5} >> Error to connet to DB {} - System error {}.".format(
-                    os.getpid(), self.dbname, err
-                )
+                f"PID {os.getpid():>5} >> Error connecting to DB {self.dbname} - System error {err}."
             )
             sys.exit(1)
         cur = conn.cursor()
         try:
             cur.execute(self.sql)
-        except mariadb.ProgrammingError as err:
+        except Exception as err:
             logging.error(
-                "PID {:>5} >> Error write into DB {} - System error {}.".format(
-                    os.getpid(), self.dbname, err
-                )
+                f"PID {os.getpid():>5} >> Error writing to DB {self.dbname} - System error {err}."
             )
             print(err)
             sys.exit(1)
diff --git a/FtpCsvReceiver.py b/FtpCsvReceiver.py
index 63b5658..9c35e87 100755
--- a/FtpCsvReceiver.py
+++ b/FtpCsvReceiver.py
@@ -1,4 +1,5 @@
-#!/usr/bin/env python3
+#!.venv/bin/python
+"""This module implements an FTP server with custom commands for managing virtual users and handling CSV file uploads."""

 import sys
 import os
@@ -7,7 +8,7 @@ import os
 import re
 import logging

-import psycopg2
+import mysql.connector

 from hashlib import sha256
 from pathlib import Path
@@ -15,16 +16,25 @@ from pathlib import Path
 from utils.time import timestamp_fmt as ts
 from utils.config import set_config as setting

-from pyftpdlib.handlers import FTPHandler, TLS_FTPHandler
+from pyftpdlib.handlers import FTPHandler
 from pyftpdlib.servers import FTPServer
 from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed

+
 def conn_db(cfg):
-    return psycopg2.connect(dbname=cfg.dbname, user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport )
+    """Establishes a connection to the MySQL database.
+
+    Args:
+        cfg: The configuration object containing database connection details.
+
+    Returns:
+        A MySQL database connection object.
+    """
+    return mysql.connector.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport)

+
 def extract_value(patterns, primary_source, secondary_source, default='Not Defined'):
-    """
-    Extracts the first match for a list of patterns from the primary source.
+    """Extracts the first match for a list of patterns from the primary source.
     Falls back to the secondary source if no match is found.
     """
     for source in (primary_source, secondary_source):
@@ -35,26 +45,36 @@ def extract_value(patterns, primary_source, secondary_source, default='Not Defin
     return default  # Return default if no matches are found

+
 class DummySha256Authorizer(DummyAuthorizer):
+    """Custom authorizer that uses SHA256 for password hashing and manages users from a database."""
+
     def __init__(self, cfg):
-        # Initialize the DummyAuthorizer and add the admin user
+        """Initializes the authorizer, adds the admin user, and loads users from the database.
+
+        Args:
+            cfg: The configuration object.
+        """
         super().__init__()
         self.add_user(
             cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3])

-        # Definisci la connessione al database
+        # Define the database connection
        conn = conn_db(cfg)

-        # Crea un cursore
+        # Create a cursor
         cur = conn.cursor()

-        cur.execute(f'SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbschema}.{cfg.dbusertable} WHERE deleted_at IS NULL')
+        cur.execute(f'SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL')

         for ftpuser, hash, virtpath, perm in cur.fetchall():
             self.add_user(ftpuser, hash, virtpath, perm)
+            # Create the user's directory if it does not exist.
             try:
                 Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
-            except:
-                self.responde('551 Error in create virtual user path.')
+            except Exception as e:
+                logging.error(f'Error creating virtual user path for {ftpuser}: {e}')

     def validate_authentication(self, username, password, handler):
         # Validate the user's password against the stored hash
@@ -66,9 +86,16 @@ class DummySha256Authorizer(DummyAuthorizer):
             raise AuthenticationFailed

 class ASEHandler(FTPHandler):
+    """Custom FTP handler that extends FTPHandler with custom commands and file handling."""

     def __init__(self, conn, server, ioloop=None):
-        # Initialize the FTPHandler and add custom commands
+        """Initializes the handler, adds custom commands, and sets up command permissions.
+
+        Args:
+            conn: The connection object.
+            server: The FTP server object.
+            ioloop: The I/O loop object.
+        """
         super().__init__(conn, server, ioloop)
         self.proto_cmds = FTPHandler.proto_cmds.copy()
         # Add custom FTP commands for managing virtual users - command in lowercase
         self.proto_cmds.update(
             {'SITE ADDU': dict(perm='M', auth=True, arg=True,
                                help='Syntax: SITE ADDU USERNAME PASSWORD (add virtual user).')}
         )
         self.proto_cmds.update(
-            {'SITE DELU': dict(perm='M', auth=True, arg=True,
-                               help='Syntax: SITE DELU USERNAME (remove virtual user).')}
+            {'SITE DISU': dict(perm='M', auth=True, arg=True,
+                               help='Syntax: SITE DISU USERNAME (disable virtual user).')}
         )
         self.proto_cmds.update(
-            {'SITE RESU': dict(perm='M', auth=True, arg=True,
-                               help='Syntax: SITE RESU USERNAME (restore virtual user).')}
+            {'SITE ENAU': dict(perm='M', auth=True, arg=True,
+                               help='Syntax: SITE ENAU USERNAME (enable virtual user).')}
         )
         self.proto_cmds.update(
             {'SITE LSTU': dict(perm='M', auth=True, arg=None,
                                help='Syntax: SITE LSTU (list virtual users).')}
         )

     def on_file_received(self, file):
+        """Handles the event when a file is successfully received.
+
+        Args:
+            file: The path to the received file.
+        """
         if not os.stat(file).st_size:
             os.remove(file)
             logging.info(f'File {file} was empty: removed.')
@@ -108,14 +140,14 @@ class ASEHandler(FTPHandler):

             conn = conn_db(cfg)

-            # Crea un cursore
+            # Create a cursor
             cur = conn.cursor()

             try:
-                cur.execute(f"INSERT INTO {cfg.dbschema}.{cfg.dbrectable } (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s, %s, %s, %s, %s, %s)", (filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), lines))
+                cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s, %s, %s, %s, %s, %s)", (filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), ''.join(lines)))
                 conn.commit()
                 conn.close()
-            except psycopg2.Error as e:
+            except Exception as e:
                 logging.error(f'File {file} not loaded. Held in user path.')
                 logging.error(f'{e}')
             else:
@@ -123,13 +155,14 @@ class ASEHandler(FTPHandler):
                 logging.info(f'File {file} loaded: removed.')

     def on_incomplete_file_received(self, file):
-        # Remove partially uploaded files
+        """Removes partially uploaded files.
+        Args:
+            file: The path to the incomplete file.
+        """
         os.remove(file)

     def ftp_SITE_ADDU(self, line):
-        """
-        Add a virtual user and save the virtuser configuration file.
-        Create a directory for the virtual user in the specified virtpath.
+        """Adds a virtual user, creates their directory, and saves their details to the database.
         """
         cfg = self.cfg
         try:
             parms = line.split()
             user = os.path.basename(parms[0])  # Extract the username
             password = parms[1]  # Get the password
             hash = sha256(password.encode("UTF-8")).hexdigest()  # Hash the password
-        except:
+        except IndexError:
             self.respond('501 SITE ADDU failed. Command needs 2 arguments')
         else:
             try:
                 # Create the user's directory
                 Path(cfg.virtpath + user).mkdir(parents=True, exist_ok=True)
-            except:
-                self.respond('551 Error in create virtual user path.')
+            except Exception as e:
+                self.respond(f'551 Error creating virtual user path: {e}')
             else:
                 try:
                     # Add the user to the authorizer
                     self.authorizer.add_user(str(user), hash, cfg.virtpath + "/" + user, perm=cfg.defperm)
                     # Save the user to the database
-                    # Definisci la connessione al database
+                    # Define the database connection
                     conn = conn_db(cfg)

-                    # Crea un cursore
+                    # Create a cursor
                     cur = conn.cursor()

                     cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('{user}', '{hash}', '{cfg.virtpath + user}', '{cfg.defperm}')")
                     conn.commit()
                     conn.close()
-                    logging.info("User {} created.".format(user))
+                    logging.info(f"User {user} created.")
                     self.respond('200 SITE ADDU successful.')
-                except psycopg2.Error as e:
-                    self.respond('501 SITE ADDU failed.')
+                except Exception as e:
+                    self.respond(f'501 SITE ADDU failed: {e}.')
                     print(e)

-    def ftp_SITE_DELU(self, line):
-        """
-        Remove a virtual user and save the virtuser configuration file.
-        """
+    def ftp_SITE_DISU(self, line):
+        """Removes a virtual user from the authorizer and marks them as disabled in the database."""
         cfg = self.cfg
         parms = line.split()
         user = os.path.basename(parms[0])  # Extract the username
@@ -180,20 +211,18 @@ class ASEHandler(FTPHandler):
             # Crea un cursore
             cur = conn.cursor()
-            cur.execute(f"UPDATE {cfg.dbschema}.{cfg.dbusertable} SET deleted_at = now() WHERE ftpuser = '{user}'")
+            cur.execute(f"UPDATE {cfg.dbname}.{cfg.dbusertable} SET disabled_at = now() WHERE ftpuser = '{user}'")
             conn.commit()
             conn.close()
-            logging.info("User {} deleted.".format(user))
-            self.respond('200 SITE DELU successful.')
+            logging.info(f"User {user} disabled.")
+            self.respond('200 SITE DISU successful.')
-        except psycopg2.Error as e:
-            self.respond('501 SITE DELU failed.')
+        except Exception as e:
+            self.respond('501 SITE DISU failed.')
             print(e)

-    def ftp_SITE_RESU(self, line):
-        """
-        Restore a virtual user and save the virtuser configuration file.
-        """
+    def ftp_SITE_ENAU(self, line):
+        """Restores a virtual user by updating their status in the database and adding them back to the authorizer."""
         cfg = self.cfg
         parms = line.split()
         user = os.path.basename(parms[0])  # Extract the username
@@ -204,33 +233,31 @@ class ASEHandler(FTPHandler):
             # Crea un cursore
             cur = conn.cursor()
             try:
-                cur.execute(f"UPDATE {cfg.dbschema}.{cfg.dbusertable} SET deleted_at = null WHERE ftpuser = '{user}'")
+                cur.execute(f"UPDATE {cfg.dbname}.{cfg.dbusertable} SET disabled_at = null WHERE ftpuser = '{user}'")
                 conn.commit()
-            except psycopg2.Error as e:
-                logging.error("Update DB failed: {}".format(e))
+            except Exception as e:
+                logging.error(f"Update DB failed: {e}")
-            cur.execute(f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbschema}.{cfg.dbusertable} WHERE ftpuser = '{user}'")
+            cur.execute(f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE ftpuser = '{user}'")
             ftpuser, hash, virtpath, perm = cur.fetchone()
             self.authorizer.add_user(ftpuser, hash, virtpath, perm)
             try:
                 Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
-            except:
-                self.responde('551 Error in create virtual user path.')
+            except Exception as e:
+                self.respond(f'551 Error creating virtual user path: {e}')
             conn.close()
-            logging.info("User {} restored.".format(user))
-            self.respond('200 SITE RESU successful.')
+            logging.info(f"User {user} restored.")
+            self.respond('200 SITE ENAU successful.')
         except Exception as e:
-            self.respond('501 SITE RESU failed.')
+            self.respond('501 SITE ENAU failed.')
             print(e)

     def ftp_SITE_LSTU(self, line):
-        """
-        List all virtual users.
-        """
+        """Lists all virtual users from the database."""
         cfg = self.cfg
         users_list = []
         try:
@@ -240,15 +267,16 @@ class ASEHandler(FTPHandler):
             # Crea un cursore
             cur = conn.cursor()
             self.push("214-The following virtual users are defined:\r\n")
-            cur.execute(f'SELECT ftpuser, perm FROM {cfg.dbschema}.{cfg.dbusertable} WHERE deleted_at IS NULL ')
+            cur.execute(f'SELECT ftpuser, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL ')
             [users_list.append(f'Username: {ftpuser}\tPerms: {perm}\r\n') for ftpuser, perm in cur.fetchall()]
             self.push(''.join(users_list))
             self.respond("214 LSTU SITE command successful.")
-        except:
-            self.respond('501 list users failed.')
+        except Exception as e:
+            self.respond(f'501 list users failed: {e}')


 def main():
+    """Main function to start the FTP server."""
     # Load the configuration settings
     cfg = setting.config()

@@ -275,15 +303,14 @@ def main():
         server.serve_forever()
     except KeyboardInterrupt:
         logging.info(
-            "Info: {}.".format("Shutdown requested...exiting")
+            "Info: Shutdown requested...exiting"
         )
     except Exception:
         print(
-            "{} - PID {:>5} >> Error: {}.".format(
-                ts.timestamp("log"), os.getpid(), sys.exc_info()[1]
-            )
+            f"{ts.timestamp('log')} - PID {os.getpid():>5} >> Error: {sys.exc_info()[1]}."
         )

+
 if __name__ == "__main__":
     main()
\ No newline at end of file
diff --git a/ftpcsvreceiver.ini b/ftpcsvreceiver.ini
index c76c73e..4b550da 100644
--- a/ftpcsvreceiver.ini
+++ b/ftpcsvreceiver.ini
@@ -1,12 +1,13 @@
 # to generete adminuser password hash:
-# python3 -c 'from hashlib import md5;print(md5("????password???".encode("UTF-8")).hexdigest())'
+# python3 -c 'from hashlib import sha256;print(sha256("????password???".encode("UTF-8")).hexdigest())'
+
 [ftpserver]
     firstPort = 40000
     logFilename = ./ftppylog.log
     proxyAddr = 0.0.0.0
     portRangeWidth = 500
     virtpath = /home/alex/aseftp/
-    adminuser = admin|83e61ecb0e9871aff37a12491aa848f884f5657ddbfd46454878e28afbecfc20|/home/alex/aseftp/|elradfmwMT
+    adminuser = admin|87b164c8d4c0af8fbab7e05db6277aea8809444fb28244406e489b66c92ba2bd|/home/alex/aseftp/|elradfmwMT
     servertype = FTPHandler
     certfile = /home/alex/aseftp/keycert.pem
     fileext = .CSV|.TXT
@@ -20,11 +21,11 @@
     logFilename = csvElab.log

 [db]
-    hostname = 10.211.114.101
-    port = 5432
-    user = asepg
+    hostname = 10.211.114.173
+    port = 3306
+    user = root
     password = batt1l0
-    dbName = asedb
+    dbName = ase_lar
     dbSchema = public
     userTableName = virtusers
     recTableName = received
diff --git a/prova_no_pd.py b/prova_no_pd.py
new file mode 100644
index 0000000..3e86243
--- /dev/null
+++ b/prova_no_pd.py
@@ -0,0 +1,53 @@
+
+import mysql.connector
+import utils.datefmt.date_check as date_check
+
+
+righe = ["17/03/2022 
15:10;13.7;14.8;|;401;832;17373;-8;920;469;9;|;839;133;17116;675;941;228;10;|;-302;-1252;17165;288;75;-940;10;|;739;76;17203;562;879;604;9;|;1460;751;16895;672;1462;132;10;|;-1088;-1883;16675;244;1071;518;10;|;-29;-1683;16923;384;1039;505;11;|;1309;-1095;17066;-36;324;-552;10;|;-36;-713;16701;-121;372;122;10;|;508;-1318;16833;475;1154;405;10;|;1178;878;17067;636;1114;428;10;|;1613;-573;17243;291;-234;-473;9;|;-107;-259;17287;94;421;369;10;|;-900;-647;16513;168;1330;252;10;|;1372;286;17035;202;263;469;10;|;238;-2006;17142;573;1201;492;9;|;2458;589;17695;356;187;208;11;|;827;-1085;17644;308;233;66;10;|;1;-1373;17214;557;1279;298;9;|;-281;-244;17071;209;517;-36;10;|;-486;-961;17075;467;440;367;10;|;1264;-339;16918;374;476;116;8;|;661;-1330;16789;-37;478;15;9;|;1208;-724;16790;558;1303;335;8;|;-236;-1404;16678;309;426;376;8;|;367;-1402;17308;-32;428;-957;7;|;-849;-360;17640;1;371;635;7;|;-784;90;17924;533;128;-661;5;|;-723;-1062;16413;270;-79;702;7;|;458;-1235;16925;354;-117;194;5;|;-411;-1116;17403;280;777;530;1;;;;;;;;;;;;;;", +"17/03/2022 15:13;13.6;14.8;|;398;836;17368;-3;924;472;9;|;838;125;17110;675;938;230;10;|;-298;-1253;17164;290;75;-942;10;|;749;78;17221;560;883;601;9;|;1463;752;16904;673;1467;134;10;|;-1085;-1884;16655;239;1067;520;10;|;-27;-1680;16923;393;1032;507;10;|;1308;-1095;17065;-43;328;-548;10;|;-38;-712;16704;-124;373;122;10;|;512;-1318;16830;473;1155;408;10;|;1181;879;17070;637;1113;436;10;|;1610;-567;17239;287;-240;-462;10;|;-108;-250;17297;94;420;370;10;|;-903;-652;16518;169;1326;257;9;|;1371;282;17047;198;263;471;10;|;244;-2006;17137;570;1205;487;9;|;2461;589;17689;354;199;210;11;|;823;-1081;17642;310;235;68;10;|;1;-1370;17214;560;1278;290;9;|;-280;-245;17062;209;517;-31;9;|;-484;-963;17074;463;440;374;10;|;1271;-340;16912;374;477;125;8;|;668;-1331;16786;-37;478;7;9;|;1209;-724;16784;557;1301;329;8;|;-237;-1406;16673;316;425;371;8;|;371;-1401;17307;-30;429;-961;7;|;-854;-356;17647;7;368;631;7;|;-781;85;17934;531;130;-664;5;|;-726;-1062;16400;274;-79;707;6;|;460;-1233;16931;355;-113;196;5;|;-413;-1119;17405;280;780;525;1", +"17/03/2022 15:28;13.6;14.3;|;396;832;17379;-3;919;470;10;|;837;128;17114;670;945;233;10;|;-304;-1246;17167;292;77;-931;10;|;744;70;17211;567;888;601;9;|;1459;748;16893;672;1480;141;10;|;-1084;-1887;16658;236;1068;522;10;|;-29;-1686;16912;388;1035;500;10;|;1312;-1092;17062;-35;328;-545;10;|;-40;-709;16701;-120;374;121;10;|;515;-1327;16826;475;1148;402;10;|;1179;881;17063;635;1114;430;9;|;1613;-568;17246;293;-230;-461;9;|;-103;-265;17289;96;420;363;10;|;-896;-656;16522;167;1320;250;10;|;1368;288;17039;195;263;471;9;|;239;-2003;17129;578;1203;490;9;|;2461;586;17699;356;202;209;11;|;823;-1092;17649;310;237;65;10;|;-7;-1369;17215;550;1279;288;9;|;-290;-249;17072;208;515;-33;9;|;-488;-965;17071;472;439;372;10;|;1270;-342;16923;377;476;120;8;|;671;-1337;16788;-33;482;14;9;|;1206;-725;16783;556;1306;344;9;|;-232;-1404;16681;309;423;379;8;|;364;-1400;17305;-28;432;-952;7;|;-854;-363;17644;1;369;626;8;|;-782;89;17931;529;134;-661;5;|;-723;-1057;16407;269;-82;700;6;|;459;-1235;16929;358;-119;193;5;|;-414;-1122;17400;282;775;526;2"] + +#Dividi la riga principale usando il primo delimitatore ';' + +sql_insert_RAWDATA = ''' +INSERT IGNORE INTO ase_lar.RAWDATACOR ( + `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`, + `Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`, + `Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`, + `BatLevelModule`,`TemperatureModule`, `RssiModule` +) +VALUES ( + %s, %s, %s, %s, %s, %s, %s, + 
%s, %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s +) +''' + +def make_matrix(righe): + UnitName = 'ID0003' + ToolNameID = 'DT0002' + matrice_valori = [] + for riga in righe: + timestamp, batlevel, temperature, rilevazioni = riga.split(';',3) + EventDate, EventTime = timestamp.split(' ') + valori_nodi = rilevazioni.rstrip(';').split(';|;')[1:] # Toglie eventuali ';' finali, dividi per '|' e prendi gli elementi togliendo il primo che è vuoto + for num_nodo, valori_nodo in enumerate(valori_nodi, start=1): + valori = valori_nodo.split(';')[1:-1] + matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori)))) + return matrice_valori + +matrice_valori = make_matrix(righe) + +with mysql.connector.connect(user='root', password='batt1l0', host='10.211.114.173', port=3306) as conn: + cur = conn.cursor() + try: + cur.executemany(sql_insert_RAWDATA, matrice_valori) + conn.commit() + except Exception as e: + conn.rollback() + print(f'Error: {e}') + + + + diff --git a/prova_pd.py b/prova_pd.py new file mode 100644 index 0000000..679d9cd --- /dev/null +++ b/prova_pd.py @@ -0,0 +1,62 @@ + +#import mysql.connector +from sqlalchemy import create_engine, MetaData, Table +from sqlalchemy.orm import declarative_base, Session +from sqlalchemy.exc import IntegrityError +import pandas as pd +import utils.datefmt.date_check as date_check + +righe = ["17/03/2022 15:10;13.7;14.8;|;401;832;17373;-8;920;469;9;|;839;133;17116;675;941;228;10;|;-302;-1252;17165;288;75;-940;10;|;739;76;17203;562;879;604;9;|;1460;751;16895;672;1462;132;10;|;-1088;-1883;16675;244;1071;518;10;|;-29;-1683;16923;384;1039;505;11;|;1309;-1095;17066;-36;324;-552;10;|;-36;-713;16701;-121;372;122;10;|;508;-1318;16833;475;1154;405;10;|;1178;878;17067;636;1114;428;10;|;1613;-573;17243;291;-234;-473;9;|;-107;-259;17287;94;421;369;10;|;-900;-647;16513;168;1330;252;10;|;1372;286;17035;202;263;469;10;|;238;-2006;17142;573;1201;492;9;|;2458;589;17695;356;187;208;11;|;827;-1085;17644;308;233;66;10;|;1;-1373;17214;557;1279;298;9;|;-281;-244;17071;209;517;-36;10;|;-486;-961;17075;467;440;367;10;|;1264;-339;16918;374;476;116;8;|;661;-1330;16789;-37;478;15;9;|;1208;-724;16790;558;1303;335;8;|;-236;-1404;16678;309;426;376;8;|;367;-1402;17308;-32;428;-957;7;|;-849;-360;17640;1;371;635;7;|;-784;90;17924;533;128;-661;5;|;-723;-1062;16413;270;-79;702;7;|;458;-1235;16925;354;-117;194;5;|;-411;-1116;17403;280;777;530;1", +"19/03/2022 
15:13;13.6;14.8;|;398;836;17368;-3;924;472;9;|;838;125;17110;675;938;230;10;|;-298;-1253;17164;290;75;-942;10;|;749;78;17221;560;883;601;9;|;1463;752;16904;673;1467;134;10;|;-1085;-1884;16655;239;1067;520;10;|;-27;-1680;16923;393;1032;507;10;|;1308;-1095;17065;-43;328;-548;10;|;-38;-712;16704;-124;373;122;10;|;512;-1318;16830;473;1155;408;10;|;1181;879;17070;637;1113;436;10;|;1610;-567;17239;287;-240;-462;10;|;-108;-250;17297;94;420;370;10;|;-903;-652;16518;169;1326;257;9;|;1371;282;17047;198;263;471;10;|;244;-2006;17137;570;1205;487;9;|;2461;589;17689;354;199;210;11;|;823;-1081;17642;310;235;68;10;|;1;-1370;17214;560;1278;290;9;|;-280;-245;17062;209;517;-31;9;|;-484;-963;17074;463;440;374;10;|;1271;-340;16912;374;477;125;8;|;668;-1331;16786;-37;478;7;9;|;1209;-724;16784;557;1301;329;8;|;-237;-1406;16673;316;425;371;8;|;371;-1401;17307;-30;429;-961;7;|;-854;-356;17647;7;368;631;7;|;-781;85;17934;531;130;-664;5;|;-726;-1062;16400;274;-79;707;6;|;460;-1233;16931;355;-113;196;5;|;-413;-1119;17405;280;780;525;1", +"19/03/2022 15:28;13.6;14.3;|;396;832;17379;-3;919;470;10;|;837;128;17114;670;945;233;10;|;-304;-1246;17167;292;77;-931;10;|;744;70;17211;567;888;601;9;|;1459;748;16893;672;1480;141;10;|;-1084;-1887;16658;236;1068;522;10;|;-29;-1686;16912;388;1035;500;10;|;1312;-1092;17062;-35;328;-545;10;|;-40;-709;16701;-120;374;121;10;|;515;-1327;16826;475;1148;402;10;|;1179;881;17063;635;1114;430;9;|;1613;-568;17246;293;-230;-461;9;|;-103;-265;17289;96;420;363;10;|;-896;-656;16522;167;1320;250;10;|;1368;288;17039;195;263;471;9;|;239;-2003;17129;578;1203;490;9;|;2461;586;17699;356;202;209;11;|;823;-1092;17649;310;237;65;10;|;-7;-1369;17215;550;1279;288;9;|;-290;-249;17072;208;515;-33;9;|;-488;-965;17071;472;439;372;10;|;1270;-342;16923;377;476;120;8;|;671;-1337;16788;-33;482;14;9;|;1206;-725;16783;556;1306;344;9;|;-232;-1404;16681;309;423;379;8;|;364;-1400;17305;-28;432;-952;7;|;-854;-363;17644;1;369;626;8;|;-782;89;17931;529;134;-661;5;|;-723;-1057;16407;269;-82;700;6;|;459;-1235;16929;358;-119;193;5;|;-414;-1122;17400;282;775;526;2"] + +''' +righe = ["17/03/2022 15:10;13.7;14.8;|;401;832;17373;-8;920;469;9;|;839;133;17116;675;941;228;10;|;-302;-1252;17165;288;75;-940;10;|;739;76;17203;562;879;604;9;|;1460;751;16895;672;1462;132;10;|;-1088;-1883;16675;244;1071;518;10;|;-29;-1683;16923;384;1039;505;11;|;1309;-1095;17066;-36;324;-552;10;|;-36;-713;16701;-121;372;122;10;|;508;-1318;16833;475;1154;405;10;|;1178;878;17067;636;1114;428;10;|;1613;-573;17243;291;-234;-473;9;|;-107;-259;17287;94;421;369;10;|;-900;-647;16513;168;1330;252;10;|;1372;286;17035;202;263;469;10;|;238;-2006;17142;573;1201;492;9;|;2458;589;17695;356;187;208;11;|;827;-1085;17644;308;233;66;10;|;1;-1373;17214;557;1279;298;9;|;-281;-244;17071;209;517;-36;10;|;-486;-961;17075;467;440;367;10;|;1264;-339;16918;374;476;116;8;|;661;-1330;16789;-37;478;15;9;|;1208;-724;16790;558;1303;335;8;|;-236;-1404;16678;309;426;376;8;|;367;-1402;17308;-32;428;-957;7;|;-849;-360;17640;1;371;635;7;|;-784;90;17924;533;128;-661;5;|;-723;-1062;16413;270;-79;702;7;|;458;-1235;16925;354;-117;194;5;|;-411;-1116;17403;280;777;530;1", +"17/03/2022 
15:13;13.6;14.8;|;398;836;17368;-3;924;472;9;|;838;125;17110;675;938;230;10;|;-298;-1253;17164;290;75;-942;10;|;749;78;17221;560;883;601;9;|;1463;752;16904;673;1467;134;10;|;-1085;-1884;16655;239;1067;520;10;|;-27;-1680;16923;393;1032;507;10;|;1308;-1095;17065;-43;328;-548;10;|;-38;-712;16704;-124;373;122;10;|;512;-1318;16830;473;1155;408;10;|;1181;879;17070;637;1113;436;10;|;1610;-567;17239;287;-240;-462;10;|;-108;-250;17297;94;420;370;10;|;-903;-652;16518;169;1326;257;9;|;1371;282;17047;198;263;471;10;|;244;-2006;17137;570;1205;487;9;|;2461;589;17689;354;199;210;11;|;823;-1081;17642;310;235;68;10;|;1;-1370;17214;560;1278;290;9;|;-280;-245;17062;209;517;-31;9;|;-484;-963;17074;463;440;374;10;|;1271;-340;16912;374;477;125;8;|;668;-1331;16786;-37;478;7;9;|;1209;-724;16784;557;1301;329;8;|;-237;-1406;16673;316;425;371;8;|;371;-1401;17307;-30;429;-961;7;|;-854;-356;17647;7;368;631;7;|;-781;85;17934;531;130;-664;5;|;-726;-1062;16400;274;-79;707;6;|;460;-1233;16931;355;-113;196;5;|;-413;-1119;17405;280;780;525;1", +"17/03/2022 15:28;13.6;14.3;|;396;832;17379;-3;919;470;10;|;837;128;17114;670;945;233;10;|;-304;-1246;17167;292;77;-931;10;|;744;70;17211;567;888;601;9;|;1459;748;16893;672;1480;141;10;|;-1084;-1887;16658;236;1068;522;10;|;-29;-1686;16912;388;1035;500;10;|;1312;-1092;17062;-35;328;-545;10;|;-40;-709;16701;-120;374;121;10;|;515;-1327;16826;475;1148;402;10;|;1179;881;17063;635;1114;430;9;|;1613;-568;17246;293;-230;-461;9;|;-103;-265;17289;96;420;363;10;|;-896;-656;16522;167;1320;250;10;|;1368;288;17039;195;263;471;9;|;239;-2003;17129;578;1203;490;9;|;2461;586;17699;356;202;209;11;|;823;-1092;17649;310;237;65;10;|;-7;-1369;17215;550;1279;288;9;|;-290;-249;17072;208;515;-33;9;|;-488;-965;17071;472;439;372;10;|;1270;-342;16923;377;476;120;8;|;671;-1337;16788;-33;482;14;9;|;1206;-725;16783;556;1306;344;9;|;-232;-1404;16681;309;423;379;8;|;364;-1400;17305;-28;432;-952;7;|;-854;-363;17644;1;369;626;8;|;-782;89;17931;529;134;-661;5;|;-723;-1057;16407;269;-82;700;6;|;459;-1235;16929;358;-119;193;5;|;-414;-1122;17400;282;775;526;2"] +''' +#Dividi la riga principale usando il primo delimitatore ';' +UnitName = '' +ToolNameID = '' +matrice_valori = [] +for riga in righe: + timestamp, batlevel, temperature, rilevazioni = riga.split(';',3) + EventDate, EventTime = timestamp.split(' ') + valori_nodi = rilevazioni.split('|')[1:-1] # Dividi per '|' e prendi gli elementi interni togliendo primo e ultimo + for num_nodo, valori_nodo in enumerate(valori_nodi, start=1): + valori = valori_nodo.split(';')[1:-1] + matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (16 - len(valori)))) + +# Crea un DataFrame pandas per visualizzare la matrice in forma tabellare +colonne = ['UnitName', 'ToolNameID', 'NodeNum', 'EventDate', 'EventTime', 'BatLevel', 'Temperature', 'Val0', 'Val1', 'Val2', 'Val3', 'Val4', 'Val5', 'Val6', 'Val7', 'Val8', 'Val9', 'ValA', 'ValB', 'ValC', 'ValD', 'ValE', 'ValF'] +df = pd.DataFrame(matrice_valori, columns=colonne) + +# Stampa il DataFrame +#print(df.to_string()) + +engine = create_engine('mysql+mysqlconnector://root:batt1l0@10.211.114.173/ase_lar') + +metadata = MetaData() + +table = Table('RAWDATACOR', metadata, autoload_with=engine) + +Base = declarative_base() + +class RawDataCor(Base): + __table__ = table + +with Session(engine) as session: + for index, row in df.iterrows(): + try: + nuova_riga = RawDataCor(**row.to_dict()) + session.add(nuova_riga) + session.commit() + except 
IntegrityError: + session.rollback() # Ignora l'errore di chiave duplicata + print(f"Riga con chiavi duplicate ignorata: {row.to_dict()}") + except Exception as e: + session.rollback() + print(f"Errore inatteso durante l'inserimento: {e}, riga: {row.to_dict()}") + + +#df.to_sql('RAWDATACOR', con=engine, if_exists='', index=False) diff --git a/test/test_ftp_csv_receicer.py b/test/test_ftp_csv_receicer.py new file mode 100644 index 0000000..25e9071 --- /dev/null +++ b/test/test_ftp_csv_receicer.py @@ -0,0 +1,546 @@ +import unittest +import os +import sys +from unittest.mock import patch, MagicMock, mock_open, call, ANY +from hashlib import sha256 +from pathlib import Path +from types import SimpleNamespace # Used to create mock config objects + +# Add the parent directory to sys.path to allow importing FtpCsvReceiver +# Adjust this path if your test file is located differently +script_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(script_dir) +# If FtpCsvReceiver.py is in the same directory as the test file, you might not need this +# If it's in the parent directory (like /home/alex/devel/ASE/), use this: +sys.path.insert(0, parent_dir) + +# Now import the components to test +# We need to import AFTER modifying sys.path if necessary +# Also, mock dependencies BEFORE importing the module that uses them +# Mock mysql.connector BEFORE importing FtpCsvReceiver +mock_mysql_connector = MagicMock() +sys.modules['mysql.connector'] = mock_mysql_connector + +# Mock the custom utils modules as well if they aren't available in the test environment +mock_utils_time = MagicMock() +mock_utils_config = MagicMock() +sys.modules['utils.time'] = mock_utils_time +sys.modules['utils.config'] = mock_utils_config +# Mock the setting.config() call specifically +mock_config_instance = MagicMock() +mock_utils_config.set_config.config.return_value = mock_config_instance + +# Mock pyftpdlib classes if needed for specific tests, but often mocking methods is enough +# sys.modules['pyftpdlib.handlers'] = MagicMock() +# sys.modules['pyftpdlib.authorizers'] = MagicMock() +# sys.modules['pyftpdlib.servers'] = MagicMock() + +# Import the module AFTER mocking dependencies +import FtpCsvReceiver +from FtpCsvReceiver import ( + extract_value, + DummySha256Authorizer, + ASEHandler, + conn_db, # Import even though we mock mysql.connector +) + +# --- Test Configuration Setup --- +def create_mock_cfg(): + """Creates a mock configuration object for testing.""" + cfg = SimpleNamespace() + cfg.adminuser = ['admin', sha256(b'adminpass').hexdigest(), '/fake/admin/path', 'elradfmwMT'] + cfg.dbhost = 'mockhost' + cfg.dbport = 3306 + cfg.dbuser = 'mockuser' + cfg.dbpass = 'mockpass' + cfg.dbname = 'mockdb' + cfg.dbusertable = 'mock_virtusers' + cfg.dbrectable = 'mock_received' + cfg.virtpath = '/fake/ftp/root/' + cfg.defperm = 'elmw' + cfg.fileext = ['.CSV', '.TXT'] + # Add patterns as lists of strings + cfg.units_name = [r'ID\d{4}', r'IX\d{4}'] + cfg.units_type = [r'G801', r'G201'] + cfg.tools_name = [r'LOC\d{4}', r'DT\d{4}'] + cfg.tools_type = [r'MUX', r'MUMS'] + # Add other necessary config values + cfg.logfilename = 'test_ftp.log' + cfg.proxyaddr = '0.0.0.0' + cfg.firstport = 40000 + cfg.portrangewidth = 10 + return cfg + +# --- Test Cases --- + +class TestExtractValue(unittest.TestCase): + + def test_extract_from_primary(self): + patterns = [r'ID(\d+)'] + primary = "File_ID1234_data.csv" + secondary = "Some other text" + self.assertEqual(extract_value(patterns, primary, secondary), "ID1234") + + def 
test_extract_from_secondary(self): + patterns = [r'Type(A|B)'] + primary = "Filename_without_type.txt" + secondary = "Log data: TypeB found" + self.assertEqual(extract_value(patterns, primary, secondary), "TypeB") + + def test_no_match(self): + patterns = [r'XYZ\d+'] + primary = "File_ID1234_data.csv" + secondary = "Log data: TypeB found" + self.assertEqual(extract_value(patterns, primary, secondary, default="NotFound"), "NotFound") + + def test_case_insensitive(self): + patterns = [r'id(\d+)'] + primary = "File_ID1234_data.csv" + secondary = "Some other text" + self.assertEqual(extract_value(patterns, primary, secondary), "ID1234") # Note: re.findall captures original case + + def test_multiple_patterns(self): + patterns = [r'Type(A|B)', r'ID(\d+)'] + primary = "File_ID1234_data.csv" + secondary = "Log data: TypeB found" + # Should match the first pattern found in the primary source + self.assertEqual(extract_value(patterns, primary, secondary), "ID1234") + + def test_multiple_patterns_secondary_match(self): + patterns = [r'XYZ\d+', r'Type(A|B)'] + primary = "File_ID1234_data.csv" + secondary = "Log data: TypeB found" + # Should match the second pattern in the secondary source + self.assertEqual(extract_value(patterns, primary, secondary), "TypeB") + + +class TestDummySha256Authorizer(unittest.TestCase): + + def setUp(self): + self.mock_cfg = create_mock_cfg() + # Mock the database connection and cursor + self.mock_conn = MagicMock() + self.mock_cursor = MagicMock() + mock_mysql_connector.connect.return_value = self.mock_conn + self.mock_conn.cursor.return_value = self.mock_cursor + + @patch('FtpCsvReceiver.Path') # Mock Path object + def test_init_loads_users(self, mock_path_constructor): + # Mock Path instance methods + mock_path_instance = MagicMock() + mock_path_constructor.return_value = mock_path_instance + + # Simulate database result + db_users = [ + ('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr'), + ('user2', sha256(b'pass2').hexdigest(), '/fake/ftp/root/user2', 'elmw'), + ] + self.mock_cursor.fetchall.return_value = db_users + + authorizer = DummySha256Authorizer(self.mock_cfg) + + # Verify DB connection + mock_mysql_connector.connect.assert_called_once_with( + user=self.mock_cfg.dbuser, password=self.mock_cfg.dbpass, + host=self.mock_cfg.dbhost, port=self.mock_cfg.dbport + ) + # Verify query + self.mock_cursor.execute.assert_called_once_with( + f'SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL' + ) + # Verify admin user added + self.assertIn('admin', authorizer.user_table) + self.assertEqual(authorizer.user_table['admin']['pwd'], self.mock_cfg.adminuser[1]) + # Verify DB users added + self.assertIn('user1', authorizer.user_table) + self.assertEqual(authorizer.user_table['user1']['pwd'], db_users[0][1]) + self.assertEqual(authorizer.user_table['user1']['home'], db_users[0][2]) + self.assertEqual(authorizer.user_table['user1']['perm'], db_users[0][3]) + self.assertIn('user2', authorizer.user_table) + # Verify directories were "created" + expected_path_calls = [ + call(self.mock_cfg.virtpath + 'user1'), + call(self.mock_cfg.virtpath + 'user2'), + ] + mock_path_constructor.assert_has_calls(expected_path_calls, any_order=True) + self.assertEqual(mock_path_instance.mkdir.call_count, 2) + mock_path_instance.mkdir.assert_called_with(parents=True, exist_ok=True) + + @patch('FtpCsvReceiver.Path') + def test_init_mkdir_exception(self, mock_path_constructor): + # Simulate database result + db_users 
= [('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr')] + self.mock_cursor.fetchall.return_value = db_users + + # Mock Path to raise an exception + mock_path_instance = MagicMock() + mock_path_constructor.return_value = mock_path_instance + mock_path_instance.mkdir.side_effect = OSError("Permission denied") + + # We expect initialization to continue, but maybe log an error (though the code uses self.responde which isn't available here) + # For a unit test, we just check that the user is still added + authorizer = DummySha256Authorizer(self.mock_cfg) + self.assertIn('user1', authorizer.user_table) + mock_path_instance.mkdir.assert_called_once() + + + def test_validate_authentication_success(self): + self.mock_cursor.fetchall.return_value = [] # No DB users for simplicity + authorizer = DummySha256Authorizer(self.mock_cfg) + # Test admin user + authorizer.validate_authentication('admin', 'adminpass', None) # Handler not used in this method + + def test_validate_authentication_wrong_password(self): + self.mock_cursor.fetchall.return_value = [] + authorizer = DummySha256Authorizer(self.mock_cfg) + with self.assertRaises(FtpCsvReceiver.AuthenticationFailed): + authorizer.validate_authentication('admin', 'wrongpass', None) + + def test_validate_authentication_unknown_user(self): + self.mock_cursor.fetchall.return_value = [] + authorizer = DummySha256Authorizer(self.mock_cfg) + with self.assertRaises(FtpCsvReceiver.AuthenticationFailed): + authorizer.validate_authentication('unknown', 'somepass', None) + + +class TestASEHandler(unittest.TestCase): + + def setUp(self): + self.mock_cfg = create_mock_cfg() + self.mock_conn = MagicMock() # Mock FTP connection object + self.mock_server = MagicMock() # Mock FTP server object + self.mock_authorizer = MagicMock(spec=DummySha256Authorizer) # Mock authorizer + + # Instantiate the handler + # We need to manually set cfg and authorizer as done in main() + self.handler = ASEHandler(self.mock_conn, self.mock_server) + self.handler.cfg = self.mock_cfg + self.handler.authorizer = self.mock_authorizer + self.handler.respond = MagicMock() # Mock the respond method + self.handler.push = MagicMock() # Mock the push method + + # Mock database for handler methods + self.mock_db_conn = MagicMock() + self.mock_db_cursor = MagicMock() + # Patch conn_db globally for this test class + self.patcher_conn_db = patch('FtpCsvReceiver.conn_db', return_value=self.mock_db_conn) + self.mock_conn_db = self.patcher_conn_db.start() + self.mock_db_conn.cursor.return_value = self.mock_db_cursor + + # Mock logging + self.patcher_logging = patch('FtpCsvReceiver.logging') + self.mock_logging = self.patcher_logging.start() + + + def tearDown(self): + # Stop the patchers + self.patcher_conn_db.stop() + self.patcher_logging.stop() + # Reset mocks if needed between tests (though setUp does this) + mock_mysql_connector.reset_mock() + + + @patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV')) + @patch('FtpCsvReceiver.os.path.splitext', return_value=('ID1234_data', '.CSV')) + @patch('FtpCsvReceiver.os.stat') + @patch('FtpCsvReceiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3') + @patch('FtpCsvReceiver.os.remove') + @patch('FtpCsvReceiver.extract_value') # Mock extract_value for focused testing + def test_on_file_received_success(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split): + mock_os_stat.return_value.st_size = 100 # Non-empty file + test_file_path = 
'/fake/ftp/root/user1/ID1234_data.CSV' + + # Setup mock return values for extract_value + mock_extract.side_effect = ['ID1234', 'G801', 'LOC5678', 'MUX'] + + self.handler.on_file_received(test_file_path) + + # Verify file stats checked + mock_os_stat.assert_called_once_with(test_file_path) + # Verify file opened + mock_file_open.assert_called_once_with(test_file_path, 'r') + # Verify path splitting + mock_split.assert_called_once_with(test_file_path) + mock_splitext.assert_called_once_with('ID1234_data.CSV') + # Verify extract_value calls + expected_extract_calls = [ + call(self.mock_cfg.units_name, 'ID1234_data', ANY), # ANY for the lines string + call(self.mock_cfg.units_type, 'ID1234_data', ANY), + call(self.mock_cfg.tools_name, 'ID1234_data', ANY), + call(self.mock_cfg.tools_type, 'ID1234_data', ANY), + ] + mock_extract.assert_has_calls(expected_extract_calls) + # Verify DB connection + self.mock_conn_db.assert_called_once_with(self.mock_cfg) + # Verify DB insert + expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbrectable } (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s, %s, %s, %s, %s, %s)" + expected_data = ('ID1234_data', 'ID1234', 'G801', 'LOC5678', 'MUX', 'G801,col2,col3\nval1,val2,val3') + self.mock_db_cursor.execute.assert_called_once_with(expected_sql, expected_data) + self.mock_db_conn.commit.assert_called_once() + self.mock_db_conn.close.assert_called_once() + # Verify file removed + mock_os_remove.assert_called_once_with(test_file_path) + # Verify logging + self.mock_logging.info.assert_called_with(f'File {test_file_path} loaded: removed.') + + @patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'data.WRONGEXT')) + @patch('FtpCsvReceiver.os.path.splitext', return_value=('data', '.WRONGEXT')) + @patch('FtpCsvReceiver.os.stat') + @patch('FtpCsvReceiver.os.remove') + def test_on_file_received_wrong_extension(self, mock_os_remove, mock_os_stat, mock_splitext, mock_split): + mock_os_stat.return_value.st_size = 100 + test_file_path = '/fake/ftp/root/user1/data.WRONGEXT' + + self.handler.on_file_received(test_file_path) + + # Verify only stat, split, and splitext were called + mock_os_stat.assert_called_once_with(test_file_path) + mock_split.assert_called_once_with(test_file_path) + mock_splitext.assert_called_once_with('data.WRONGEXT') + # Verify DB, open, remove were NOT called + self.mock_conn_db.assert_not_called() + mock_os_remove.assert_not_called() + self.mock_logging.info.assert_not_called() # No logging in this path + + @patch('FtpCsvReceiver.os.stat') + @patch('FtpCsvReceiver.os.remove') + def test_on_file_received_empty_file(self, mock_os_remove, mock_os_stat): + mock_os_stat.return_value.st_size = 0 # Empty file + test_file_path = '/fake/ftp/root/user1/empty.CSV' + + self.handler.on_file_received(test_file_path) + + # Verify stat called + mock_os_stat.assert_called_once_with(test_file_path) + # Verify file removed + mock_os_remove.assert_called_once_with(test_file_path) + # Verify logging + self.mock_logging.info.assert_called_with(f'File {test_file_path} was empty: removed.') + # Verify DB not called + self.mock_conn_db.assert_not_called() + + @patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV')) + @patch('FtpCsvReceiver.os.path.splitext', return_value=('ID1234_data', '.CSV')) + @patch('FtpCsvReceiver.os.stat') + @patch('FtpCsvReceiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3') + @patch('FtpCsvReceiver.os.remove') + 
@patch('FtpCsvReceiver.extract_value', side_effect=['ID1234', 'G801', 'LOC5678', 'MUX']) + def test_on_file_received_db_error(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split): + mock_os_stat.return_value.st_size = 100 + test_file_path = '/fake/ftp/root/user1/ID1234_data.CSV' + db_error = Exception("DB connection failed") + self.mock_db_cursor.execute.side_effect = db_error # Simulate DB error + + self.handler.on_file_received(test_file_path) + + # Verify DB interaction attempted + self.mock_conn_db.assert_called_once_with(self.mock_cfg) + self.mock_db_cursor.execute.assert_called_once() + # Verify commit/close not called after error + self.mock_db_conn.commit.assert_not_called() + self.mock_db_conn.close.assert_not_called() # Should close be called in finally? Original code doesn't. + # Verify file was NOT removed + mock_os_remove.assert_not_called() + # Verify error logging + self.mock_logging.error.assert_any_call(f'File {test_file_path} not loaded. Held in user path.') + self.mock_logging.error.assert_any_call(f'{db_error}') + + @patch('FtpCsvReceiver.os.remove') + def test_on_incomplete_file_received(self, mock_os_remove): + test_file_path = '/fake/ftp/root/user1/incomplete.part' + self.handler.on_incomplete_file_received(test_file_path) + mock_os_remove.assert_called_once_with(test_file_path) + + @patch('FtpCsvReceiver.Path') + @patch('FtpCsvReceiver.os.path.basename', return_value='newuser') + def test_ftp_SITE_ADDU_success(self, mock_basename, mock_path_constructor): + mock_path_instance = MagicMock() + mock_path_constructor.return_value = mock_path_instance + password = 'newpassword' + expected_hash = sha256(password.encode("UTF-8")).hexdigest() + expected_home = self.mock_cfg.virtpath + 'newuser' + + self.handler.ftp_SITE_ADDU(f'newuser {password}') + + # Verify path creation + mock_path_constructor.assert_called_once_with(expected_home) + mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True) + # Verify authorizer call + self.handler.authorizer.add_user.assert_called_once_with( + 'newuser', expected_hash, expected_home + '/', perm=self.mock_cfg.defperm # Note: Original code adds trailing slash here + ) + # Verify DB interaction + self.mock_conn_db.assert_called_once_with(self.mock_cfg) + expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('newuser', '{expected_hash}', '{expected_home}', '{self.mock_cfg.defperm}')" + self.mock_db_cursor.execute.assert_called_once_with(expected_sql) + self.mock_db_conn.commit.assert_called_once() + self.mock_db_conn.close.assert_called_once() + # Verify response + self.handler.respond.assert_called_once_with('200 SITE ADDU successful.') + # Verify logging + self.mock_logging.info.assert_called_with('User newuser created.') + + def test_ftp_SITE_ADDU_missing_args(self): + self.handler.ftp_SITE_ADDU('newuser') # Missing password + self.handler.respond.assert_called_once_with('501 SITE ADDU failed. 
Command needs 2 arguments') + self.handler.authorizer.add_user.assert_not_called() + self.mock_conn_db.assert_not_called() + + @patch('FtpCsvReceiver.Path') + @patch('FtpCsvReceiver.os.path.basename', return_value='newuser') + def test_ftp_SITE_ADDU_mkdir_error(self, mock_basename, mock_path_constructor): + mock_path_instance = MagicMock() + mock_path_constructor.return_value = mock_path_instance + error = OSError("Cannot create dir") + mock_path_instance.mkdir.side_effect = error + + self.handler.ftp_SITE_ADDU('newuser newpassword') + + self.handler.respond.assert_called_once_with(f'551 Error in create virtual user path: {error}') + self.handler.authorizer.add_user.assert_not_called() + self.mock_conn_db.assert_not_called() + + @patch('FtpCsvReceiver.Path') + @patch('FtpCsvReceiver.os.path.basename', return_value='newuser') + def test_ftp_SITE_ADDU_db_error(self, mock_basename, mock_path_constructor): + mock_path_instance = MagicMock() + mock_path_constructor.return_value = mock_path_instance + error = Exception("DB insert failed") + self.mock_db_cursor.execute.side_effect = error + + self.handler.ftp_SITE_ADDU('newuser newpassword') + + # Verify mkdir called + mock_path_instance.mkdir.assert_called_once() + # Verify authorizer called (happens before DB) + self.handler.authorizer.add_user.assert_called_once() + # Verify DB interaction attempted + self.mock_conn_db.assert_called_once() + self.mock_db_cursor.execute.assert_called_once() + # Verify response + self.handler.respond.assert_called_once_with(f'501 SITE ADDU failed: {error}.') + + + @patch('FtpCsvReceiver.os.path.basename', return_value='olduser') + def test_ftp_SITE_DELU_success(self, mock_basename): + self.handler.ftp_SITE_DELU('olduser') + + # Verify authorizer call + self.handler.authorizer.remove_user.assert_called_once_with('olduser') + # Verify DB interaction + self.mock_conn_db.assert_called_once_with(self.mock_cfg) + expected_sql = f"UPDATE {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = now() WHERE ftpuser = 'olduser'" + self.mock_db_cursor.execute.assert_called_once_with(expected_sql) + self.mock_db_conn.commit.assert_called_once() + self.mock_db_conn.close.assert_called_once() + # Verify response + self.handler.respond.assert_called_once_with('200 SITE DELU successful.') + # Verify logging + self.mock_logging.info.assert_called_with('User olduser deleted.') + + @patch('FtpCsvReceiver.os.path.basename', return_value='olduser') + def test_ftp_SITE_DELU_error(self, mock_basename): + error = Exception("DB update failed") + self.mock_db_cursor.execute.side_effect = error + + self.handler.ftp_SITE_DELU('olduser') + + # Verify authorizer call (happens first) + self.handler.authorizer.remove_user.assert_called_once_with('olduser') + # Verify DB interaction attempted + self.mock_conn_db.assert_called_once() + self.mock_db_cursor.execute.assert_called_once() + # Verify response + self.handler.respond.assert_called_once_with('501 SITE DELU failed.') + + @patch('FtpCsvReceiver.Path') + @patch('FtpCsvReceiver.os.path.basename', return_value='restoreme') + def test_ftp_SITE_RESU_success(self, mock_basename, mock_path_constructor): + mock_path_instance = MagicMock() + mock_path_constructor.return_value = mock_path_instance + user_data = ('restoreme', 'somehash', '/fake/ftp/root/restoreme', 'elmw') + self.mock_db_cursor.fetchone.return_value = user_data + + self.handler.ftp_SITE_RESU('restoreme') + + # Verify DB interaction + self.mock_conn_db.assert_called_once_with(self.mock_cfg) + expected_update_sql = f"UPDATE 
{self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = null WHERE ftpuser = 'restoreme'" + expected_select_sql = f"SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE ftpuser = 'restoreme'" + expected_db_calls = [ + call(expected_update_sql), + call(expected_select_sql) + ] + self.mock_db_cursor.execute.assert_has_calls(expected_db_calls) + self.mock_db_conn.commit.assert_called_once() # For the update + self.mock_db_cursor.fetchone.assert_called_once() + # Verify authorizer call + self.handler.authorizer.add_user.assert_called_once_with(*user_data) + # Verify path creation + mock_path_constructor.assert_called_once_with(self.mock_cfg.virtpath + 'restoreme') + mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True) + # Verify DB close + self.mock_db_conn.close.assert_called_once() + # Verify response + self.handler.respond.assert_called_once_with('200 SITE RESU successful.') + # Verify logging + self.mock_logging.info.assert_called_with('User restoreme restored.') + + @patch('FtpCsvReceiver.os.path.basename', return_value='restoreme') + def test_ftp_SITE_RESU_db_error(self, mock_basename): + error = Exception("DB fetch failed") + # Simulate error on the SELECT statement + self.mock_db_cursor.execute.side_effect = [None, error] # First call (UPDATE) ok, second (SELECT) fails + + self.handler.ftp_SITE_RESU('restoreme') + + # Verify DB interaction attempted + self.mock_conn_db.assert_called_once() + self.assertEqual(self.mock_db_cursor.execute.call_count, 2) # Both UPDATE and SELECT attempted + self.mock_db_conn.commit.assert_called_once() # Commit for UPDATE happened + # Verify response + self.handler.respond.assert_called_once_with('501 SITE RESU failed.') + # Verify authorizer not called, mkdir not called + self.handler.authorizer.add_user.assert_not_called() + + + def test_ftp_SITE_LSTU_success(self): + user_list_data = [ + ('userA', 'elr'), + ('userB', 'elmw'), + ] + self.mock_db_cursor.fetchall.return_value = user_list_data + + self.handler.ftp_SITE_LSTU('') # No argument needed + + # Verify DB interaction + self.mock_conn_db.assert_called_once_with(self.mock_cfg) + expected_sql = f'SELECT ftpuser, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL ' + self.mock_db_cursor.execute.assert_called_once_with(expected_sql) + self.mock_db_cursor.fetchall.assert_called_once() + # Verify push calls + expected_push_calls = [ + call("214-The following virtual users are defined:\r\n"), + call('Username: userA\tPerms: elr\r\nUsername: userB\tPerms: elmw\r\n') + ] + self.handler.push.assert_has_calls(expected_push_calls) + # Verify final response + self.handler.respond.assert_called_once_with("214 LSTU SITE command successful.") + + def test_ftp_SITE_LSTU_db_error(self): + error = Exception("DB select failed") + self.mock_db_cursor.execute.side_effect = error + + self.handler.ftp_SITE_LSTU('') + + # Verify DB interaction attempted + self.mock_conn_db.assert_called_once() + self.mock_db_cursor.execute.assert_called_once() + # Verify response + self.handler.respond.assert_called_once_with(f'501 list users failed: {error}') + # Verify push not called + self.handler.push.assert_not_called() + + +if __name__ == '__main__': + unittest.main(argv=['first-arg-is-ignored'], exit=False) diff --git a/transform_file.py b/transform_file.py index cab7577..e3bbd21 100755 --- a/transform_file.py +++ b/transform_file.py @@ -6,18 +6,15 @@ import os import re from datetime import datetime import json 
-import psycopg2
-from psycopg2.extras import execute_values
+import mysql.connector as mysql
 import logging
-import psycopg2.sql
-
 from utils.time import timestamp_fmt as ts
 from utils.config import set_config as setting


 def conn_db(cfg):
-    return psycopg2.connect(dbname=cfg.dbname, user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport )
+    return mysql.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport )

 def extract_value(patterns, source, default='Not Defined'):
     ip = {}
@@ -41,21 +38,11 @@ def write_db(records, cfg):
     ]

     query = f"""
-    INSERT INTO {cfg.dbschema}.{cfg.dbdataraw} (
+    INSERT IGNORE INTO {cfg.dbname}.{cfg.dbdataraw} (
         unit_name, unit_type, tool_name, tool_type, unit_ip, unit_subnet, unit_gateway,
         event_timestamp, battery_level, temperature, nodes_jsonb
     )
-    VALUES %s
-    ON CONFLICT ON CONSTRAINT dataraw_unique
-    DO UPDATE SET
-        unit_type = EXCLUDED.unit_type,
-        tool_type = EXCLUDED.tool_type,
-        unit_ip = EXCLUDED.unit_ip,
-        unit_subnet = EXCLUDED.unit_subnet,
-        unit_gateway = EXCLUDED.unit_gateway,
-        battery_level = EXCLUDED.battery_level,
-        temperature = EXCLUDED.temperature,
-        nodes_jsonb = EXCLUDED.nodes_jsonb;
+    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
     """

     try:
@@ -63,12 +50,12 @@ def write_db(records, cfg):
         conn.autocommit = True
         with conn.cursor() as cur:
             try:
-                execute_values(cur, query, insert_values)
+                cur.executemany(query, insert_values)
                 cur.close()
                 conn.commit()
-            except psycopg2.Error as e:
+            except Exception as e:
                 logging.error(f'Records not inserted: {e}')
-                logging.info(f'Exit')
+                logging.info('Exit')
                 exit()
     except Exception as e:
         logging.error(f'Records not inserted: {e}')
@@ -78,9 +65,9 @@ def elab_csv(cfg):
     try:
         with conn_db(cfg) as conn:
             cur = conn.cursor()
-            cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbschema}.{cfg.dbrectable} where locked = 0 and status = 0')
+            cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = 0 limit 1')
             id, unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone()
-            cur.execute(f'update {cfg.dbschema}.{cfg.dbrectable} set locked = 1 where id = {id}')
+            cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 1 where id = {id}')
             data_list = str(tool_data).strip("('{\"").strip("\"}\',)").split('","')
             # Estrarre le informazioni degli ip dalla header
             infos = extract_value(cfg.csv_infos, str(data_list[:9]))
diff --git a/utils/datefmt/__init__.py b/utils/datefmt/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/utils/datefmt/date_check.py b/utils/datefmt/date_check.py
new file mode 100644
index 0000000..18e03fc
--- /dev/null
+++ b/utils/datefmt/date_check.py
@@ -0,0 +1,24 @@
+from datetime import datetime
+
+def conforma_data(data_string):
+    """
+    Normalizes a date string to the YYYY-MM-DD format, trying several input formats.
+
+    Args:
+        data_string (str): The date string to normalize.
+
+    Returns:
+        str: The normalized date in the YYYY-MM-DD format,
+             or None if the string cannot be interpreted as a date.
+    """
+    formato_desiderato = "%Y-%m-%d"
+    formati_input = ["%Y/%m/%d", "%Y-%m-%d", "%d-%m-%Y", "%d/%m/%Y", ]  # Order matters: try the most likely format first
+
+    for formato_input in formati_input:
+        try:
+            data_oggetto = datetime.strptime(data_string, formato_input)
+            return data_oggetto.strftime(formato_desiderato)
+        except ValueError:
+            continue  # Try the next format if the current one fails
+
+    return None  # No format matched
\ No newline at end of file