From cec66d0028cbf19852b29881d8882849c5e4b36c Mon Sep 17 00:00:00 2001
From: Alessandro Battilani
Date: Sat, 16 May 2020 22:36:18 +0200
Subject: [PATCH] mariadb insert

---
 CsvLoader.py                | 79 +++++++++++++++++++++++++++++--------
 FtpCsvReceiver.py           |  2 +-
 asebat/config/set_config.py | 21 +++++++---
 ftpcsvreceiver.ini          | 12 +++++-
 4 files changed, 89 insertions(+), 25 deletions(-)

diff --git a/CsvLoader.py b/CsvLoader.py
index 7b5a07e..7ed4345 100755
--- a/CsvLoader.py
+++ b/CsvLoader.py
@@ -6,6 +6,7 @@ import pika
 import logging
 import csv
 import re
+import mariadb
 
 from asebat.timefmt import timestamp_fmt as ts
 from asebat.timefmt import date_refmt as df
@@ -13,31 +14,57 @@ from asebat.config import set_config as setting
 
 class sqlraw():
 
-    def __init__(self, db, table):
-        self.db = db
-        self.table = table
-        self.sql = (
-            "INSERT IGNORE INTO " + db + "." + table +
+    def __init__(self, cfg):
+        self.config = {
+            'host': cfg.dbhost,
+            'user': cfg.dbuser,
+            'password': cfg.dbpass
+        }
+        self.dbname = cfg.dbname
+        self.table = cfg.table
+        self.sql_head = (
+            "INSERT IGNORE INTO " + self.dbname + "." + self.table +
             " (`UnitName`,`ToolName`,`eventDT`,`BatteryLevel`,`Temperature`,`NodeNum`,"
             "`Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`,"
             "`Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`) VALUES ")
 
     def add_data(self, values):
-        self.sql += '(' + '),('.join(values) + ');'
+        self.sql = self.sql_head + '(' + '),('.join(values) + ');'
+
+    def write_db(self):
+        try:
+            conn = mariadb.connect(**self.config, database=self.dbname)
+        except mariadb.Error as err:
+            logging.error("PID {:>5} >> Failed to connect to DB {} - system error {}.".format(
+                os.getpid(), self.dbname, err))
+            sys.exit(1)
+
+        cur = conn.cursor()
+        try:
+            cur.execute(self.sql)
+        except mariadb.ProgrammingError as err:
+            logging.error("PID {:>5} >> Failed to write into DB {} - system error {}.".format(
+                os.getpid(), self.dbname, err))
+            print(err)
+            sys.exit(1)
+
+        finally:
+            conn.close()
 
 
-def callback(ch, method, properties, body):  # body is of type bytes
+def callback_ase(ch, method, properties, body, config):  # body is of type bytes
     logging.info("PID {0:>5} >> Read message {1}".format(os.getpid(), body.decode("utf-8")))
     msg = body.decode("utf-8").split(";")
-    sql = sqlraw('ase', 'rawdata')
+    sql = sqlraw(config)
     stmlst = []
     commonData = '"{0}","{1}"'.format(msg[1],msg[2])
+    tooltype = msg[3]
 
     with open(msg[6], "r") as csvfile:
         lines = csvfile.read().splitlines()
         for line in lines:
+
             fields = line.split(";|;")
-            print(fields)
             if (mG501 := re.match(
                 r"^(\d\d\d\d\/\d\d\/\d\d\s\d\d:\d\d:\d\d);(.+);(.+)$",
                 fields[0]
@@ -54,11 +81,10 @@ def callback(ch, method, properties, body):  # body is of type bytes
                 fields[1]
             )
 
-            rowData = ',"{0}",{1},{2}'.format(df.dateTimeFmt(mG201.group(1)), mbtG201.group(1), mbtG201.group(2))
             fields.pop(0)
             fields.pop(0)
-
+
             else:
                 continue
 
@@ -66,11 +92,25 @@ def callback(ch, method, properties, body):  # body is of type bytes
         nodeNum = 0
         for field in fields:
             nodeNum += 1
             vals = field.split(";")
-            stmlst.append(commonData + rowData + ',{0},'.format(nodeNum) + ','.join('"{0}"'.format(d) for d in vals) + ',' + ','.join(['null']*(16-len(vals))))
-        sql.add_data(stmlst)
-        print(sql.sql)
+            stmlst.append(commonData + rowData + ',{0},'.format(nodeNum) + ','.join('"{0}"'.format(d) for d in vals) + ',' + ','.join(['null']*(config.valueNum-len(vals))))
 
-        ch.basic_ack(delivery_tag=method.delivery_tag)
+        if (config.maxInsertRow < len(stmlst)):
+            sql.add_data(stmlst)
+            try:
+                sql.write_db()
+                stmlst.clear()
+            except:
+                print("error while inserting into the DB")
+                sys.exit(1)
+
+        if len(stmlst) > 0:
+            sql.add_data(stmlst)
+            try:
+                sql.write_db()
+                ch.basic_ack(delivery_tag=method.delivery_tag)
+            except:
+                print("error while inserting into the DB")
+                sys.exit(1)
 
 
@@ -83,13 +123,18 @@ def main():
     )
 
     parameters = pika.URLParameters('amqp://' + cfg.mquser + ':' + cfg.mqpass +
-                                    '@' + cfg.mqhost + ':5672/%2F')
+                                    '@' + cfg.mqhost + ':' + cfg.mqport +'/%2F')
     connection = pika.BlockingConnection(parameters)
     channel = connection.channel()
     channel.queue_declare(queue=cfg.csv_queue, durable=True)
 
     channel.basic_qos(prefetch_count=1)
-    channel.basic_consume(queue=cfg.csv_queue, on_message_callback=callback)
+    channel.basic_consume(
+        queue=cfg.csv_queue,
+        on_message_callback=lambda ch, method, properties, body: callback_ase(ch, method, properties, body, config=cfg)
+    )
+
+    #channel.basic_consume(queue=cfg.csv_queue, on_message_callback=callback,arguments=cfg)
     try:
         channel.start_consuming()
     except KeyboardInterrupt:
diff --git a/FtpCsvReceiver.py b/FtpCsvReceiver.py
index eeaadff..699ee5d 100755
--- a/FtpCsvReceiver.py
+++ b/FtpCsvReceiver.py
@@ -44,7 +44,7 @@ class mq():
     def __init__(self, cfg):
         parameters = pika.URLParameters('amqp://' + cfg.mquser + ':' +
                                         cfg.mqpass + '@' + cfg.mqhost +
-                                        ':5672/%2F')
+                                        ':' + cfg.mqport +'/%2F')
         connection = pika.BlockingConnection(parameters)
         self.channel = connection.channel()
         self.channel.queue_declare(queue=cfg.csv_queue, durable=True)
diff --git a/asebat/config/set_config.py b/asebat/config/set_config.py
index 829a6f6..d68ec93 100644
--- a/asebat/config/set_config.py
+++ b/asebat/config/set_config.py
@@ -8,11 +8,11 @@ class config():
     def __init__(self):
         c = ConfigParser()
         c.read(["/etc/ase/ftpcsvreceiver.ini", "./ftpcsvreceiver.ini"])
-
+        # FTP setting
         self.firstport = c.getint("ftpserver", "firstport")
         self.logfilename = c.get("ftpserver", "logfilename")
         self.proxyaddr = c.get("ftpserver", "proxyaddr")
-
+        # MAIL setting
         self.smtphost = c.get("mailserver", "hostname")
         self.smtpport = c.getint("mailserver", "port")
         self.sender = c.get("mailserver", "sender")
@@ -21,13 +21,22 @@ class config():
         self.message = c.get("mailserver", "message")
         self.subject = c.get("mailserver", "subject")
         self.debuglevel = c.getint("mailserver", "debug")
-
+        # MQ setting
         self.mqhost = c.get("mqserver", "hostname")
+        self.mqport = c.get("mqserver", "port")
         self.mquser = c.get("mqserver", "user")
         self.mqpass = c.get("mqserver", "password")
         self.csv_queue = c.get("mqserver", "csv_queue")
         self.elab_queue = c.get("mqserver", "elab_queue")
-
+        # CSV FILE setting
         self.csvfs = c.get("csvfs", "path")
-
-        self.elablog = c.get("csvelab", "logfilename")
\ No newline at end of file
+        # LOADER setting
+        self.elablog = c.get("csvelab", "logfilename")
+        # DB setting
+        self.dbhost = c.get("db", "hostname")
+        self.dbuser = c.get("db", "user")
+        self.dbpass = c.get("db", "password")
+        self.dbname = c.get("db", "dbname")
+        self.table = c.get("db", "tablename")
+        self.valueNum = c.getint("db", "valueNum")
+        self.maxInsertRow = c.getint("db", "maxInsertRow")
\ No newline at end of file
diff --git a/ftpcsvreceiver.ini b/ftpcsvreceiver.ini
index 343d9dd..e279dd1 100644
--- a/ftpcsvreceiver.ini
+++ b/ftpcsvreceiver.ini
@@ -18,6 +18,7 @@
 [mqserver]
     hostname = 192.168.1.241
+    port = 5672
     user = alex
     password = batt1l0
     csv_queue = task_queue
@@ -27,4 +28,13 @@
     path = /home/
 
 [csvelab]
-    logfilename = csvElab.log
\ No newline at end of file
+    logfilename = csvElab.log
+
+[db]
+    hostname = 192.168.1.241
+    user = root
+    password = batt1l0
+    dbname = ase
+    tablename = rawdata
+    maxInsertRow = 20000
+    valueNum = 16
\ No newline at end of file
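
Note on the new write_db() (a sketch, not part of the commit): the method opens a
connection and executes the batched INSERT, but it never calls conn.commit(). MariaDB
Connector/Python opens connections with autocommit disabled unless autocommit=True is
passed to mariadb.connect(), so the inserted rows can be discarded when conn.close()
runs. A minimal standalone variant with an explicit commit and rollback could look like
the following; the parameters mirror the config dict, database name and statement built
by sqlraw, while the function name and log wording are illustrative only.

    import logging
    import sys

    import mariadb


    def write_batch(db_config, dbname, statement):
        """Run one batched INSERT statement and make sure it is committed."""
        try:
            conn = mariadb.connect(**db_config, database=dbname)
        except mariadb.Error as err:
            logging.error("Failed to connect to DB %s: %s", dbname, err)
            sys.exit(1)
        try:
            cur = conn.cursor()
            cur.execute(statement)
            conn.commit()    # persist the batch; autocommit is off by default
        except mariadb.Error as err:
            conn.rollback()  # drop the partially applied batch
            logging.error("Failed to write into DB %s: %s", dbname, err)
            sys.exit(1)
        finally:
            conn.close()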
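A second, related observation: the patch builds the VALUES list by string concatenation,
quoting the raw CSV fields straight into the SQL text, so a field containing a double
quote would break the statement. If the rows were collected as tuples instead of
pre-quoted strings, the same batch could be sent with parameter placeholders via
executemany(). This is only a sketch: the sample rows and the shortened column list are
illustrative, and the connection settings simply echo the [db] section of
ftpcsvreceiver.ini.

    import mariadb

    conn = mariadb.connect(host="192.168.1.241", user="root",
                           password="batt1l0", database="ase")
    cur = conn.cursor()
    # Each tuple is one row; the connector handles quoting and escaping.
    rows = [
        ("unit01", "toolA", "2020-05-16 22:36:18", "3.61", "21.5", 1, "12.3"),
        ("unit01", "toolA", "2020-05-16 22:36:18", "3.61", "21.5", 2, "12.9"),
    ]
    cur.executemany(
        "INSERT IGNORE INTO rawdata"
        " (UnitName, ToolName, eventDT, BatteryLevel, Temperature, NodeNum, Val0)"
        " VALUES (?, ?, ?, ?, ?, ?, ?)",
        rows,
    )
    conn.commit()
    conn.close()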