#!/usr/bin/python3
"""FTP server that archives uploaded CSV files and announces them on a queue.

Each completed upload is identified (from its file name or from the header
lines of the file itself), moved below the configured CSV directory and
announced with a persistent message on the ``task_queue`` AMQP queue.
"""

import logging
import os
import re
import shutil
import ssl
import sys
from configparser import ConfigParser
from email.mime.text import MIMEText
from smtplib import SMTP_SSL as SMTP, SMTPException, SMTPAuthenticationError

import pika
from pyftpdlib.authorizers import UnixAuthorizer
from pyftpdlib.filesystems import UnixFilesystem
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer

from asebat.timefmt import timestamp_fmt as ts
from asebat.timefmt import date_refmt as df

# File names that carry unit, tool, date and time themselves.
_NAME_WITH_METADATA_RE = re.compile(
    r"^(G\d\d\d)_(ID\d\d\d\d)_(DT\d\d\d\d)_(\d\d)(\d\d)(\d\d\d\d)(\d\d)(\d\d)(\d\d)$",
    re.I,
)
# File names whose metadata has to be read from the header of the file.
_NAME_WITH_HEADER_RE = re.compile(
    r"^(\d\d_\d\d\d\d|)(DT\d\d\d\d|LOC\d\d\d\d|GD\d\d\d\d)$",
    re.I,
)
# Header line 1: creation date and time of the file.
_HEADER_DATE_RE = re.compile(
    r"File Creation Date: (\d*\/\d*\/\d*)\s(\d*:\d*:\d*).*",
    re.I,
)
# Header line 6: path whose last two components give tool type and tool name.
_HEADER_SD_PATH_RE = re.compile(r"SD path: .*\/.*\/(.*)\/(.*)\..*", re.I)


class config():
    """Settings read from ``ftpcsvreceiver.ini``.

    The file is looked up in ``/etc/ase/`` and in the current directory.
    Every option is mandatory: ``ConfigParser`` raises if a section or an
    option is missing.
    """

    def __init__(self):
        c = ConfigParser()
        c.read(["/etc/ase/ftpcsvreceiver.ini", "./ftpcsvreceiver.ini"])

        # [ftpserver]
        self.firstport = c.getint("ftpserver", "firstport")
        self.logfilename = c.get("ftpserver", "logfilename")
        self.proxyaddr = c.get("ftpserver", "proxyaddr")

        # [mailserver]
        self.smtphost = c.get("mailserver", "hostname")
        self.smtpport = c.getint("mailserver", "port")
        self.sender = c.get("mailserver", "sender")
        self.password = c.get("mailserver", "password")
        self.receivers = c.get("mailserver", "receivers")
        self.message = c.get("mailserver", "message")
        self.subject = c.get("mailserver", "subject")
        self.debuglevel = c.getint("mailserver", "debug")

        # [mqserver]
        self.mqhost = c.get("mqserver", "hostname")
        self.mquser = c.get("mqserver", "user")
        self.mqpass = c.get("mqserver", "password")

        # [csvfs]
        self.csvfs = c.get("csvfs", "path")


def send_mail(sev, msg, cfg):
    """Send a notification mail over SMTP-over-SSL; failures are only logged.

    Args:
        sev: Severity label appended to the configured subject.
        msg: Text appended to the configured message body.
        cfg: ``config`` instance providing the mail server settings.
    """
    mail = MIMEText(cfg.message + "\n" + msg)
    mail["Subject"] = cfg.subject + " " + sev
    mail["From"] = cfg.sender
    mail["To"] = cfg.receivers

    # sendmail() treats a bare string as ONE address, so a comma-separated
    # list has to be split into the individual recipients.
    recipients = [
        address.strip()
        for address in cfg.receivers.split(",")
        if address.strip()
    ]

    conn = None
    try:
        # Connecting happens inside the try block so that an unreachable
        # mail server is logged like every other delivery failure.
        conn = SMTP(
            host=cfg.smtphost,
            port=cfg.smtpport,
            local_hostname=None,
            timeout=5,
            source_address=None,
        )
        conn.set_debuglevel(cfg.debuglevel)
        conn.login(cfg.sender, cfg.password)
        conn.sendmail(cfg.sender, recipients, mail.as_string())
    except SMTPAuthenticationError:
        logging.error(
            f"PID {os.getpid():>5} >> Mail failed: SMTP authentication error."
        )
    except Exception as exc:
        # Mail is a best-effort notification: report the real cause instead
        # of letting it break the caller.
        logging.error(f"PID {os.getpid():>5} >> Mail failed: {exc}.")
    finally:
        if conn is not None:
            try:
                conn.quit()
            except (SMTPException, OSError):
                # The connection is already unusable; nothing left to close.
                pass


def write_queue(msg, cfg):
    """Publish ``msg`` as a persistent message on the ``task_queue`` queue.

    Args:
        msg: Message body to publish.
        cfg: ``config`` instance providing the broker host and credentials.

    Raises:
        Any exception raised by pika while connecting or publishing.
    """
    parameters = pika.URLParameters(
        'amqp://' + cfg.mquser + ':' + cfg.mqpass + '@' + cfg.mqhost
        + ':5672/%2F')
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        channel.queue_declare(queue='task_queue', durable=True)
        channel.basic_publish(
            exchange='',
            routing_key='task_queue',
            body=msg,
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ))
        logging.info(
            f"PID {os.getpid():>5} >> write message {msg} in queue")
    finally:
        # Always release the connection, but do not mask a publish error by
        # closing a connection that is already gone.
        if connection.is_open:
            connection.close()


def _metadata_from_filename(filename):
    """Extract the metadata encoded in ``filename``.

    Returns:
        ``(unit_type, unit_name, tool_name, tool_type, file_date, file_time)``
        or ``None`` when the name does not carry the metadata.
    """
    m = _NAME_WITH_METADATA_RE.match(filename)
    if m is None:
        return None
    file_date = m.group(6) + "/" + m.group(5) + "/" + m.group(4)
    file_time = m.group(7) + ":" + m.group(8) + ":" + m.group(9)
    return (m.group(1), m.group(2), m.group(3), "N/A", file_date, file_time)


def _metadata_from_csv_header(path):
    """Extract the metadata from header lines 1, 2 and 6 of the file.

    Returns:
        ``(unit_type, unit_name, tool_name, tool_type, file_date, file_time)``
        or ``None`` (after logging the reason) when the file cannot be read
        or its header does not have the expected layout.
    """
    file_date = file_time = None
    unit_type = unit_name = None
    tool_type = tool_name = None
    try:
        with open(path, "r") as csv_file:
            for lineno, line in enumerate(csv_file.readlines(4096), 1):
                if lineno == 1:
                    m = _HEADER_DATE_RE.match(line)
                    if m is None:
                        raise ValueError("creation date not found on line 1")
                    file_date, file_time = m.group(1), m.group(2)
                elif lineno == 2:
                    # Exactly two space-separated tokens are expected;
                    # anything else raises ValueError on unpacking.
                    unit_type, unit_name = (line.strip("\n").replace(
                        ";", "").replace(",", "").split(" "))
                elif lineno == 6:
                    m = _HEADER_SD_PATH_RE.match(line)
                    if m is None:
                        raise ValueError("SD path not found on line 6")
                    tool_type, tool_name = m.group(1), m.group(2)
                    break
        if tool_name is None:
            raise ValueError("file header is shorter than 6 lines")
    except (OSError, ValueError) as exc:
        # UnicodeDecodeError is a ValueError, so undecodable files end here.
        logging.error(f"PID {os.getpid():>5} >> Error: {exc}.")
        return None
    return (unit_type, unit_name, tool_name, tool_type, file_date, file_time)


def _is_safe_path_component(name):
    """Return True when ``name`` can be used as a single directory name."""
    if not name or name in (".", ".."):
        return False
    return "/" not in name and os.sep not in name and "\x00" not in name


class ASEHandler(FTPHandler):
    """FTP handler that archives completed uploads and announces them.

    ``main`` attaches the ``config`` instance to the class as ``cfg``.
    """

    def on_file_received(self, file):
        """Archive an uploaded file and publish its metadata on the queue.

        The file is moved to
        ``<csvfs><username>/received/<UNIT NAME>/<name>_<timestamp><ext>``.
        Uploads whose metadata cannot be determined are left where they are
        and an error is logged.

        Args:
            file: Path of the file that has just been received.
        """
        cfg = self.cfg
        _, filename_ext = os.path.split(file)
        filename, file_extension = os.path.splitext(filename_ext)

        metadata = _metadata_from_filename(filename)
        if metadata is None and _NAME_WITH_HEADER_RE.match(filename):
            metadata = _metadata_from_csv_header(file)
        if metadata is None:
            # Previously this fell through with unbound variables (NameError).
            logging.error(
                f"PID {os.getpid():>5} >> Error: "
                f"unable to identify unit and tool of {filename_ext}.")
            return

        unit_type, unit_name, tool_name, tool_type = (
            value.upper() for value in metadata[:4])
        file_date = df.dateFmt(metadata[4])
        file_time = metadata[5]

        logging.info(
            f"PID {os.getpid():>5} >> {unit_type} - {unit_name} - "
            f"{tool_name} - {tool_type} - {file_date} {file_time}.")

        # The unit name may come from the content of the uploaded file, so it
        # must not be able to redirect the move outside the user's directory.
        if not _is_safe_path_component(unit_name):
            logging.error(
                f"PID {os.getpid():>5} >> Error: "
                f"invalid unit name {unit_name!r} in {filename_ext}.")
            return

        new_path = cfg.csvfs + self.username + "/received/" + unit_name + "/"
        # str() wraps only the timestamp: wrapping the whole concatenation
        # fails when the timestamp is not already a string.
        new_filename = (new_path + filename + "_"
                        + str(ts.timestamp("tms")) + file_extension)

        try:
            os.makedirs(new_path)
            logging.info(
                f"PID {os.getpid():>5} >> path {new_path} created.")
        except FileExistsError:
            logging.info(
                f"PID {os.getpid():>5} >> path {new_path} already exists.")

        try:
            shutil.move(file, new_filename)
            logging.info(
                f"PID {os.getpid():>5} >> {filename_ext} moved into "
                f"{new_filename}.")
        except OSError:
            logging.error(
                f"PID {os.getpid():>5} >> Error to move {filename_ext} into "
                f"{new_filename}.")
            # Do not announce a file that is not at the announced location.
            return

        mq_message = ";".join((
            unit_type,
            unit_name,
            tool_name,
            tool_type,
            str(file_date),
            file_time,
            new_filename,
        ))
        write_queue(mq_message, cfg)
        logging.info(
            f"PID {os.getpid():>5} >> queue message: {mq_message}.")

    def on_incomplete_file_received(self, file):
        """Delete a partially uploaded file.

        Args:
            file: Path of the incomplete upload.
        """
        os.remove(file)


def main():
    """Configure logging and the FTP handler, then serve on port 21."""
    cfg = config()
    try:
        # Logging is configured first so that failures while building the
        # server objects can already reach the log file.
        logging.basicConfig(
            format="%(asctime)s %(message)s",
            filename="/var/log/" + cfg.logfilename,
            level=logging.INFO,
        )

        authorizer = UnixAuthorizer(rejected_users=["root"],
                                    require_valid_shell=True)

        handler = ASEHandler
        handler.cfg = cfg
        handler.authorizer = authorizer
        handler.abstracted_fs = UnixFilesystem
        handler.masquerade_address = cfg.proxyaddr
        # 20 consecutive passive ports starting at the configured one.
        handler.passive_ports = list(
            range(cfg.firstport, cfg.firstport + 20))

        server = FTPServer(("0.0.0.0", 21), handler)
        server.serve_forever()
    except KeyboardInterrupt:
        logging.info(
            f"PID {os.getpid():>5} >> Info: Shutdown requested...exiting.")
    except Exception:
        print("{} - PID {:>5} >> Error: {}.".format(ts.timestamp("log"),
                                                    os.getpid(),
                                                    sys.exc_info()[1]))


if __name__ == "__main__":
    main()