diff --git a/CsvLoader.py b/CsvLoader.py index d33b246..36041af 100755 --- a/CsvLoader.py +++ b/CsvLoader.py @@ -2,184 +2,45 @@ import sys import os -import shutil -import ssl import pika -import re import logging -from smtplib import SMTP_SSL as SMTP, SMTPException, SMTPAuthenticationError -from email.mime.text import MIMEText - -from configparser import ConfigParser - from asebat.timefmt import timestamp_fmt as ts from asebat.timefmt import date_refmt as df from asebat.config import set_config as setting - -def send_mail(sev, msg, cfg): - msg = MIMEText(cfg.message + "\n" + msg) - msg["Subject"] = cfg.subject + " " + sev - msg["From"] = cfg.sender - msg["To"] = cfg.receivers - conn = SMTP(host=cfg.smtphost, port=cfg.smtpport, local_hostname=None, timeout=5, source_address=None ) - conn.set_debuglevel(cfg.debuglevel) - try: - conn.login(cfg.sender, cfg.password) - conn.sendmail(cfg.sender, cfg.receivers, msg.as_string()) - except SMTPAuthenticationError: - logging.error("PID {:>5} >> Mail failed: {}.".format( - os.getpid(), "SMTP authentication error")) - except: - logging.info("PID {:>5} >> Mail failed: {}.".format( - os.getpid(), "CUSTOM_ERROR")) - finally: - conn.quit() -def write_queue(msg, cfg): - parameters = pika.URLParameters('amqp://' + cfg.mquser + ':' + - cfg.mqpass + '@' + cfg.mqhost + - ':5672/%2F') - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - channel.queue_declare(queue='task_queue', durable=True) - channel.basic_publish( - exchange='', - routing_key='task_queue', - body=msg, - properties=pika.BasicProperties( - delivery_mode=2, # make message persistent - )) - logging.info("PID {:>5} >> write message {} in queue".format( - os.getpid(), msg)) - connection.close() +def callback(ch, method, properties, body): #body รจ di tipo byte + logging.info("PID {:>5} >> Read message {}".format( + os.getpid(), body.decode("utf-8"))) -class ASEHandler(FTPHandler): - def on_file_received(self, file): - cfg = self.cfg - path, filenameExt = os.path.split(file) - filename, fileExtension = os.path.splitext(filenameExt) - if (m := re.match( - r"^(G\d\d\d)_(ID\d\d\d\d)_(DT\d\d\d\d)_(\d\d)(\d\d)(\d\d\d\d)(\d\d)(\d\d)(\d\d)$", - filename, - re.I, - )): - unitType = m.group(1).upper() - unitName = m.group(2).upper() - toolName = m.group(3).upper() - toolType = "N/A" - fileDate = m.group(6) + "/" + m.group(5) + "/" + m.group(4) - fileTime = m.group(7) + ":" + m.group(8) + ":" + m.group(9) - elif re.match(r"^(\d\d_\d\d\d\d|)(DT\d\d\d\d|LOC\d\d\d\d|GD\d\d\d\d)$", - filename, re.I): - with open(file, "r") as fileCsv: - try: - for i, line in enumerate(fileCsv.readlines(4096), 1): - if i == 1: - m1 = re.match( - r"File Creation Date: (\d*\/\d*\/\d*)\s(\d*:\d*:\d*).*", - line, - re.I, - ) - fileDate = m1.group(1) - fileTime = m1.group(2) - if i == 2: - unitType, unitName = (line.strip("\n").replace( - ";", "").replace(",", "").split(" ")) - if i == 6: - m6 = re.match(r"SD path: .*\/.*\/(.*)\/(.*)\..*", - line, re.I) - toolType = m6.group(1) - toolName = m6.group(2) - break - except: - logging.error("PID {:>5} >> Error: {}.".format( - os.getpid(), - sys.exc_info()[1])) - fileCsv.close - - logging.info("PID {:>5} >> {} - {} - {} - {} - {} {}.".format( - os.getpid(), - unitType.upper(), - unitName.upper(), - toolName.upper(), - toolType.upper(), - df.dateFmt(fileDate), - fileTime, - )) - newPath = cfg.csvfs + self.username + "/received/" + unitName.upper( - ) + "/" - newFilename = (newPath + filename + "_" + - str(ts.timestamp("tms") + fileExtension)) - try: - os.makedirs(newPath) - logging.info("PID {:>5} >> path {} created.".format( - os.getpid(), newPath)) - except FileExistsError: - logging.info("PID {:>5} >> path {} already exists.".format( - os.getpid(), newPath)) - try: - shutil.move(file, newFilename) - logging.info("PID {:>5} >> {} moved into {}.".format( - os.getpid(), filenameExt, newFilename)) - except OSError: - logging.error("PID {:>5} >> Error to move {} into {}.".format( - os.getpid(), filenameExt, newFilename)) - # send_mail("Error", - # "OS error move " + filenameExt + " to " + newFilename, cfg) - - - mq_message = "{};{};{};{};{};{};{}".format( - unitType.upper(), - unitName.upper(), - toolName.upper(), - toolType.upper(), - df.dateFmt(fileDate), - fileTime, - newFilename, - ) - write_queue(mq_message, cfg) - logging.info("PID {:>5} >> queue message: {}.".format( - os.getpid(), mq_message)) - - def on_incomplete_file_received(self, file): - # remove partially uploaded files - os.remove(file) + ch.basic_ack(delivery_tag=method.delivery_tag) def main(): - cfg = config() + cfg = setting.config() + + logging.basicConfig( + format="%(asctime)s %(message)s", + filename="/var/log/" + cfg.elablog, + level=logging.INFO, + ) + + parameters = pika.URLParameters('amqp://' + cfg.mquser + ':' + cfg.mqpass + + '@' + cfg.mqhost + ':5672/%2F') + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.queue_declare(queue=cfg.csv_queue, durable=True) + + channel.basic_qos(prefetch_count=1) + channel.basic_consume(queue=cfg.csv_queue, on_message_callback=callback) try: - authorizer = UnixAuthorizer(rejected_users=["root"], - require_valid_shell=True) - handler = ASEHandler - handler.cfg = cfg - handler.authorizer = authorizer - handler.abstracted_fs = UnixFilesystem - handler.masquerade_address = cfg.proxyaddr - _range = list(range(cfg.firstport, cfg.firstport + 20)) - handler.passive_ports = _range - - logging.basicConfig( - format="%(asctime)s %(message)s", - filename="/var/log/" + cfg.logfilename, - level=logging.INFO, - ) - - server = FTPServer(("0.0.0.0", 21), handler) - - server.serve_forever() + channel.start_consuming() except KeyboardInterrupt: logging.info("PID {:>5} >> Info: {}.".format( os.getpid(), "Shutdown requested...exiting")) - except Exception: - print("{} - PID {:>5} >> Error: {}.".format(ts.timestamp("log"), - os.getpid(), - sys.exc_info()[1])) - if __name__ == "__main__": main() diff --git a/FtpCsvReceiver.py b/FtpCsvReceiver.py index 13b8314..eeaadff 100755 --- a/FtpCsvReceiver.py +++ b/FtpCsvReceiver.py @@ -1,4 +1,4 @@ -#!/usr/bin/python3 +#!/usr/bin/python3.8 import sys import os @@ -40,23 +40,31 @@ def send_mail(sev, msg, cfg): conn.quit() -def write_queue(msg, cfg): - parameters = pika.URLParameters('amqp://' + cfg.mquser + ':' + - cfg.mqpass + '@' + cfg.mqhost + - ':5672/%2F') - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - channel.queue_declare(queue=cfg.queue, durable=True) - channel.basic_publish( - exchange='', - routing_key=cfg.queue, - body=msg, - properties=pika.BasicProperties( - delivery_mode=2, # make message persistent - )) - logging.info("PID {:>5} >> write message {} in queue".format( - os.getpid(), msg)) - connection.close() +class mq(): + def __init__(self, cfg): + parameters = pika.URLParameters('amqp://' + cfg.mquser + ':' + + cfg.mqpass + '@' + cfg.mqhost + + ':5672/%2F') + connection = pika.BlockingConnection(parameters) + self.channel = connection.channel() + self.channel.queue_declare(queue=cfg.csv_queue, durable=True) + + def write(self, msg, cfg): + try: + self.channel.basic_publish( + exchange='', + routing_key=cfg.csv_queue, + body=msg, + properties=pika.BasicProperties( + delivery_mode=2, # make message persistent + )) + logging.info("PID {:>5} >> write message {} in queue".format( + os.getpid(), msg)) + except: + logging.error("PID {:>5} >> error write message {} in queue".format( + os.getpid(), msg)) + def close(self): + self.connection.close() class ASEHandler(FTPHandler): @@ -81,22 +89,33 @@ class ASEHandler(FTPHandler): with open(file, "r") as fileCsv: try: for i, line in enumerate(fileCsv.readlines(4096), 1): - if i == 1: - m1 = re.match( - r"File Creation Date: (\d*\/\d*\/\d*)\s(\d*:\d*:\d*).*", + if (m1 := re.match( + r"^(File Creation Date:\s)?(\d*\/\d*\/\d*)\s(\d*:\d*:\d*)\;*\n?$", line, re.I, - ) - fileDate = m1.group(1) - fileTime = m1.group(2) - if i == 2: - unitType, unitName = (line.strip("\n").replace( - ";", "").replace(",", "").split(" ")) - if i == 6: - m6 = re.match(r"SD path: .*\/.*\/(.*)\/(.*)\..*", - line, re.I) - toolType = m6.group(1) - toolName = m6.group(2) + )): + fileDate = m1.group(2) + fileTime = m1.group(3) + + elif (m2 := re.match( + r"^(\w+\d+)\s(\w+\d+)\;*\n?$", + line, + re.I, + )): + unitType = m2.group(1).upper() + unitName = m2.group(2).upper() + + elif (m3 := re.match( + r"^SD path: a:\/\w+\/(\w+)(?:\.\w+)?\/*(\w*)(?:\.\w+)?\;*\n?$", + line, + re.I + )): + if m3.group(2): + toolType = m3.group(1).upper() + toolName = m3.group(2).upper() + else: + toolType = "".join(re.findall("^[a-zA-Z]+", m3.group(1))).upper() + toolName = m3.group(1).upper() break except: logging.error("PID {:>5} >> Error: {}.".format( @@ -106,18 +125,18 @@ class ASEHandler(FTPHandler): logging.info("PID {:>5} >> {} - {} - {} - {} - {} {}.".format( os.getpid(), - unitType.upper(), - unitName.upper(), - toolName.upper(), - toolType.upper(), + unitType, + unitName, + toolName, + toolType, df.dateFmt(fileDate), fileTime, )) - newPath = cfg.csvfs + self.username + "/received/" + unitName.upper( - ) + "/" + newPath = cfg.csvfs + self.username + "/received/" + unitName.upper() + "/" newFilename = (newPath + filename + "_" + str(ts.timestamp("tms") + fileExtension)) fileRenamed = (file + "_" + str(ts.timestamp("tms"))) + os.rename(file, fileRenamed) try: os.makedirs(newPath) logging.info("PID {:>5} >> path {} created.".format( @@ -126,29 +145,39 @@ class ASEHandler(FTPHandler): logging.info("PID {:>5} >> path {} already exists.".format( os.getpid(), newPath)) try: - os.rename(file, fileRenamed) shutil.move(fileRenamed, newFilename) logging.info("PID {:>5} >> {} moved into {}.".format( os.getpid(), filenameExt, newFilename)) except OSError: logging.error("PID {:>5} >> Error to move {} into {}.".format( os.getpid(), filenameExt, newFilename)) - # send_mail("Error", - # "OS error move " + filenameExt + " to " + newFilename, cfg) + send_mail("Error", + "OS error move " + filenameExt + " to " + newFilename, cfg) mq_message = "{};{};{};{};{};{};{}".format( - unitType.upper(), - unitName.upper(), - toolName.upper(), - toolType.upper(), + unitType, + unitName, + toolName, + toolType, df.dateFmt(fileDate), fileTime, newFilename, ) - write_queue(mq_message, cfg) - logging.info("PID {:>5} >> queue message: {}.".format( + try: + queue = mq(cfg) + queue.write(mq_message, cfg) + logging.info("PID {:>5} >> queue message: {}.".format( os.getpid(), mq_message)) + except: + logging.error("PID {:>5} >> Error to put message in queue: {}.".format( + os.getpid(), mq_message)) + send_mail("Error", + "Error to put message " + mq_message + " in queue.", cfg) + finally: + queue.close() + + def on_incomplete_file_received(self, file): # remove partially uploaded files diff --git a/asebat/config/set_config.py b/asebat/config/set_config.py index f5dd343..829a6f6 100644 --- a/asebat/config/set_config.py +++ b/asebat/config/set_config.py @@ -25,6 +25,9 @@ class config(): self.mqhost = c.get("mqserver", "hostname") self.mquser = c.get("mqserver", "user") self.mqpass = c.get("mqserver", "password") - self.queue = c.get("mqserver", "queue") + self.csv_queue = c.get("mqserver", "csv_queue") + self.elab_queue = c.get("mqserver", "elab_queue") self.csvfs = c.get("csvfs", "path") + + self.elablog = c.get("csvelab", "logfilename") \ No newline at end of file diff --git a/asebat/parsers/G801_parser.py b/asebat/parsers/G801_parser.py index e69de29..a1d964f 100644 --- a/asebat/parsers/G801_parser.py +++ b/asebat/parsers/G801_parser.py @@ -0,0 +1 @@ +locals \ No newline at end of file diff --git a/ftpcsvreceiver.ini b/ftpcsvreceiver.ini index 9d99fbe..343d9dd 100644 --- a/ftpcsvreceiver.ini +++ b/ftpcsvreceiver.ini @@ -17,10 +17,14 @@ debug = 1 [mqserver] - hostname = 192.168.1.242 + hostname = 192.168.1.241 user = alex password = batt1l0 - queue = task_queue + csv_queue = task_queue + elab_queue = elab_queue [csvfs] - path = /home/ \ No newline at end of file + path = /home/ + +[csvelab] + logfilename = csvElab.log \ No newline at end of file