From f2475fbafc422d70b983ca1df8533ac1bfb99e4d Mon Sep 17 00:00:00 2001 From: Alessandro Battilani Date: Sun, 31 Jan 2021 10:50:50 +0100 Subject: [PATCH] aa --- CsvLoader.py | 21 +++- FtpCsvReceiver.py | 241 +++++++++++++++++++++++++--------------------- checkDBsync.py | 0 3 files changed, 150 insertions(+), 112 deletions(-) create mode 100644 checkDBsync.py diff --git a/CsvLoader.py b/CsvLoader.py index 7ed4345..7d0f2ab 100755 --- a/CsvLoader.py +++ b/CsvLoader.py @@ -7,6 +7,7 @@ import logging import csv import re import mariadb +import shutil from asebat.timefmt import timestamp_fmt as ts from asebat.timefmt import date_refmt as df @@ -100,7 +101,7 @@ def callback_ase(ch, method, properties, body, config): #body è di tipo byte sql.write_db() stmlst.clear() except: - print("errore nell'inseriento") + print("errore nell'inserimento") sys.exit(1) if len(stmlst) > 0: @@ -109,8 +110,24 @@ def callback_ase(ch, method, properties, body, config): #body è di tipo byte sql.write_db() ch.basic_ack(delivery_tag=method.delivery_tag) except: - print("errore nell'inseriento") + print("errore nell'inserimento") sys.exit(1) + newFilename = msg[6].replace("received", "loaded") + newPath, filenameExt = os.path.split(newFilename) + try: + os.makedirs(newPath) + logging.info("PID {:>5} >> path {} created.".format( + os.getpid(), newPath)) + except FileExistsError: + logging.info("PID {:>5} >> path {} already exists.".format( + os.getpid(), newPath)) + try: + shutil.move(msg[6], newFilename) + logging.info("PID {:>5} >> {} moved into {}.".format( + os.getpid(), filenameExt, newFilename)) + except OSError: + logging.error("PID {:>5} >> Error to move {} into {}.".format( + os.getpid(), filenameExt, newFilename)) def main(): diff --git a/FtpCsvReceiver.py b/FtpCsvReceiver.py index 699ee5d..03d84bd 100755 --- a/FtpCsvReceiver.py +++ b/FtpCsvReceiver.py @@ -19,13 +19,18 @@ from pyftpdlib.handlers import FTPHandler from pyftpdlib.servers import FTPServer from pyftpdlib.authorizers import UnixAuthorizer from pyftpdlib.filesystems import UnixFilesystem - + + def send_mail(sev, msg, cfg): msg = MIMEText(cfg.message + "\n" + msg) msg["Subject"] = cfg.subject + " " + sev msg["From"] = cfg.sender msg["To"] = cfg.receivers - conn = SMTP(host=cfg.smtphost, port=cfg.smtpport, local_hostname=None, timeout=5, source_address=None ) + conn = SMTP(host=cfg.smtphost, + port=cfg.smtpport, + local_hostname=None, + timeout=5, + source_address=None) conn.set_debuglevel(cfg.debuglevel) try: conn.login(cfg.sender, cfg.password) @@ -43,8 +48,8 @@ def send_mail(sev, msg, cfg): class mq(): def __init__(self, cfg): parameters = pika.URLParameters('amqp://' + cfg.mquser + ':' + - cfg.mqpass + '@' + cfg.mqhost + - ':' + cfg.mqport +'/%2F') + cfg.mqpass + '@' + cfg.mqhost + ':' + + cfg.mqport + '/%2F') connection = pika.BlockingConnection(parameters) self.channel = connection.channel() self.channel.queue_declare(queue=cfg.csv_queue, durable=True) @@ -61,123 +66,139 @@ class mq(): logging.info("PID {:>5} >> write message {} in queue".format( os.getpid(), msg)) except: - logging.error("PID {:>5} >> error write message {} in queue".format( - os.getpid(), msg)) - def close(self): - self.connection.close() + logging.error( + "PID {:>5} >> error write message {} in queue".format( + os.getpid(), msg)) + + def close(self): + self.channel.close() class ASEHandler(FTPHandler): def on_file_received(self, file): - cfg = self.cfg - path, filenameExt = os.path.split(file) - filename, fileExtension = os.path.splitext(filenameExt) + unitType = '' + unitName = '' + toolName = '' + toolType = '' + fileDate = '' + fileTime = '' + queue = '' + if not os.stat(file).st_size: + os.remove(file) + logging.info("PID {:>5} >> file {} was empty: removed.".format( + os.getpid(), file)) + else: + cfg = self.cfg + path, filenameExt = os.path.split(file) + filename, fileExtension = os.path.splitext(filenameExt) - if (m := re.match( - r"^(G\d\d\d)_(ID\d\d\d\d)_(DT\d\d\d\d)_(\d\d)(\d\d)(\d\d\d\d)(\d\d)(\d\d)(\d\d)$", - filename, - re.I, - )): - unitType = m.group(1).upper() - unitName = m.group(2).upper() - toolName = m.group(3).upper() - toolType = "N/A" - fileDate = m.group(6) + "/" + m.group(5) + "/" + m.group(4) - fileTime = m.group(7) + ":" + m.group(8) + ":" + m.group(9) - elif re.match(r"^(\d\d_\d\d\d\d|)(DT\d\d\d\d|LOC\d\d\d\d|GD\d\d\d\d)$", - filename, re.I): - with open(file, "r") as fileCsv: - try: - for i, line in enumerate(fileCsv.readlines(4096), 1): - if (m1 := re.match( - r"^(File Creation Date:\s)?(\d*\/\d*\/\d*)\s(\d*:\d*:\d*)\;*\n?$", - line, - re.I, + if (m := re.match( + r"^(G\d\d\d)_(ID\d\d\d\d)_(DT\d\d\d\d)_(\d\d)(\d\d)(\d\d\d\d)(\d\d)(\d\d)(\d\d)$", + filename, + re.I, + )): + unitType = m.group(1).upper() + unitName = m.group(2).upper() + toolName = m.group(3).upper() + toolType = "N/A" + fileDate = m.group(6) + "/" + m.group(5) + "/" + m.group(4) + fileTime = m.group(7) + ":" + m.group(8) + ":" + m.group(9) + elif re.match( + r"^(\d\d_\d\d\d\d|)(DT\d\d\d\d|LOC\d\d\d\d|GD\d\d\d\d)$", + filename, re.I): + with open(file, "r") as fileCsv: + try: + for i, line in enumerate(fileCsv.readlines(4096), 1): + if (m1 := re.match( + r"^(File Creation Date:\s)?(\d*\/\d*\/\d*)\s(\d*:\d*:\d*)\;*\n?$", + line, + re.I, )): - fileDate = m1.group(2) - fileTime = m1.group(3) + fileDate = m1.group(2) + fileTime = m1.group(3) - elif (m2 := re.match( - r"^(\w+\d+)\s(\w+\d+)\;*\n?$", - line, - re.I, + elif (m2 := re.match( + r"^(\w+\d+)\s(\w+\d+)\;*\n?$", + line, + re.I, )): - unitType = m2.group(1).upper() - unitName = m2.group(2).upper() + unitType = m2.group(1).upper() + unitName = m2.group(2).upper() - elif (m3 := re.match( - r"^SD path: a:\/\w+\/(\w+)(?:\.\w+)?\/*(\w*)(?:\.\w+)?\;*\n?$", - line, - re.I - )): - if m3.group(2): - toolType = m3.group(1).upper() - toolName = m3.group(2).upper() - else: - toolType = "".join(re.findall("^[a-zA-Z]+", m3.group(1))).upper() - toolName = m3.group(1).upper() - break - except: - logging.error("PID {:>5} >> Error: {}.".format( - os.getpid(), - sys.exc_info()[1])) - fileCsv.close - - logging.info("PID {:>5} >> {} - {} - {} - {} - {} {}.".format( - os.getpid(), - unitType, - unitName, - toolName, - toolType, - df.dateFmt(fileDate), - fileTime, - )) - newPath = cfg.csvfs + self.username + "/received/" + unitName.upper() + "/" - newFilename = (newPath + filename + "_" + - str(ts.timestamp("tms") + fileExtension)) - fileRenamed = (file + "_" + str(ts.timestamp("tms"))) - os.rename(file, fileRenamed) - try: - os.makedirs(newPath) - logging.info("PID {:>5} >> path {} created.".format( - os.getpid(), newPath)) - except FileExistsError: - logging.info("PID {:>5} >> path {} already exists.".format( - os.getpid(), newPath)) - try: - shutil.move(fileRenamed, newFilename) - logging.info("PID {:>5} >> {} moved into {}.".format( - os.getpid(), filenameExt, newFilename)) - except OSError: - logging.error("PID {:>5} >> Error to move {} into {}.".format( - os.getpid(), filenameExt, newFilename)) - send_mail("Error", - "OS error move " + filenameExt + " to " + newFilename, cfg) - - - mq_message = "{};{};{};{};{};{};{}".format( - unitType, - unitName, - toolName, - toolType, - df.dateFmt(fileDate), - fileTime, - newFilename, - ) - try: - queue = mq(cfg) - queue.write(mq_message, cfg) - logging.info("PID {:>5} >> queue message: {}.".format( - os.getpid(), mq_message)) - except: - logging.error("PID {:>5} >> Error to put message in queue: {}.".format( - os.getpid(), mq_message)) - send_mail("Error", - "Error to put message " + mq_message + " in queue.", cfg) - finally: - queue.close() + elif (m3 := re.match( + r"^SD path: a:\/\w+\/(\w+)(?:\.\w+)?\/*(\w*)(?:\.\w+)?\;*\n?$", + line, re.I)): + if m3.group(2): + toolType = m3.group(1).upper() + toolName = m3.group(2).upper() + else: + toolType = "".join( + re.findall("^[a-zA-Z]+", + m3.group(1))).upper() + toolName = m3.group(1).upper() + break + except: + logging.error("PID {:>5} >> Error: {}.".format( + os.getpid(), + sys.exc_info()[1])) + fileCsv.close + logging.info("PID {:>5} >> {} - {} - {} - {} - {} {}.".format( + os.getpid(), + unitType, + unitName, + toolName, + toolType, + df.dateFmt(fileDate), + fileTime, + )) + newPath = cfg.csvfs + self.username + "/received/" + unitName.upper( + ) + "/" + newFilename = (newPath + filename + "_" + + str(ts.timestamp("tms") + fileExtension)) + fileRenamed = (file + "_" + str(ts.timestamp("tms"))) + os.rename(file, fileRenamed) + try: + os.makedirs(newPath) + logging.info("PID {:>5} >> path {} created.".format( + os.getpid(), newPath)) + except FileExistsError: + logging.info("PID {:>5} >> path {} already exists.".format( + os.getpid(), newPath)) + try: + shutil.move(fileRenamed, newFilename) + logging.info("PID {:>5} >> {} moved into {}.".format( + os.getpid(), filenameExt, newFilename)) + except OSError: + logging.error("PID {:>5} >> Error to move {} into {}.".format( + os.getpid(), filenameExt, newFilename)) + send_mail( + "Error", + "OS error move " + filenameExt + " to " + newFilename, cfg) + mq_message = "{};{};{};{};{};{};{}".format( + unitType, + unitName, + toolName, + toolType, + df.dateFmt(fileDate), + fileTime, + newFilename, + ) + try: + queue = mq(cfg) + queue.write(mq_message, cfg) + logging.info("PID {:>5} >> queue message: {}.".format( + os.getpid(), mq_message)) + except: + logging.error( + "PID {:>5} >> Error to put message in queue: {}.".format( + os.getpid(), mq_message)) + send_mail("Error", + "Error to put message " + mq_message + " in queue.", + cfg) + finally: + queue.close() def on_incomplete_file_received(self, file): # remove partially uploaded files diff --git a/checkDBsync.py b/checkDBsync.py new file mode 100644 index 0000000..e69de29