diff --git a/CsvLoader.py b/CsvLoader.py new file mode 100755 index 0000000..f71f41c --- /dev/null +++ b/CsvLoader.py @@ -0,0 +1,209 @@ +#!/usr/bin/python3 + +import sys +import os +import shutil +import ssl +import pika +import re +import logging + +from configparser import ConfigParser + +from asebat.timefmt import timestamp_fmt as ts +from asebat.timefmt import date_refmt as df + + +class config(): + def __init__(self): + c = ConfigParser() + c.read([ + "/etc/ase/ftpcsvreceiver.ini", + "./ftpcsvreceiver.ini" + ]) + + self.firstport = c.getint("ftpserver", "firstport") + self.logfilename = c.get("ftpserver", "logfilename") + self.proxyaddr = c.get("ftpserver", "proxyaddr") + + self.smtphost = c.get("mailserver", "hostname") + self.smtpport = c.getint("mailserver", "port") + self.sender = c.get("mailserver", "sender") + self.password = c.get("mailserver", "password") + self.receivers = c.get("mailserver", "receivers") + self.message = c.get("mailserver", "message") + self.subject = c.get("mailserver", "subject") + self.debuglevel = c.getint("mailserver", "debug") + + self.mqhost = c.get("mqserver", "hostname") + self.mquser = c.get("mqserver", "user") + self.mqpass = c.get("mqserver", "password") + + self.csvfs = c.get("csvfs", "path") + +def send_mail(sev, msg, cfg): + msg = MIMEText(cfg.message + "\n" + msg) + msg["Subject"] = cfg.subject + " " + sev + msg["From"] = cfg.sender + msg["To"] = cfg.receivers + conn = SMTP(host=cfg.smtphost, port=cfg.smtpport, local_hostname=None, timeout=5, source_address=None ) + conn.set_debuglevel(cfg.debuglevel) + try: + conn.login(cfg.sender, cfg.password) + conn.sendmail(cfg.sender, cfg.receivers, msg.as_string()) + except SMTPAuthenticationError: + logging.error("PID {:>5} >> Mail failed: {}.".format( + os.getpid(), "SMTP authentication error")) + except: + logging.info("PID {:>5} >> Mail failed: {}.".format( + os.getpid(), "CUSTOM_ERROR")) + finally: + conn.quit() + + +def write_queue(msg, cfg): + parameters = pika.URLParameters('amqp://' + cfg.mquser + ':' + + cfg.mqpass + '@' + cfg.mqhost + + ':5672/%2F') + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.queue_declare(queue='task_queue', durable=True) + channel.basic_publish( + exchange='', + routing_key='task_queue', + body=msg, + properties=pika.BasicProperties( + delivery_mode=2, # make message persistent + )) + logging.info("PID {:>5} >> write message {} in queue".format( + os.getpid(), msg)) + connection.close() + + +class ASEHandler(FTPHandler): + def on_file_received(self, file): + cfg = self.cfg + path, filenameExt = os.path.split(file) + filename, fileExtension = os.path.splitext(filenameExt) + + if (m := re.match( + r"^(G\d\d\d)_(ID\d\d\d\d)_(DT\d\d\d\d)_(\d\d)(\d\d)(\d\d\d\d)(\d\d)(\d\d)(\d\d)$", + filename, + re.I, + )): + unitType = m.group(1).upper() + unitName = m.group(2).upper() + toolName = m.group(3).upper() + toolType = "N/A" + fileDate = m.group(6) + "/" + m.group(5) + "/" + m.group(4) + fileTime = m.group(7) + ":" + m.group(8) + ":" + m.group(9) + elif re.match(r"^(\d\d_\d\d\d\d|)(DT\d\d\d\d|LOC\d\d\d\d|GD\d\d\d\d)$", + filename, re.I): + with open(file, "r") as fileCsv: + try: + for i, line in enumerate(fileCsv.readlines(4096), 1): + if i == 1: + m1 = re.match( + r"File Creation Date: (\d*\/\d*\/\d*)\s(\d*:\d*:\d*).*", + line, + re.I, + ) + fileDate = m1.group(1) + fileTime = m1.group(2) + if i == 2: + unitType, unitName = (line.strip("\n").replace( + ";", "").replace(",", "").split(" ")) + if i == 6: + m6 = re.match(r"SD path: .*\/.*\/(.*)\/(.*)\..*", + line, re.I) + toolType = m6.group(1) + toolName = m6.group(2) + break + except: + logging.error("PID {:>5} >> Error: {}.".format( + os.getpid(), + sys.exc_info()[1])) + fileCsv.close + + logging.info("PID {:>5} >> {} - {} - {} - {} - {} {}.".format( + os.getpid(), + unitType.upper(), + unitName.upper(), + toolName.upper(), + toolType.upper(), + df.dateFmt(fileDate), + fileTime, + )) + newPath = cfg.csvfs + self.username + "/received/" + unitName.upper( + ) + "/" + newFilename = (newPath + filename + "_" + + str(ts.timestamp("tms") + fileExtension)) + try: + os.makedirs(newPath) + logging.info("PID {:>5} >> path {} created.".format( + os.getpid(), newPath)) + except FileExistsError: + logging.info("PID {:>5} >> path {} already exists.".format( + os.getpid(), newPath)) + try: + shutil.move(file, newFilename) + logging.info("PID {:>5} >> {} moved into {}.".format( + os.getpid(), filenameExt, newFilename)) + except OSError: + logging.error("PID {:>5} >> Error to move {} into {}.".format( + os.getpid(), filenameExt, newFilename)) + # send_mail("Error", + # "OS error move " + filenameExt + " to " + newFilename, cfg) + + + mq_message = "{};{};{};{};{};{};{}".format( + unitType.upper(), + unitName.upper(), + toolName.upper(), + toolType.upper(), + df.dateFmt(fileDate), + fileTime, + newFilename, + ) + write_queue(mq_message, cfg) + logging.info("PID {:>5} >> queue message: {}.".format( + os.getpid(), mq_message)) + + def on_incomplete_file_received(self, file): + # remove partially uploaded files + os.remove(file) + + +def main(): + cfg = config() + try: + authorizer = UnixAuthorizer(rejected_users=["root"], + require_valid_shell=True) + handler = ASEHandler + handler.cfg = cfg + handler.authorizer = authorizer + handler.abstracted_fs = UnixFilesystem + handler.masquerade_address = cfg.proxyaddr + _range = list(range(cfg.firstport, cfg.firstport + 20)) + handler.passive_ports = _range + + logging.basicConfig( + format="%(asctime)s %(message)s", + filename="/var/log/" + cfg.logfilename, + level=logging.INFO, + ) + + server = FTPServer(("0.0.0.0", 21), handler) + + server.serve_forever() + except KeyboardInterrupt: + logging.info("PID {:>5} >> Info: {}.".format( + os.getpid(), "Shutdown requested...exiting")) + except Exception: + print("{} - PID {:>5} >> Error: {}.".format(ts.timestamp("log"), + os.getpid(), + sys.exc_info()[1])) + + +if __name__ == "__main__": + main() diff --git a/FtpCsvReceiver.py b/FtpCsvReceiver.py index 76d51e7..07b5deb 100755 --- a/FtpCsvReceiver.py +++ b/FtpCsvReceiver.py @@ -10,43 +10,15 @@ import logging from smtplib import SMTP_SSL as SMTP, SMTPException, SMTPAuthenticationError from email.mime.text import MIMEText -from configparser import ConfigParser from asebat.timefmt import timestamp_fmt as ts from asebat.timefmt import date_refmt as df +from asebat.config import set_config as config from pyftpdlib.handlers import FTPHandler from pyftpdlib.servers import FTPServer from pyftpdlib.authorizers import UnixAuthorizer from pyftpdlib.filesystems import UnixFilesystem - - -class config(): - def __init__(self): - c = ConfigParser() - c.read([ - "/etc/ase/ftpcsvreceiver.ini", - "./ftpcsvreceiver.ini" - ]) - - self.firstport = c.getint("ftpserver", "firstport") - self.logfilename = c.get("ftpserver", "logfilename") - self.proxyaddr = c.get("ftpserver", "proxyaddr") - - self.smtphost = c.get("mailserver", "hostname") - self.smtpport = c.getint("mailserver", "port") - self.sender = c.get("mailserver", "sender") - self.password = c.get("mailserver", "password") - self.receivers = c.get("mailserver", "receivers") - self.message = c.get("mailserver", "message") - self.subject = c.get("mailserver", "subject") - self.debuglevel = c.getint("mailserver", "debug") - - self.mqhost = c.get("mqserver", "hostname") - self.mquser = c.get("mqserver", "user") - self.mqpass = c.get("mqserver", "password") - - self.csvfs = c.get("csvfs", "path") def send_mail(sev, msg, cfg): msg = MIMEText(cfg.message + "\n" + msg) @@ -74,10 +46,10 @@ def write_queue(msg, cfg): ':5672/%2F') connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.queue_declare(queue='task_queue', durable=True) + channel.queue_declare(queue=cfg.queue, durable=True) channel.basic_publish( exchange='', - routing_key='task_queue', + routing_key=cfg.queue, body=msg, properties=pika.BasicProperties( delivery_mode=2, # make message persistent @@ -182,7 +154,7 @@ class ASEHandler(FTPHandler): def main(): - cfg = config() + cfg = config.config() try: authorizer = UnixAuthorizer(rejected_users=["root"], require_valid_shell=True) diff --git a/asebat/config/__init__.py b/asebat/config/__init__.py new file mode 100644 index 0000000..ae21170 --- /dev/null +++ b/asebat/config/__init__.py @@ -0,0 +1 @@ +"""Config ini setting""" \ No newline at end of file diff --git a/asebat/config/set_config.py b/asebat/config/set_config.py new file mode 100644 index 0000000..c4d55c9 --- /dev/null +++ b/asebat/config/set_config.py @@ -0,0 +1,29 @@ +"""set configurations + +""" +from configparser import ConfigParser + +class config(): + def __init__(self): + c = ConfigParser() + c.read(["/etc/ase/ftpcsvreceiver.ini", "./ftpcsvreceiver.ini"]) + + self.firstport = c.getint("ftpserver", "firstport") + self.logfilename = c.get("ftpserver", "logfilename") + self.proxyaddr = c.get("ftpserver", "proxyaddr") + + self.smtphost = c.get("mailserver", "hostname") + self.smtpport = c.getint("mailserver", "port") + self.sender = c.get("mailserver", "sender") + self.password = c.get("mailserver", "password") + self.receivers = c.get("mailserver", "receivers") + self.message = c.get("mailserver", "message") + self.subject = c.get("mailserver", "subject") + self.debuglevel = c.getint("mailserver", "debug") + + self.mqhost = c.get("mqserver", "hostname") + self.mquser = c.get("mqserver", "user") + self.mqpass = c.get("mqserver", "password") + self.queue = c.get("mqserver", "queue") + + self.csvfs = c.get("csvfs", "path") \ No newline at end of file diff --git a/ftpcsvreceiver.ini b/ftpcsvreceiver.ini index 6cde5e9..9d99fbe 100644 --- a/ftpcsvreceiver.ini +++ b/ftpcsvreceiver.ini @@ -20,6 +20,7 @@ hostname = 192.168.1.242 user = alex password = batt1l0 + queue = task_queue [csvfs] path = /home/ \ No newline at end of file