From a752210a336d8279a1cabf8e15d5853882b89d77 Mon Sep 17 00:00:00 2001 From: alex Date: Thu, 1 May 2025 15:34:55 +0200 Subject: [PATCH] query channels ain din --- CsvLoader.py | 193 ------------------- demonize_ftpd.py | 207 --------------------- ftpcsvreceiver.ini => ftp_csv_receiver.ini | 1 + mqtt_pub.py | 65 ------- prova_no_pd.py | 56 ------ prova_pd.py | 62 ------ run_trans.sh | 4 - transform_file.py | 165 ---------------- utils/config/loader.py | 5 +- utils/database/connection.py | 2 +- utils/database/get_nodes_type.py | 96 ---------- utils/database/nodes_query.py | 37 ++++ utils/timestamp/date_check.py | 2 +- 13 files changed, 43 insertions(+), 852 deletions(-) delete mode 100755 CsvLoader.py delete mode 100644 demonize_ftpd.py rename ftpcsvreceiver.ini => ftp_csv_receiver.ini (97%) delete mode 100755 mqtt_pub.py delete mode 100644 prova_no_pd.py delete mode 100644 prova_pd.py delete mode 100755 run_trans.sh delete mode 100755 transform_file.py delete mode 100644 utils/database/get_nodes_type.py create mode 100644 utils/database/nodes_query.py diff --git a/CsvLoader.py b/CsvLoader.py deleted file mode 100755 index 733ed81..0000000 --- a/CsvLoader.py +++ /dev/null @@ -1,193 +0,0 @@ -#!.venv/bin/python - -import sys -import os -import pika -import logging -import csv -import re -import mysql.connector as mysql -import shutil - -from utils.time import timestamp_fmt as ts -from utils.time import date_refmt as df -from utils.config import loader as setting - - -class sqlraw: - def __init__(self, cfg): - self.config = {"host": cfg.dbhost, "user": cfg.dbuser, "password": cfg.dbpass} - self.dbname = cfg.dbname - self.table = cfg.table - self.sql_head = ( - "INSERT IGNORE INTO " - + self.dbname - + "." - + self.table - + " (`UnitName`,`ToolName`,`eventDT`,`BatteryLevel`,`Temperature`,`NodeNum`," - + "`Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`," - + "`Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`) VALUES " - ) - - def add_data(self, values): - self.sql = self.sql_head + "(" + "),(".join(values) + ");" - - def write_db(self): - try: - conn = mysql.connect(**self.config, database=self.dbname) - except Exception as err: - logging.error( - f"PID {os.getpid():>5} >> Error to connet to DB {self.dbname} - System error {err}." - ) - sys.exit(1) - - cur = conn.cursor() - try: - cur.execute(self.sql) - except Exception as err: - logging.error( - f"PID {os.getpid():>5} >> Error write into DB {self.dbname} - System error {err}." - ) - print(err) - sys.exit(1) - - finally: - conn.close() - - -def callback_ase(ch, method, properties, body, config): # body รจ di tipo byte - logging.info( - "PID {0:>5} >> Read message {1}".format(os.getpid(), body.decode("utf-8")) - ) - msg = body.decode("utf-8").split(";") - sql = sqlraw(config) - stmlst = [] - commonData = '"{0}","{1}"'.format(msg[1], msg[2]) - tooltype = msg[3] - with open(msg[6], "r") as csvfile: - lines = csvfile.read().splitlines() - for line in lines: - - fields = line.split(";|;") - - if mG501 := re.match( - r"^(\d\d\d\d\/\d\d\/\d\d\s\d\d:\d\d:\d\d);(.+);(.+)$", fields[0] - ): - rowData = ',"{0}",{1},{2}'.format( - mG501.group(1), mG501.group(2), mG501.group(3) - ) - fields.pop(0) - - elif mG201 := re.match( - r"^(\d\d\/\d\d\/\d\d\d\d\s\d\d:\d\d:\d\d)$", fields[0] - ): - mbtG201 = re.match(r"^(.+);(.+)$", fields[1]) - - rowData = ',"{0}",{1},{2}'.format( - df.dateTimeFmt(mG201.group(1)), mbtG201.group(1), mbtG201.group(2) - ) - fields.pop(0) - fields.pop(0) - - else: - continue - - nodeNum = 0 - for field in fields: - nodeNum += 1 - vals = field.split(";") - stmlst.append( - commonData - + rowData - + ",{0},".format(nodeNum) - + ",".join('"{0}"'.format(d) for d in vals) - + "," - + ",".join(["null"] * (config.valueNum - len(vals))) - ) - - if config.maxInsertRow < len(stmlst): - sql.add_data(stmlst) - try: - sql.write_db() - stmlst.clear() - except: - print("errore nell'inserimento") - sys.exit(1) - - if len(stmlst) > 0: - sql.add_data(stmlst) - try: - sql.write_db() - ch.basic_ack(delivery_tag=method.delivery_tag) - except: - print("errore nell'inserimento") - sys.exit(1) - newFilename = msg[6].replace("received", "loaded") - newPath, filenameExt = os.path.split(newFilename) - try: - os.makedirs(newPath) - logging.info("PID {:>5} >> path {} created.".format(os.getpid(), newPath)) - except FileExistsError: - logging.info( - "PID {:>5} >> path {} already exists.".format(os.getpid(), newPath) - ) - try: - shutil.move(msg[6], newFilename) - logging.info( - "PID {:>5} >> {} moved into {}.".format( - os.getpid(), filenameExt, newFilename - ) - ) - except OSError: - logging.error( - "PID {:>5} >> Error to move {} into {}.".format( - os.getpid(), filenameExt, newFilename - ) - ) - - -def main(): - cfg = setting.config() - - logging.basicConfig( - format="%(asctime)s %(message)s", - filename="/var/log/" + cfg.elablog, - level=logging.INFO, - ) - - parameters = pika.URLParameters( - "amqp://" - + cfg.mquser - + ":" - + cfg.mqpass - + "@" - + cfg.mqhost - + ":" - + cfg.mqport - + "/%2F" - ) - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - channel.queue_declare(queue=cfg.csv_queue, durable=True) - - channel.basic_qos(prefetch_count=1) - channel.basic_consume( - queue=cfg.csv_queue, - on_message_callback=lambda ch, method, properties, body: callback_ase( - ch, method, properties, body, config=cfg - ), - ) - - # channel.basic_consume(queue=cfg.csv_queue, on_message_callback=callback,arguments=cfg) - try: - channel.start_consuming() - except KeyboardInterrupt: - logging.info( - "PID {0:>5} >> Info: {1}.".format( - os.getpid(), "Shutdown requested...exiting" - ) - ) - - -if __name__ == "__main__": - main() diff --git a/demonize_ftpd.py b/demonize_ftpd.py deleted file mode 100644 index a972169..0000000 --- a/demonize_ftpd.py +++ /dev/null @@ -1,207 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (C) 2007 Giampaolo Rodola' . -# Use of this source code is governed by MIT license that can be -# found in the LICENSE file. - -"""A basic unix daemon using the python-daemon library: -http://pypi.python.org/pypi/python-daemon - -Example usages: - - $ python unix_daemon.py start - $ python unix_daemon.py stop - $ python unix_daemon.py status - $ python unix_daemon.py # foreground (no daemon) - $ python unix_daemon.py --logfile /var/log/ftpd.log start - $ python unix_daemon.py --pidfile /var/run/ftpd.pid start - -This is just a proof of concept which demonstrates how to daemonize -the FTP server. -You might want to use this as an example and provide the necessary -customizations. - -Parts you might want to customize are: - - UMASK, WORKDIR, HOST, PORT constants - - get_server() function (to define users and customize FTP handler) - -Authors: - - Ben Timby - btimby gmail.com - - Giampaolo Rodola' - g.rodola gmail.com - -""" - -import atexit -import errno -import optparse -import os -import signal -import sys -import time - -from pyftpdlib.authorizers import UnixAuthorizer -from pyftpdlib.filesystems import UnixFilesystem -from pyftpdlib.handlers import FTPHandler -from pyftpdlib.servers import FTPServer - - -# overridable options -HOST = "" -PORT = 21 -PID_FILE = "/var/run/pyftpdlib.pid" -LOG_FILE = "/var/log/pyftpdlib.log" -WORKDIR = os.getcwd() -UMASK = 0 - - -def pid_exists(pid): - """Return True if a process with the given PID is currently running.""" - try: - os.kill(pid, 0) - except OSError as err: - return err.errno == errno.EPERM - else: - return True - - -def get_pid(): - """Return the PID saved in the pid file if possible, else None.""" - try: - with open(PID_FILE) as f: - return int(f.read().strip()) - except IOError as err: - if err.errno != errno.ENOENT: - raise - - -def stop(): - """Keep attempting to stop the daemon for 5 seconds, first using - SIGTERM, then using SIGKILL. - """ - pid = get_pid() - if not pid or not pid_exists(pid): - sys.exit("daemon not running") - sig = signal.SIGTERM - i = 0 - while True: - sys.stdout.write('.') - sys.stdout.flush() - try: - os.kill(pid, sig) - except OSError as err: - if err.errno == errno.ESRCH: - print("\nstopped (pid %s)" % pid) - return - else: - raise - i += 1 - if i == 25: - sig = signal.SIGKILL - elif i == 50: - sys.exit("\ncould not kill daemon (pid %s)" % pid) - time.sleep(0.1) - - -def status(): - """Print daemon status and exit.""" - pid = get_pid() - if not pid or not pid_exists(pid): - print("daemon not running") - else: - print("daemon running with pid %s" % pid) - sys.exit(0) - - -def get_server(): - """Return a pre-configured FTP server instance.""" - handler = FTPHandler - handler.authorizer = UnixAuthorizer() - handler.abstracted_fs = UnixFilesystem - server = FTPServer((HOST, PORT), handler) - return server - - -def daemonize(): - """A wrapper around python-daemonize context manager.""" - def _daemonize(): - pid = os.fork() - if pid > 0: - # exit first parent - sys.exit(0) - - # decouple from parent environment - os.chdir(WORKDIR) - os.setsid() - os.umask(0) - - # do second fork - pid = os.fork() - if pid > 0: - # exit from second parent - sys.exit(0) - - # redirect standard file descriptors - sys.stdout.flush() - sys.stderr.flush() - si = open(LOG_FILE, 'r') - so = open(LOG_FILE, 'a+') - se = open(LOG_FILE, 'a+', 0) - os.dup2(si.fileno(), sys.stdin.fileno()) - os.dup2(so.fileno(), sys.stdout.fileno()) - os.dup2(se.fileno(), sys.stderr.fileno()) - - # write pidfile - pid = str(os.getpid()) - with open(PID_FILE, 'w') as f: - f.write("%s\n" % pid) - atexit.register(lambda: os.remove(PID_FILE)) - - pid = get_pid() - if pid and pid_exists(pid): - sys.exit('daemon already running (pid %s)' % pid) - # instance FTPd before daemonizing, so that in case of problems we - # get an exception here and exit immediately - server = get_server() - _daemonize() - server.serve_forever() - - -def main(): - global PID_FILE, LOG_FILE - USAGE = "python [-p PIDFILE] [-l LOGFILE]\n\n" \ - "Commands:\n - start\n - stop\n - status" - parser = optparse.OptionParser(usage=USAGE) - parser.add_option('-l', '--logfile', dest='logfile', - help='the log file location') - parser.add_option('-p', '--pidfile', dest='pidfile', default=PID_FILE, - help='file to store/retreive daemon pid') - options, args = parser.parse_args() - - if options.pidfile: - PID_FILE = options.pidfile - if options.logfile: - LOG_FILE = options.logfile - - if not args: - server = get_server() - server.serve_forever() - else: - if len(args) != 1: - sys.exit('too many commands') - elif args[0] == 'start': - daemonize() - elif args[0] == 'stop': - stop() - elif args[0] == 'restart': - try: - stop() - finally: - daemonize() - elif args[0] == 'status': - status() - else: - sys.exit('invalid command') - - -if __name__ == '__main__': - sys.exit(main()) diff --git a/ftpcsvreceiver.ini b/ftp_csv_receiver.ini similarity index 97% rename from ftpcsvreceiver.ini rename to ftp_csv_receiver.ini index 392cb7b..eb93652 100644 --- a/ftpcsvreceiver.ini +++ b/ftp_csv_receiver.ini @@ -31,6 +31,7 @@ userTableName = virtusers recTableName = received rawTableName = RAWDATACOR + nodesTableName = nodes [unit] Types = G801|G201|G301|G802|D2W|GFLOW|CR1000X|TLP|GS1 diff --git a/mqtt_pub.py b/mqtt_pub.py deleted file mode 100755 index f1a7d8e..0000000 --- a/mqtt_pub.py +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python3 -import paho.mqtt.client as mqtt -import time -import ssl - -version = '5' # or '3' -mytransport = 'tcp' # or 'websockets' - -if version == '5': - mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, - client_id="myPy", - transport=mytransport, - protocol=mqtt.MQTTv5) -if version == '3': - mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, - client_id="myPy", - transport=mytransport, - protocol=mqtt.MQTTv311, - clean_session=True) - -mqttc.username_pw_set("alex", "BatManu#171017") - -'''client.tls_set(certfile=None, - keyfile=None, - cert_reqs=ssl.CERT_REQUIRED)''' - -def on_message(client, obj, message, properties=None): - print(" Received message " + str(message.payload) - + " on topic '" + message.topic - + "' with QoS " + str(message.qos)) -''' -def on_connect(client, obj, flags, reason_code, properties): - print("reason_code: " + str(reason_code)) - -def on_publish(client, obj, mid, reason_code, properties): - print("mid: " + str(mid)) - -def on_log(client, obj, level, string): - print(string) -''' - -mqttc.on_message = on_message; -''' -client.on_connect = mycallbacks.on_connect; -client.on_publish = mycallbacks.on_publish; -client.on_subscribe = mycallbacks.on_subscribe; -''' - -broker = 'mqtt' -myport = 1883 -if version == '5': - from paho.mqtt.properties import Properties - from paho.mqtt.packettypes import PacketTypes - properties=Properties(PacketTypes.CONNECT) - properties.SessionExpiryInterval=30*60 # in seconds - mqttc.connect(broker, - port=myport, - clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY, - properties=properties, - keepalive=60); - -elif version == '3': - mqttc.connect(broker,port=myport,keepalive=60); - -mqttc.loop_start(); \ No newline at end of file diff --git a/prova_no_pd.py b/prova_no_pd.py deleted file mode 100644 index 2870afc..0000000 --- a/prova_no_pd.py +++ /dev/null @@ -1,56 +0,0 @@ - -import mysql.connector -import utils.timestamp.date_check as date_check - - -righe = [ - "2019/05/29 17:31:17;14.0;15.6;|;-4036;2706;-12885;-181;455;-289;-1302;|;-1074;567;-16756;-110;419;148;-1359;|;-973;718;-17951;-269;458;-187;-1481;|;-2594;517;-12813;314;407;-132;-1346;|;-1908;660;-16466;119;408;140;-1412;|;-2;634;-17249;273;418;-337;-1529;|;-44;689;-17622;-49;418;-461;-924;|;-2088;947;-16491;355;377;-101;-1634;|;-679;1280;-17315;131;405;-427;-1552;|;-1348;143;-16968;-42;400;158;-1275;|;-1229;337;-15634;312;388;-229;-1208;|;-303;1213;-17223;-231;398;-312;-1323;|;-1764;-110;-12365;-123;412;153;-1045;|;-1326;1149;-17401;-112;405;150;-937;|;-918;1595;-18541;250;417;26;-1160;|;-37;-952;-17323;-141;393;115;-1368;|;-322;705;-15059;274;389;-181;-1351;|;-139;764;-15408;-296;377;-315;-1044;|;-2243;-127;-11819;127;371;130;-1573;|;-910;81;-13546;-67;388;-442;-970;|;-87;-130;-14869;124;383;128;-1307;|;-1263;1976;-11610;-26;392;136;-1336;|;-1895;1092;-15215;-248;366;-287;-1123;|;-473;1183;-14193;-256;337;-247;-1174;|;520;493;-18394;-110;361;-430;-1584;|;-608;639;-14391;-168;361;71;-1354;|;6;263;-14750;-291;369;-102;-1163;|;-658;-208;-14123;-212;401;-330;-594;|;-1153;1045;-17704;-186;395;-38;-1465;|;-107;-4;-16871;-146;352;106;-1387;|;-639;1641;-15675;-57;355;113;-1216;|;-770;300;-14141;276;345;-69;-1632;|;-1476;1326;-16517;160;314;-469;-1291;|;-632;1362;-14201;242;348;19;-1118;|;-94;85;-14397;47;334;-473;-1292;|;853;-339;-17249;161;374;-423;-1341;|;-999;1107;-15241;339;338;-40;-1054;|;-1677;1531;-14531;-125;379;87;-1337;|;-783;955;-15564;-295;345;-178;-1215;|;1700;-768;-16200;64;365;-441;-1276;|;-1238;-442;-15508;-171;363;-294;-1102;|;-2471;1640;-15797;141;361;71;-1056;|;-1135;1410;-15452;-102;338;86;-1039;|;-2294;895;-15409;-218;352;-366;-1237;|;-1235;2429;-16575;-80;373;61;-1132;|;-1304;467;-16855;226;343;-425;-1008;|;-2340;-154;-16390;-57;327;119;-1033;|;-2288;2180;-14009;292;356;-10;-1053;|;-1106;36;-15806;-257;351;-81;-1206;|;-1778;610;-14479;157;354;116;-1126;|;-2142;1735;-14860;143;366;92;-835;|;-385;-117;-15141;-189;338;67;-1499;|;-434;950;-14577;214;359;66;-942;|;40;255;-18690;345;330;-250;-1466;|;-1235;1558;-15598;233;351;63;-1160;|;-1162;732;-14129;-176;338;52;-1424;|;-270;1318;-16862;316;324;-375;-1334;|;35;-1313;-19446;190;324;112;-1153;|;109;502;-15526;310;354;-334;-1128;|;-566;-1187;-15934;-192;349;-374;-1325;|;397;665;-15063;264;354;-99;-1501;|;-1126;2257;-15976;-266;342;-129;-1441;|;-1576;1197;-13151;324;359;-378;-1235;|;-2713;844;-15693;183;366;118;-1286;|;-2120;307;-14872;-8;361;-533;-1287;|;-2236;582;-16406;386;320;-246;-1450;|;-2549;904;-13694;368;325;-115;-1331;|;-1633;1657;-13434;375;317;-370;-1843;|;-2230;-350;-11589;66;-3;-522;-1047;|;1229;1639;-13878;106;372;-535;-1347;|;9231;1087;|;9811;1091", - "2019/05/29 17:32:58;13.6;15.7;|;-4035;2693;-12877;-181;454;-289;-1269;|;-1098;571;-16785;-110;419;149;-1332;|;-982;727;-17893;-271;457;-187;-1470;|;-2581;497;-12843;314;407;-133;-1320;|;-1913;665;-16464;115;407;140;-1386;|;2;628;-17245;274;419;-337;-1516;|;-32;672;-17624;-45;419;-460;-921;|;-2091;943;-16468;354;377;-100;-1628;|;-677;1280;-17323;133;404;-427;-1544;|;-1366;150;-16978;-43;398;158;-1500;|;-1226;329;-15636;313;386;-228;-1206;|;-311;1202;-17263;-230;402;-312;-1322;|;-1750;-107;-12353;-121;410;153;-1279;|;-1338;1147;-17418;-111;402;148;-1207;|;-939;1593;-18537;250;418;31;-1154;|;-49;-941;-17323;-141;393;114;-1343;|;-330;717;-15037;273;388;-181;-1348;|;-142;785;-15453;-296;375;-313;-1048;|;-2266;-130;-11838;127;370;129;-1558;|;-895;77;-13571;-66;389;-442;-965;|;-88;-119;-14865;125;384;129;-1284;|;-1259;1987;-11617;-25;392;136;-1336;|;-1901;1093;-15210;-248;367;-287;-1106;|;-454;1212;-14192;-269;355;-260;-1169;|;523;495;-18386;-110;362;-433;-1591;|;-632;638;-14401;-169;361;73;-1356;|;3;256;-14749;-292;370;-101;-1145;|;-659;-187;-14134;-213;401;-336;-573;|;-1157;1037;-17694;-187;394;-35;-1454;|;-111;2;-16855;-150;352;98;-1375;|;-651;1641;-15722;-60;356;120;-1198;|;-791;309;-14141;272;346;-67;-1620;|;-1462;1333;-16481;160;314;-470;-1287;|;-628;1371;-14212;244;350;23;-1107;|;-81;78;-14387;49;334;-475;-1273;|;836;-339;-17269;160;373;-429;-1343;|;-988;1110;-15227;337;339;-40;-1035;|;-1681;1533;-14534;-120;378;87;-1343;|;-813;952;-15565;-296;345;-177;-1225;|;1699;-785;-16184;69;366;-450;-1267;|;-1229;-435;-15505;-176;364;-294;-1099;|;-2471;1652;-15805;141;360;67;-1034;|;-1158;1406;-15451;-106;340;86;-1018;|;-2306;898;-15427;-220;351;-363;-1223;|;-1231;2446;-16584;-78;373;58;-1123;|;-1306;460;-16889;223;344;-433;-1007;|;-2337;-149;-16379;-55;327;120;-1026;|;-2292;2187;-14002;295;356;-10;-1046;|;-1099;57;-15790;-256;349;-76;-1222;|;-1777;629;-14490;157;354;115;-1115;|;-2145;1727;-14861;143;365;93;-843;|;-384;-118;-15147;-186;337;63;-1484;|;-461;940;-14639;214;359;66;-945;|;59;254;-18691;344;332;-249;-1217;|;-1240;1558;-15599;233;351;65;-1147;|;-1157;720;-14114;-177;337;53;-1396;|;-238;1310;-16855;313;323;-374;-1318;|;35;-1322;-19451;194;324;113;-1155;|;114;484;-15521;308;355;-338;-1115;|;-564;-1189;-15915;-188;349;-377;-1344;|;390;653;-15085;268;356;-97;-1513;|;-1127;2266;-15988;-266;342;-127;-1447;|;-1552;1195;-13171;319;359;-369;-1237;|;-2691;868;-15712;181;367;116;-1289;|;-2115;302;-14855;-19;362;-538;-1297;|;-2249;561;-16438;395;319;-236;-1461;|;-2536;915;-13684;364;323;-116;-1321;|;-1647;1659;-13423;373;318;-365;-1838;|;-2244;-351;-11568;66;-3;-526;-1017;|;1238;1628;-13803;93;375;-543;-1346;|;9211;1099;|;9702;1083", - "2019/05/29 18:20:27;13.6;19.9;|;-4026;2699;-12885;-180;455;-289;-1310;|;-1076;572;-16748;-109;418;146;-1358;|;-974;731;-17918;-270;458;-186;-1490;|;-2594;529;-12819;313;408;-132;-1333;|;-1907;662;-16461;118;407;140;-1409;|;6;619;-17243;272;419;-337;-1527;|;-63;680;-17597;-46;418;-460;-932;|;-2098;953;-16490;355;377;-101;-1637;|;-671;1283;-17317;133;405;-426;-1538;|;-1359;136;-16982;-43;400;159;-1282;|;-1228;318;-15602;315;389;-230;-1195;|;-316;1201;-17236;-198;341;-266;-1319;|;-1779;-120;-12374;-122;412;154;-1040;|;-1330;1134;-17429;-111;402;147;-1205;|;-919;1594;-18535;253;415;28;-1173;|;-45;-946;-17352;-141;393;115;-1362;|;-348;712;-15046;275;388;-181;-1358;|;-127;773;-15429;-294;376;-314;-1067;|;-2272;-117;-11825;126;371;128;-1576;|;-903;89;-13539;-66;389;-443;-960;|;-76;-144;-14870;125;383;128;-1287;|;-1255;1986;-11621;-27;392;136;-1350;|;-1893;1091;-15204;-241;365;-287;-1115;|;-469;1193;-14206;-274;356;-257;-1166;|;530;504;-18410;-109;362;-432;-1580;|;-627;640;-14386;-170;361;71;-1342;|;12;254;-14769;-293;370;-102;-1158;|;-660;-187;-14132;-212;402;-335;-593;|;-1149;1042;-17689;-186;393;-31;-1478;|;-94;-4;-16854;-142;351;101;-1383;|;-657;1650;-15715;-55;354;114;-1217;|;-779;307;-14141;273;345;-65;-1623;|;-1471;1328;-16526;162;314;-465;-1291;|;-638;1377;-14170;246;348;21;-1120;|;-82;81;-14392;46;335;-465;-1286;|;826;-324;-17271;157;373;-428;-1339;|;-996;1099;-15238;330;339;-48;-1046;|;-1664;1527;-14520;-122;378;85;-1345;|;-779;964;-15604;-296;345;-181;-1210;|;1698;-788;-16201;67;364;-447;-1247;|;-1225;-439;-15485;-178;365;-293;-1092;|;-2467;1650;-15773;140;360;73;-1047;|;-1118;1417;-15455;-107;338;86;-1023;|;-2311;904;-15445;-221;353;-361;-1234;|;-1243;2417;-16569;-79;372;60;-1109;|;-1324;471;-16891;220;343;-427;-1001;|;-2329;-169;-16371;-54;326;121;-1034;|;-2280;2190;-14002;296;355;-9;-1052;|;-1125;36;-15812;-254;351;-75;-1216;|;-1776;619;-14482;154;354;124;-1127;|;-2166;1748;-14870;144;365;95;-837;|;-392;-120;-15145;-187;338;70;-1494;|;-437;946;-14585;212;358;67;-947;|;42;247;-18665;347;332;-250;-1232;|;-1242;1560;-15596;231;352;66;-1148;|;-1156;727;-14117;-176;337;49;-1389;|;-266;1322;-16856;314;323;-374;-1318;|;40;-1337;-19450;192;325;106;-1164;|;126;504;-15528;312;354;-341;-1118;|;-567;-1204;-15918;-195;349;-377;-1321;|;399;644;-15075;272;355;-91;-1503;|;-1136;2263;-15948;-267;343;-124;-1445;|;-1556;1192;-13222;325;358;-370;-1241;|;-2707;860;-15665;179;367;103;-1298;|;-2114;319;-14877;-13;350;-521;-1270;|;-2243;560;-16368;392;316;-236;-1454;|;-2552;905;-13683;367;322;-112;-1321;|;-1654;1653;-13427;375;317;-369;-1858;|;-2268;-350;-11573;64;-3;-521;-1028;|;1239;1624;-13860;94;375;-543;-1332;|;9527;1079;|;9966;1070", - "2019/05/29 18:22:45;13.2;20.1;|;-4042;2692;-12915;-181;455;-289;-1291;|;-1087;562;-16766;-113;419;148;-1357;|;-974;730;-17893;-269;458;-187;-1468;|;-2597;510;-12822;313;408;-132;-1339;|;-1915;687;-16472;118;408;142;-1409;|;1;625;-17275;275;417;-337;-1517;|;-34;689;-17613;-44;418;-460;-929;|;-2114;958;-16451;355;377;-101;-1624;|;-670;1269;-17280;132;406;-426;-1537;|;-1365;152;-16972;-42;400;159;-1269;|;-1239;316;-15620;312;388;-227;-1198;|;-310;1204;-17245;-205;357;-275;-1321;|;-1766;-102;-12379;-123;411;154;-1039;|;-1355;1136;-17429;-110;402;148;-1194;|;-935;1591;-18524;250;416;28;-1144;|;-34;-940;-17350;-141;393;115;-1357;|;-337;709;-15038;274;387;-181;-1361;|;-132;767;-15408;-295;377;-314;-1046;|;-2270;-134;-11831;127;369;130;-1561;|;-899;96;-13575;-66;389;-443;-965;|;-80;-148;-14886;126;384;128;-1280;|;-1272;1996;-11625;-27;392;135;-1330;|;-1902;1104;-15252;-247;366;-287;-1103;|;-453;1200;-14215;-270;355;-257;-1161;|;510;489;-18401;-111;362;-433;-1583;|;-617;626;-14398;-169;361;71;-1339;|;8;264;-14744;-291;369;-101;-1148;|;-664;-202;-14128;-217;401;-336;-585;|;-1145;1030;-17697;-184;395;-38;-1464;|;-107;-12;-16862;-143;353;106;-1375;|;-664;1635;-15708;-58;356;110;-1202;|;-751;299;-14121;279;344;-65;-1616;|;-1462;1337;-16507;160;315;-468;-1302;|;-635;1372;-14208;245;350;18;-1122;|;-85;78;-14394;45;335;-469;-1283;|;834;-341;-17264;159;374;-431;-1353;|;-982;1119;-15236;332;339;-37;-1046;|;-1686;1536;-14537;-120;378;88;-1343;|;-780;958;-15548;-297;344;-178;-1193;|;1704;-795;-16187;67;365;-453;-1258;|;-1235;-436;-15499;-178;364;-290;-1092;|;-2482;1637;-15811;143;358;72;-1295;|;-1151;1414;-15433;-103;339;85;-1018;|;-2333;885;-15407;-216;353;-370;-1221;|;-1262;2438;-16611;-79;371;65;-1328;|;-1315;456;-16897;219;343;-436;-994;|;-2335;-153;-16369;-56;327;119;-1034;|;-2278;2176;-14001;294;356;-10;-1042;|;-1106;46;-15798;-251;351;-78;-1214;|;-1798;610;-14491;153;357;114;-1135;|;-2159;1730;-14888;144;365;95;-865;|;-400;-133;-15134;-184;338;72;-1497;|;-452;932;-14596;212;358;68;-937;|;45;238;-18676;349;332;-250;-1230;|;-1233;1549;-15601;233;349;65;-1380;|;-1150;732;-14105;-175;338;48;-1405;|;-257;1328;-16848;315;323;-375;-1330;|;44;-1317;-19442;194;325;116;-1158;|;113;512;-15534;307;354;-331;-1132;|;-551;-1199;-15927;-195;349;-373;-1341;|;417;647;-15088;271;354;-92;-1503;|;-1135;2259;-15976;-267;341;-124;-1459;|;-1552;1182;-13164;324;358;-370;-1246;|;-2712;851;-15670;182;366;120;-1287;|;-2116;310;-14859;-14;362;-542;-1277;|;-2225;556;-16375;392;316;-242;-1438;|;-2539;894;-13692;364;323;-108;-1335;|;-1633;1675;-13414;373;318;-366;-1841;|;-2233;-343;-11582;66;-2;-515;-1051;|;1231;1634;-13817;97;374;-538;-1324;|;9506;1070;|;9734;1010" -] - -#Dividi la riga principale usando il primo delimitatore ';' - -sql_insert_RAWDATA = ''' -INSERT IGNORE INTO ase_lar.RAWDATACOR ( - `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`, - `Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`, - `Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`, - `BatLevelModule`,`TemperatureModule`, `RssiModule` -) -VALUES ( - %s, %s, %s, %s, %s, %s, %s, - %s, %s, %s, %s, %s, %s, %s, %s, - %s, %s, %s, %s, %s, %s, %s, %s, - %s, %s, %s -) -''' - -def make_matrix(righe): - UnitName = 'ID0003' - ToolNameID = 'DT0002' - matrice_valori = [] - for riga in righe: - timestamp, batlevel, temperature, rilevazioni = riga.split(';',3) - EventDate, EventTime = timestamp.split(' ') - valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;') # Toglie '|;' iniziali, toglie eventuali ';' finali, dividi per ';|;' - for num_nodo, valori_nodo in enumerate(valori_nodi, start=1): - valori = valori_nodo.split(';') - matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori)))) - return matrice_valori - -matrice_valori = make_matrix(righe) - -with mysql.connector.connect(user='root', password='batt1l0', host='10.211.114.173', port=3306) as conn: - cur = conn.cursor() - try: - cur.executemany(sql_insert_RAWDATA, matrice_valori) - conn.commit() - except Exception as e: - conn.rollback() - print(f'Error: {e}') - - - - diff --git a/prova_pd.py b/prova_pd.py deleted file mode 100644 index a81c229..0000000 --- a/prova_pd.py +++ /dev/null @@ -1,62 +0,0 @@ - -#import mysql.connector -from sqlalchemy import create_engine, MetaData, Table -from sqlalchemy.orm import declarative_base, Session -from sqlalchemy.exc import IntegrityError -import pandas as pd -import utils.timestamp.date_check as date_check - -righe = ["17/03/2022 15:10;13.7;14.8;|;401;832;17373;-8;920;469;9;|;839;133;17116;675;941;228;10;|;-302;-1252;17165;288;75;-940;10;|;739;76;17203;562;879;604;9;|;1460;751;16895;672;1462;132;10;|;-1088;-1883;16675;244;1071;518;10;|;-29;-1683;16923;384;1039;505;11;|;1309;-1095;17066;-36;324;-552;10;|;-36;-713;16701;-121;372;122;10;|;508;-1318;16833;475;1154;405;10;|;1178;878;17067;636;1114;428;10;|;1613;-573;17243;291;-234;-473;9;|;-107;-259;17287;94;421;369;10;|;-900;-647;16513;168;1330;252;10;|;1372;286;17035;202;263;469;10;|;238;-2006;17142;573;1201;492;9;|;2458;589;17695;356;187;208;11;|;827;-1085;17644;308;233;66;10;|;1;-1373;17214;557;1279;298;9;|;-281;-244;17071;209;517;-36;10;|;-486;-961;17075;467;440;367;10;|;1264;-339;16918;374;476;116;8;|;661;-1330;16789;-37;478;15;9;|;1208;-724;16790;558;1303;335;8;|;-236;-1404;16678;309;426;376;8;|;367;-1402;17308;-32;428;-957;7;|;-849;-360;17640;1;371;635;7;|;-784;90;17924;533;128;-661;5;|;-723;-1062;16413;270;-79;702;7;|;458;-1235;16925;354;-117;194;5;|;-411;-1116;17403;280;777;530;1", -"19/03/2022 15:13;13.6;14.8;|;398;836;17368;-3;924;472;9;|;838;125;17110;675;938;230;10;|;-298;-1253;17164;290;75;-942;10;|;749;78;17221;560;883;601;9;|;1463;752;16904;673;1467;134;10;|;-1085;-1884;16655;239;1067;520;10;|;-27;-1680;16923;393;1032;507;10;|;1308;-1095;17065;-43;328;-548;10;|;-38;-712;16704;-124;373;122;10;|;512;-1318;16830;473;1155;408;10;|;1181;879;17070;637;1113;436;10;|;1610;-567;17239;287;-240;-462;10;|;-108;-250;17297;94;420;370;10;|;-903;-652;16518;169;1326;257;9;|;1371;282;17047;198;263;471;10;|;244;-2006;17137;570;1205;487;9;|;2461;589;17689;354;199;210;11;|;823;-1081;17642;310;235;68;10;|;1;-1370;17214;560;1278;290;9;|;-280;-245;17062;209;517;-31;9;|;-484;-963;17074;463;440;374;10;|;1271;-340;16912;374;477;125;8;|;668;-1331;16786;-37;478;7;9;|;1209;-724;16784;557;1301;329;8;|;-237;-1406;16673;316;425;371;8;|;371;-1401;17307;-30;429;-961;7;|;-854;-356;17647;7;368;631;7;|;-781;85;17934;531;130;-664;5;|;-726;-1062;16400;274;-79;707;6;|;460;-1233;16931;355;-113;196;5;|;-413;-1119;17405;280;780;525;1", -"19/03/2022 15:28;13.6;14.3;|;396;832;17379;-3;919;470;10;|;837;128;17114;670;945;233;10;|;-304;-1246;17167;292;77;-931;10;|;744;70;17211;567;888;601;9;|;1459;748;16893;672;1480;141;10;|;-1084;-1887;16658;236;1068;522;10;|;-29;-1686;16912;388;1035;500;10;|;1312;-1092;17062;-35;328;-545;10;|;-40;-709;16701;-120;374;121;10;|;515;-1327;16826;475;1148;402;10;|;1179;881;17063;635;1114;430;9;|;1613;-568;17246;293;-230;-461;9;|;-103;-265;17289;96;420;363;10;|;-896;-656;16522;167;1320;250;10;|;1368;288;17039;195;263;471;9;|;239;-2003;17129;578;1203;490;9;|;2461;586;17699;356;202;209;11;|;823;-1092;17649;310;237;65;10;|;-7;-1369;17215;550;1279;288;9;|;-290;-249;17072;208;515;-33;9;|;-488;-965;17071;472;439;372;10;|;1270;-342;16923;377;476;120;8;|;671;-1337;16788;-33;482;14;9;|;1206;-725;16783;556;1306;344;9;|;-232;-1404;16681;309;423;379;8;|;364;-1400;17305;-28;432;-952;7;|;-854;-363;17644;1;369;626;8;|;-782;89;17931;529;134;-661;5;|;-723;-1057;16407;269;-82;700;6;|;459;-1235;16929;358;-119;193;5;|;-414;-1122;17400;282;775;526;2"] - -''' -righe = ["17/03/2022 15:10;13.7;14.8;|;401;832;17373;-8;920;469;9;|;839;133;17116;675;941;228;10;|;-302;-1252;17165;288;75;-940;10;|;739;76;17203;562;879;604;9;|;1460;751;16895;672;1462;132;10;|;-1088;-1883;16675;244;1071;518;10;|;-29;-1683;16923;384;1039;505;11;|;1309;-1095;17066;-36;324;-552;10;|;-36;-713;16701;-121;372;122;10;|;508;-1318;16833;475;1154;405;10;|;1178;878;17067;636;1114;428;10;|;1613;-573;17243;291;-234;-473;9;|;-107;-259;17287;94;421;369;10;|;-900;-647;16513;168;1330;252;10;|;1372;286;17035;202;263;469;10;|;238;-2006;17142;573;1201;492;9;|;2458;589;17695;356;187;208;11;|;827;-1085;17644;308;233;66;10;|;1;-1373;17214;557;1279;298;9;|;-281;-244;17071;209;517;-36;10;|;-486;-961;17075;467;440;367;10;|;1264;-339;16918;374;476;116;8;|;661;-1330;16789;-37;478;15;9;|;1208;-724;16790;558;1303;335;8;|;-236;-1404;16678;309;426;376;8;|;367;-1402;17308;-32;428;-957;7;|;-849;-360;17640;1;371;635;7;|;-784;90;17924;533;128;-661;5;|;-723;-1062;16413;270;-79;702;7;|;458;-1235;16925;354;-117;194;5;|;-411;-1116;17403;280;777;530;1", -"17/03/2022 15:13;13.6;14.8;|;398;836;17368;-3;924;472;9;|;838;125;17110;675;938;230;10;|;-298;-1253;17164;290;75;-942;10;|;749;78;17221;560;883;601;9;|;1463;752;16904;673;1467;134;10;|;-1085;-1884;16655;239;1067;520;10;|;-27;-1680;16923;393;1032;507;10;|;1308;-1095;17065;-43;328;-548;10;|;-38;-712;16704;-124;373;122;10;|;512;-1318;16830;473;1155;408;10;|;1181;879;17070;637;1113;436;10;|;1610;-567;17239;287;-240;-462;10;|;-108;-250;17297;94;420;370;10;|;-903;-652;16518;169;1326;257;9;|;1371;282;17047;198;263;471;10;|;244;-2006;17137;570;1205;487;9;|;2461;589;17689;354;199;210;11;|;823;-1081;17642;310;235;68;10;|;1;-1370;17214;560;1278;290;9;|;-280;-245;17062;209;517;-31;9;|;-484;-963;17074;463;440;374;10;|;1271;-340;16912;374;477;125;8;|;668;-1331;16786;-37;478;7;9;|;1209;-724;16784;557;1301;329;8;|;-237;-1406;16673;316;425;371;8;|;371;-1401;17307;-30;429;-961;7;|;-854;-356;17647;7;368;631;7;|;-781;85;17934;531;130;-664;5;|;-726;-1062;16400;274;-79;707;6;|;460;-1233;16931;355;-113;196;5;|;-413;-1119;17405;280;780;525;1", -"17/03/2022 15:28;13.6;14.3;|;396;832;17379;-3;919;470;10;|;837;128;17114;670;945;233;10;|;-304;-1246;17167;292;77;-931;10;|;744;70;17211;567;888;601;9;|;1459;748;16893;672;1480;141;10;|;-1084;-1887;16658;236;1068;522;10;|;-29;-1686;16912;388;1035;500;10;|;1312;-1092;17062;-35;328;-545;10;|;-40;-709;16701;-120;374;121;10;|;515;-1327;16826;475;1148;402;10;|;1179;881;17063;635;1114;430;9;|;1613;-568;17246;293;-230;-461;9;|;-103;-265;17289;96;420;363;10;|;-896;-656;16522;167;1320;250;10;|;1368;288;17039;195;263;471;9;|;239;-2003;17129;578;1203;490;9;|;2461;586;17699;356;202;209;11;|;823;-1092;17649;310;237;65;10;|;-7;-1369;17215;550;1279;288;9;|;-290;-249;17072;208;515;-33;9;|;-488;-965;17071;472;439;372;10;|;1270;-342;16923;377;476;120;8;|;671;-1337;16788;-33;482;14;9;|;1206;-725;16783;556;1306;344;9;|;-232;-1404;16681;309;423;379;8;|;364;-1400;17305;-28;432;-952;7;|;-854;-363;17644;1;369;626;8;|;-782;89;17931;529;134;-661;5;|;-723;-1057;16407;269;-82;700;6;|;459;-1235;16929;358;-119;193;5;|;-414;-1122;17400;282;775;526;2"] -''' -#Dividi la riga principale usando il primo delimitatore ';' -UnitName = '' -ToolNameID = '' -matrice_valori = [] -for riga in righe: - timestamp, batlevel, temperature, rilevazioni = riga.split(';',3) - EventDate, EventTime = timestamp.split(' ') - valori_nodi = rilevazioni.split('|')[1:-1] # Dividi per '|' e prendi gli elementi interni togliendo primo e ultimo - for num_nodo, valori_nodo in enumerate(valori_nodi, start=1): - valori = valori_nodo.split(';')[1:-1] - matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (16 - len(valori)))) - -# Crea un DataFrame pandas per visualizzare la matrice in forma tabellare -colonne = ['UnitName', 'ToolNameID', 'NodeNum', 'EventDate', 'EventTime', 'BatLevel', 'Temperature', 'Val0', 'Val1', 'Val2', 'Val3', 'Val4', 'Val5', 'Val6', 'Val7', 'Val8', 'Val9', 'ValA', 'ValB', 'ValC', 'ValD', 'ValE', 'ValF'] -df = pd.DataFrame(matrice_valori, columns=colonne) - -# Stampa il DataFrame -#print(df.to_string()) - -engine = create_engine('mysql+mysqlconnector://root:batt1l0@10.211.114.173/ase_lar') - -metadata = MetaData() - -table = Table('RAWDATACOR', metadata, autoload_with=engine) - -Base = declarative_base() - -class RawDataCor(Base): - __table__ = table - -with Session(engine) as session: - for index, row in df.iterrows(): - try: - nuova_riga = RawDataCor(**row.to_dict()) - session.add(nuova_riga) - session.commit() - except IntegrityError: - session.rollback() # Ignora l'errore di chiave duplicata - print(f"Riga con chiavi duplicate ignorata: {row.to_dict()}") - except Exception as e: - session.rollback() - print(f"Errore inatteso durante l'inserimento: {e}, riga: {row.to_dict()}") - - -#df.to_sql('RAWDATACOR', con=engine, if_exists='', index=False) diff --git a/run_trans.sh b/run_trans.sh deleted file mode 100755 index f12793e..0000000 --- a/run_trans.sh +++ /dev/null @@ -1,4 +0,0 @@ -for (( i=1; i<=29000; i++ )) -do -./transform_file.py -done diff --git a/transform_file.py b/transform_file.py deleted file mode 100755 index 4b4d18d..0000000 --- a/transform_file.py +++ /dev/null @@ -1,165 +0,0 @@ -#!/usr/bin/env python3 - -import sys -import os - -import re -from datetime import datetime -import json -import mysql.connector as mysql - -import logging - -from utils.time import timestamp_fmt as ts -from utils.config import loader as setting - -def conn_db(cfg): - return mysql.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport ) - -def extract_value(patterns, source, default='Not Defined'): - ip = {} - for pattern in patterns: - s_pattern = rf'{pattern}:\s*(\d{{1,3}}(?:\.\d{{1,3}}){{3}})' - matches = re.search(s_pattern, source, re.IGNORECASE) - if matches: - ip.update({pattern: matches.group(1)}) - else: - ip.update({pattern: default}) - return ip - -def write_db(records, cfg): - insert_values = [ - ( - record["unit_name"], record["unit_type"], record["tool_name"], record["tool_type"], - record["unit_ip"], record["unit_subnet"], record["unit_gateway"], record["event_timestamp"], - record["battery_level"], record["temperature"], record["nodes_jsonb"] - ) - for record in records - ] - - query = f""" - INSERT IGNORE INTO {cfg.dbname}.{cfg.dbdataraw} ( - unit_name, unit_type, tool_name, tool_type, unit_ip, unit_subnet, unit_gateway, - event_timestamp, battery_level, temperature, nodes_jsonb - ) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - """ - - try: - with conn_db(cfg) as conn: - conn.autocommit = True - with conn.cursor() as cur: - try: - cur.executemany(query, insert_values) - cur.close() - conn.commit() - except Exception as e: - logging.error(f'Records not inserted: {e}') - logging.info('Exit') - exit() - except Exception as e: - logging.error(f'Records not inserted: {e}') - exit() - -def elab_csv(cfg): - try: - with conn_db(cfg) as conn: - cur = conn.cursor() - cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = 0 limit 1') - id, unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone() - cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 1 where id = {id}') - data_list = str(tool_data).strip("('{\"").strip("\"}\',)").split('","') - # Estrarre le informazioni degli ip dalla header - infos = extract_value(cfg.csv_infos, str(data_list[:9])) - except Exception as e: - logging.error(f'{e}') - - records = [] - # Definizione dei pattern - timestamp_pattern1 = r'(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2});' - timestamp_pattern2 = r'(\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2});' - - # Formato desiderato per il timestamp - output_format = "%Y-%m-%d %H:%M:%S" - - for line in list(set(data_list)): - if (match := re.search(timestamp_pattern1, line)): - timestamp = datetime.strptime(match.group(1), "%Y/%m/%d %H:%M:%S").strftime(output_format) - elif (match := re.search(timestamp_pattern2, line)): - timestamp = datetime.strptime(match.group(1), "%d/%m/%Y %H:%M:%S").strftime(output_format) - else: - continue - - line_without_timestamp = (line[match.end():]).strip('|;') - - match_values = re.findall(r'[-+]?\d*\.\d+|\d+', line_without_timestamp) - battery_level, temperature = match_values[0], match_values[1] - remainder = ";".join(line_without_timestamp.split(";")[2:]).strip('|;') - - # Rimuovi spazi bianchi o caratteri di nuova riga - nodes = remainder.strip().replace('\\n', '').split(";|;") - - # Estrai i valori di ciascun nodo e formatta i dati come JSON - node_list = [] - for i, node_data in enumerate(nodes, start=1): - node_dict = {"num": i} - # Dividi ogni nodo in valori separati da ";" - node_values = node_data.split(';') - for j, value in enumerate(node_values, start=0): - # Imposta i valori a -9999 se trovi "Dis." - node_dict['val' + str(j)] = -9999 if (value == "Dis." or value == "Err1" or value == "Err2" or value == "---" or value == "NotAv" or value == "No RX" or value == "DMUXe" or value == "CH n. Error" or value == "-") else float(value) - node_list.append(node_dict) - - # Prepara i dati per l'inserimento/aggiornamento - record = { - "unit_name": unit_name.upper(), - "unit_type": unit_type.upper(), - "tool_name": tool_name.upper(), - "tool_type": tool_type.upper(), - "unit_ip": infos['IP'], - "unit_subnet": infos['Subnet'], - "unit_gateway": infos['Gateway'], - "event_timestamp": timestamp, - "battery_level": float(battery_level), - "temperature": float(temperature), - "nodes_jsonb": json.dumps(node_list) # Converti la lista di dizionari in una stringa JSON - } - - records.append(record) - - # Se abbiamo raggiunto 500 record, esegui l'inserimento in batch - if len(records) >= 500: - logging.info("Raggiunti 500 record scrivo sul DB") - write_db(records, cfg) - records = [] - write_db(records, cfg) - - -def main(): - # Load the configuration settings - cfg = setting.config() - - try: - # Configura la connessione al database PostgreSQL - # Configure logging - logging.basicConfig( - format="%(asctime)s %(message)s", - filename=cfg.elablog, - level=logging.INFO, - ) - elab_csv(cfg) - - except KeyboardInterrupt: - logging.info( - "Info: {}.".format("Shutdown requested...exiting") - ) - - except Exception: - print( - "{} - PID {:>5} >> Error: {}.".format( - ts.timestamp("log"), os.getpid(), sys.exc_info()[1] - ) - ) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/utils/config/loader.py b/utils/config/loader.py index 199cf2b..457873c 100644 --- a/utils/config/loader.py +++ b/utils/config/loader.py @@ -6,8 +6,7 @@ from configparser import ConfigParser class Config: def __init__(self): c = ConfigParser() - c.read(["/etc/aseftp/ftpcsvreceiver.ini", "./ftpcsvreceiver.ini", - "./ftpReceiver/ftpcsvreceiver.ini"]) + c.read(["/etc/aseftp/ftp_csv_receiver.ini", "./ftp_csv_receiver.ini"]) # FTP setting self.firstport = c.getint("ftpserver", "firstPort") self.logfilename = c.get("ftpserver", "logFilename") @@ -37,6 +36,8 @@ class Config: self.dbusertable = c.get("db", "userTableName") self.dbrectable = c.get("db", "recTableName") self.dbrawdata = c.get("db", "rawTableName") + self.dbrawdata = c.get("db", "rawTableName") + self.dbnodes = c.get("db", "nodesTableName") # unit setting self.units_name = [part for part in c.get("unit", "Names").split('|')] diff --git a/utils/database/connection.py b/utils/database/connection.py index e9ba172..bfb7f6a 100644 --- a/utils/database/connection.py +++ b/utils/database/connection.py @@ -13,7 +13,7 @@ def connetti_db(cfg): A MySQL database connection object. """ try: - conn = mysql.connector.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport) + conn = mysql.connector.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport, database=cfg.dbname) conn.autocommit = True logger.info("Connected") return conn diff --git a/utils/database/get_nodes_type.py b/utils/database/get_nodes_type.py deleted file mode 100644 index 7fcdb7a..0000000 --- a/utils/database/get_nodes_type.py +++ /dev/null @@ -1,96 +0,0 @@ -import logging -import datetime -import os -import mysql.connector - -from utils.database.connection import connetti_db - -logger = logging.getLogger(__name__) - -def get_timestamp(log_type): - """Generates a timestamp string for logging.""" - now = datetime.datetime.now() - return now.strftime("%Y-%m-%d %H:%M:%S") - -def get_nodes_type(db_lar, server, username, password, tool, unit, channels, nodetype, ain, din): - """ - Retrieves node type, Ain, Din, and channels from the database for a specific tool and unit. - - Args: - db_lar (str): The name of the MySQL database. - server (str): The hostname or IP address of the MySQL server. - username (str): The MySQL username. - password (str): The MySQL password. - tool (str): The name of the tool. - unit (str): The name of the unit. - channels (list): An empty list to store the 'channels' values. - nodetype (list): An empty list to store the 'type' values. - ain (list): An empty list to store the 'ain' values. - din (list): An empty list to store the 'din' values. - """ - - - try: - conn = connetti_db(cfg) - - cursor = conn.cursor(dictionary=True) - - query = f""" - SELECT t.name AS name, n.seq AS seq, n.num AS num, n.channels AS channels, y.type AS type, n.ain AS ain, n.din AS din - FROM nodes AS n - INNER JOIN tools AS t ON t.id = n.tool_id - INNER JOIN units AS u ON u.id = t.unit_id - INNER JOIN nodetypes AS y ON n.nodetype_id = y.id - WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}' - ORDER BY n.num; - """ - - cursor.execute(query) - results = cursor.fetchall() - - print(f"{get_timestamp('log')} - pid {os.getpid()} >> {unit} - {tool}: {cursor.rowcount} rows selected to get node type/Ain/Din/channels.") - - if not results: - print(f"{get_timestamp('log')} - pid {os.getpid()} >> Node/Channels/Ain/Din not defined.") - print(f"{get_timestamp('log')} - pid {os.getpid()} >> Execution ended.") - exit() - else: - for row in results: - channels.append(row['channels']) - nodetype.append(row['type']) - ain.append(row['ain']) - din.append(row['din']) - - cursor.close() - dbh.close() - - except mysql.connector.Error as err: - error_message = f"{get_timestamp('log')} - pid {os.getpid()} >> Could not connect to database: {err}" - print(error_message) - raise # Re-raise the exception if you want the calling code to handle it - -if __name__ == '__main__': - # Example usage (replace with your actual database credentials and values) - db_name = "your_database_name" - db_server = "your_server_address" - db_user = "your_username" - db_password = "your_password" - current_tool = "your_tool_name" - current_unit = "your_unit_name" - - node_channels = [] - node_types = [] - node_ains = [] - node_dins = [] - - try: - get_nodes_type(db_name, db_server, db_user, db_password, current_tool, current_unit, node_channels, node_types, node_ains, node_dins) - - print("\nRetrieved Data:") - print(f"Channels: {node_channels}") - print(f"Node Types: {node_types}") - print(f"Ains: {node_ains}") - print(f"Dins: {node_dins}") - - except Exception as e: - print(f"An error occurred: {e}") \ No newline at end of file diff --git a/utils/database/nodes_query.py b/utils/database/nodes_query.py new file mode 100644 index 0000000..0ed5724 --- /dev/null +++ b/utils/database/nodes_query.py @@ -0,0 +1,37 @@ +from utils.database.connection import connetti_db +import logging + +logger = logging.getLogger(__name__) + +def get_nodes_type(cfg, tool, unit): + + with connetti_db(cfg) as conn: + cur = conn.cursor(dictionary=True) + query = f""" + SELECT t.name AS name, n.seq AS seq, n.num AS num, n.channels AS channels, y.type AS type, n.ain AS ain, n.din AS din + FROM {cfg.dbname}.{cfg.dbnodes} AS n + INNER JOIN tools AS t ON t.id = n.tool_id + INNER JOIN units AS u ON u.id = t.unit_id + INNER JOIN nodetypes AS y ON n.nodetype_id = y.id + WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}' + ORDER BY n.num; + """ + logger.info(f"{unit} - {tool}: Executing query: {query}") + cur.execute(query) + results = cur.fetchall() + logger.info(f"{unit} - {tool}: {cur.rowcount} rows selected to get node type/Ain/Din/channels.") + cur.close() + conn.close() + + if not results: + logger.info(f"{unit} - {tool}: Node/Channels/Ain/Din not defined.") + return None, None, None, None + else: + channels, types, ains, dins = [], [], [], [] + for row in results: + channels.append(row['channels']) + types.append(row['type']) + ains.append(row['ain']) + dins.append(row['din']) + return channels, types, ains, dins + diff --git a/utils/timestamp/date_check.py b/utils/timestamp/date_check.py index 18e03fc..67c128b 100644 --- a/utils/timestamp/date_check.py +++ b/utils/timestamp/date_check.py @@ -1,6 +1,6 @@ from datetime import datetime -def conforma_data(data_string): +def conforma_data(data_string: str)->str: """ Conforma una stringa di data al formato YYYY-MM-DD, provando diversi formati di input.