diff --git a/.gitignore b/.gitignore index 07d3e2b..3d084ea 100644 --- a/.gitignore +++ b/.gitignore @@ -7,8 +7,10 @@ *.txt *.pyz *.pyo +*.log +*.lock pyftpdlib_example.py -prova.py -dist/FtpCsvReceiver -prova1.py -prova_mail.py +prova*.py +FtpCsvReceiver +ase-receiver/ase-receiver/provaftp.py +.envrc diff --git a/.vscode/launch.json b/.vscode/launch.json deleted file mode 100644 index 500dc13..0000000 --- a/.vscode/launch.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - // Use IntelliSense to learn about possible attributes. - // Hover to view descriptions of existing attributes. - // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - - { - "name": "Python: File corrente", - "type": "python", - "request": "launch", - "program": "${file}", - "console": "integratedTerminal" - } - ] -} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index d82c837..dfb0c2f 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,4 +1,4 @@ { - "python.pythonPath": "/usr/bin/python3.8", - "python.formatting.provider": "yapf" + "docwriter.style": "Google", + "python.formatting.provider": "autopep8" } \ No newline at end of file diff --git a/CsvLoader.py b/CsvLoader.py deleted file mode 100755 index 7d0f2ab..0000000 --- a/CsvLoader.py +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/python3 - -import sys -import os -import pika -import logging -import csv -import re -import mariadb -import shutil - -from asebat.timefmt import timestamp_fmt as ts -from asebat.timefmt import date_refmt as df -from asebat.config import set_config as setting - - -class sqlraw(): - def __init__(self, cfg): - self.config = { - 'host': cfg.dbhost, - 'user': cfg.dbuser, - 'password': cfg.dbpass - } - self.dbname = cfg.dbname - self.table = cfg.table - self.sql_head = ( - "INSERT IGNORE INTO " + self.dbname + "." + self.table + - " (`UnitName`,`ToolName`,`eventDT`,`BatteryLevel`,`Temperature`,`NodeNum`," - + "`Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`," + - "`Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`) VALUES ") - - def add_data(self, values): - self.sql = self.sql_head + '(' + '),('.join(values) + ');' - - def write_db(self): - try: - conn = mariadb.connect(**self.config, database=self.dbname) - except mariadb.Error as err: - logging.error("PID {:>5} >> Error to connet to DB {} - System error {}.".format( - os.getpid(), self.dbname, err)) - sys.exit(1) - - cur = conn.cursor() - try: - cur.execute(self.sql) - except mariadb.ProgrammingError as err: - logging.error("PID {:>5} >> Error write into DB {} - System error {}.".format( - os.getpid(), self.dbname, err)) - print(err) - sys.exit(1) - - finally: - conn.close() - - -def callback_ase(ch, method, properties, body, config): #body è di tipo byte - logging.info("PID {0:>5} >> Read message {1}".format(os.getpid(), body.decode("utf-8"))) - msg = body.decode("utf-8").split(";") - sql = sqlraw(config) - stmlst = [] - commonData = '"{0}","{1}"'.format(msg[1],msg[2]) - tooltype = msg[3] - with open(msg[6], "r") as csvfile: - lines = csvfile.read().splitlines() - for line in lines: - - fields = line.split(";|;") - - if (mG501 := re.match( - r"^(\d\d\d\d\/\d\d\/\d\d\s\d\d:\d\d:\d\d);(.+);(.+)$", - fields[0] - )): - rowData = ',"{0}",{1},{2}'.format(mG501.group(1), mG501.group(2), mG501.group(3)) - fields.pop(0) - - elif (mG201 := re.match( - r"^(\d\d\/\d\d\/\d\d\d\d\s\d\d:\d\d:\d\d)$", - fields[0] - )): - mbtG201 = re.match( - r"^(.+);(.+)$", - fields[1] - ) - - rowData = ',"{0}",{1},{2}'.format(df.dateTimeFmt(mG201.group(1)), mbtG201.group(1), mbtG201.group(2)) - fields.pop(0) - fields.pop(0) - - else: - continue - - nodeNum = 0 - for field in fields: - nodeNum += 1 - vals = field.split(";") - stmlst.append(commonData + rowData + ',{0},'.format(nodeNum) + ','.join('"{0}"'.format(d) for d in vals) + ',' + ','.join(['null']*(config.valueNum-len(vals)))) - - if (config.maxInsertRow < len(stmlst)): - sql.add_data(stmlst) - try: - sql.write_db() - stmlst.clear() - except: - print("errore nell'inserimento") - sys.exit(1) - - if len(stmlst) > 0: - sql.add_data(stmlst) - try: - sql.write_db() - ch.basic_ack(delivery_tag=method.delivery_tag) - except: - print("errore nell'inserimento") - sys.exit(1) - newFilename = msg[6].replace("received", "loaded") - newPath, filenameExt = os.path.split(newFilename) - try: - os.makedirs(newPath) - logging.info("PID {:>5} >> path {} created.".format( - os.getpid(), newPath)) - except FileExistsError: - logging.info("PID {:>5} >> path {} already exists.".format( - os.getpid(), newPath)) - try: - shutil.move(msg[6], newFilename) - logging.info("PID {:>5} >> {} moved into {}.".format( - os.getpid(), filenameExt, newFilename)) - except OSError: - logging.error("PID {:>5} >> Error to move {} into {}.".format( - os.getpid(), filenameExt, newFilename)) - - -def main(): - cfg = setting.config() - - logging.basicConfig( - format="%(asctime)s %(message)s", - filename="/var/log/" + cfg.elablog, - level=logging.INFO, - ) - - parameters = pika.URLParameters('amqp://' + cfg.mquser + ':' + cfg.mqpass + - '@' + cfg.mqhost + ':' + cfg.mqport +'/%2F') - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - channel.queue_declare(queue=cfg.csv_queue, durable=True) - - channel.basic_qos(prefetch_count=1) - channel.basic_consume( - queue=cfg.csv_queue, - on_message_callback=lambda ch, method, properties, body: callback_ase(ch, method, properties, body, config=cfg) - ) - - #channel.basic_consume(queue=cfg.csv_queue, on_message_callback=callback,arguments=cfg) - try: - channel.start_consuming() - except KeyboardInterrupt: - logging.info("PID {0:>5} >> Info: {1}.".format( - os.getpid(), "Shutdown requested...exiting")) - - -if __name__ == "__main__": - main() diff --git a/FtpCsvReceiver.py b/FtpCsvReceiver.py deleted file mode 100755 index 03d84bd..0000000 --- a/FtpCsvReceiver.py +++ /dev/null @@ -1,240 +0,0 @@ -#!/usr/bin/python3.8 - -import sys -import os -import shutil -import ssl -import pika -import re -import logging - -from smtplib import SMTP_SSL as SMTP, SMTPException, SMTPAuthenticationError -from email.mime.text import MIMEText - -from asebat.timefmt import timestamp_fmt as ts -from asebat.timefmt import date_refmt as df -from asebat.config import set_config as setting - -from pyftpdlib.handlers import FTPHandler -from pyftpdlib.servers import FTPServer -from pyftpdlib.authorizers import UnixAuthorizer -from pyftpdlib.filesystems import UnixFilesystem - - -def send_mail(sev, msg, cfg): - msg = MIMEText(cfg.message + "\n" + msg) - msg["Subject"] = cfg.subject + " " + sev - msg["From"] = cfg.sender - msg["To"] = cfg.receivers - conn = SMTP(host=cfg.smtphost, - port=cfg.smtpport, - local_hostname=None, - timeout=5, - source_address=None) - conn.set_debuglevel(cfg.debuglevel) - try: - conn.login(cfg.sender, cfg.password) - conn.sendmail(cfg.sender, cfg.receivers, msg.as_string()) - except SMTPAuthenticationError: - logging.error("PID {:>5} >> Mail failed: {}.".format( - os.getpid(), "SMTP authentication error")) - except: - logging.info("PID {:>5} >> Mail failed: {}.".format( - os.getpid(), "CUSTOM_ERROR")) - finally: - conn.quit() - - -class mq(): - def __init__(self, cfg): - parameters = pika.URLParameters('amqp://' + cfg.mquser + ':' + - cfg.mqpass + '@' + cfg.mqhost + ':' + - cfg.mqport + '/%2F') - connection = pika.BlockingConnection(parameters) - self.channel = connection.channel() - self.channel.queue_declare(queue=cfg.csv_queue, durable=True) - - def write(self, msg, cfg): - try: - self.channel.basic_publish( - exchange='', - routing_key=cfg.csv_queue, - body=msg, - properties=pika.BasicProperties( - delivery_mode=2, # make message persistent - )) - logging.info("PID {:>5} >> write message {} in queue".format( - os.getpid(), msg)) - except: - logging.error( - "PID {:>5} >> error write message {} in queue".format( - os.getpid(), msg)) - - def close(self): - self.channel.close() - - -class ASEHandler(FTPHandler): - def on_file_received(self, file): - unitType = '' - unitName = '' - toolName = '' - toolType = '' - fileDate = '' - fileTime = '' - queue = '' - if not os.stat(file).st_size: - os.remove(file) - logging.info("PID {:>5} >> file {} was empty: removed.".format( - os.getpid(), file)) - else: - cfg = self.cfg - path, filenameExt = os.path.split(file) - filename, fileExtension = os.path.splitext(filenameExt) - - if (m := re.match( - r"^(G\d\d\d)_(ID\d\d\d\d)_(DT\d\d\d\d)_(\d\d)(\d\d)(\d\d\d\d)(\d\d)(\d\d)(\d\d)$", - filename, - re.I, - )): - unitType = m.group(1).upper() - unitName = m.group(2).upper() - toolName = m.group(3).upper() - toolType = "N/A" - fileDate = m.group(6) + "/" + m.group(5) + "/" + m.group(4) - fileTime = m.group(7) + ":" + m.group(8) + ":" + m.group(9) - elif re.match( - r"^(\d\d_\d\d\d\d|)(DT\d\d\d\d|LOC\d\d\d\d|GD\d\d\d\d)$", - filename, re.I): - with open(file, "r") as fileCsv: - try: - for i, line in enumerate(fileCsv.readlines(4096), 1): - if (m1 := re.match( - r"^(File Creation Date:\s)?(\d*\/\d*\/\d*)\s(\d*:\d*:\d*)\;*\n?$", - line, - re.I, - )): - fileDate = m1.group(2) - fileTime = m1.group(3) - - elif (m2 := re.match( - r"^(\w+\d+)\s(\w+\d+)\;*\n?$", - line, - re.I, - )): - unitType = m2.group(1).upper() - unitName = m2.group(2).upper() - - elif (m3 := re.match( - r"^SD path: a:\/\w+\/(\w+)(?:\.\w+)?\/*(\w*)(?:\.\w+)?\;*\n?$", - line, re.I)): - if m3.group(2): - toolType = m3.group(1).upper() - toolName = m3.group(2).upper() - else: - toolType = "".join( - re.findall("^[a-zA-Z]+", - m3.group(1))).upper() - toolName = m3.group(1).upper() - break - except: - logging.error("PID {:>5} >> Error: {}.".format( - os.getpid(), - sys.exc_info()[1])) - fileCsv.close - - logging.info("PID {:>5} >> {} - {} - {} - {} - {} {}.".format( - os.getpid(), - unitType, - unitName, - toolName, - toolType, - df.dateFmt(fileDate), - fileTime, - )) - newPath = cfg.csvfs + self.username + "/received/" + unitName.upper( - ) + "/" - newFilename = (newPath + filename + "_" + - str(ts.timestamp("tms") + fileExtension)) - fileRenamed = (file + "_" + str(ts.timestamp("tms"))) - os.rename(file, fileRenamed) - try: - os.makedirs(newPath) - logging.info("PID {:>5} >> path {} created.".format( - os.getpid(), newPath)) - except FileExistsError: - logging.info("PID {:>5} >> path {} already exists.".format( - os.getpid(), newPath)) - try: - shutil.move(fileRenamed, newFilename) - logging.info("PID {:>5} >> {} moved into {}.".format( - os.getpid(), filenameExt, newFilename)) - except OSError: - logging.error("PID {:>5} >> Error to move {} into {}.".format( - os.getpid(), filenameExt, newFilename)) - send_mail( - "Error", - "OS error move " + filenameExt + " to " + newFilename, cfg) - - mq_message = "{};{};{};{};{};{};{}".format( - unitType, - unitName, - toolName, - toolType, - df.dateFmt(fileDate), - fileTime, - newFilename, - ) - try: - queue = mq(cfg) - queue.write(mq_message, cfg) - logging.info("PID {:>5} >> queue message: {}.".format( - os.getpid(), mq_message)) - except: - logging.error( - "PID {:>5} >> Error to put message in queue: {}.".format( - os.getpid(), mq_message)) - send_mail("Error", - "Error to put message " + mq_message + " in queue.", - cfg) - finally: - queue.close() - - def on_incomplete_file_received(self, file): - # remove partially uploaded files - os.remove(file) - - -def main(): - cfg = setting.config() - try: - authorizer = UnixAuthorizer(rejected_users=["root"], - require_valid_shell=True) - handler = ASEHandler - handler.cfg = cfg - handler.authorizer = authorizer - handler.abstracted_fs = UnixFilesystem - handler.masquerade_address = cfg.proxyaddr - _range = list(range(cfg.firstport, cfg.firstport + 20)) - handler.passive_ports = _range - - logging.basicConfig( - format="%(asctime)s %(message)s", - filename="/var/log/" + cfg.logfilename, - level=logging.INFO, - ) - - server = FTPServer(("0.0.0.0", 21), handler) - - server.serve_forever() - except KeyboardInterrupt: - logging.info("PID {:>5} >> Info: {}.".format( - os.getpid(), "Shutdown requested...exiting")) - except Exception: - print("{} - PID {:>5} >> Error: {}.".format(ts.timestamp("log"), - os.getpid(), - sys.exc_info()[1])) - - -if __name__ == "__main__": - main() diff --git a/ase-receiver/ase-receiver/CsvLoader.py b/ase-receiver/ase-receiver/CsvLoader.py new file mode 100755 index 0000000..3d0087f --- /dev/null +++ b/ase-receiver/ase-receiver/CsvLoader.py @@ -0,0 +1,197 @@ +#!/usr/bin/python3 + +import sys +import os +import pika +import logging +import csv +import re +import mariadb +import shutil + +from utils.timefmt import timestamp_fmt as ts +from utils.timefmt import date_refmt as df +from utils.config import set_config as setting + + +class sqlraw: + def __init__(self, cfg): + self.config = {"host": cfg.dbhost, "user": cfg.dbuser, "password": cfg.dbpass} + self.dbname = cfg.dbname + self.table = cfg.table + self.sql_head = ( + "INSERT IGNORE INTO " + + self.dbname + + "." + + self.table + + " (`UnitName`,`ToolName`,`eventDT`,`BatteryLevel`,`Temperature`,`NodeNum`," + + "`Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`," + + "`Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`) VALUES " + ) + + def add_data(self, values): + self.sql = self.sql_head + "(" + "),(".join(values) + ");" + + def write_db(self): + try: + conn = mariadb.connect(**self.config, database=self.dbname) + except mariadb.Error as err: + logging.error( + "PID {:>5} >> Error to connet to DB {} - System error {}.".format( + os.getpid(), self.dbname, err + ) + ) + sys.exit(1) + + cur = conn.cursor() + try: + cur.execute(self.sql) + except mariadb.ProgrammingError as err: + logging.error( + "PID {:>5} >> Error write into DB {} - System error {}.".format( + os.getpid(), self.dbname, err + ) + ) + print(err) + sys.exit(1) + + finally: + conn.close() + + +def callback_ase(ch, method, properties, body, config): # body è di tipo byte + logging.info( + "PID {0:>5} >> Read message {1}".format(os.getpid(), body.decode("utf-8")) + ) + msg = body.decode("utf-8").split(";") + sql = sqlraw(config) + stmlst = [] + commonData = '"{0}","{1}"'.format(msg[1], msg[2]) + tooltype = msg[3] + with open(msg[6], "r") as csvfile: + lines = csvfile.read().splitlines() + for line in lines: + + fields = line.split(";|;") + + if mG501 := re.match( + r"^(\d\d\d\d\/\d\d\/\d\d\s\d\d:\d\d:\d\d);(.+);(.+)$", fields[0] + ): + rowData = ',"{0}",{1},{2}'.format( + mG501.group(1), mG501.group(2), mG501.group(3) + ) + fields.pop(0) + + elif mG201 := re.match( + r"^(\d\d\/\d\d\/\d\d\d\d\s\d\d:\d\d:\d\d)$", fields[0] + ): + mbtG201 = re.match(r"^(.+);(.+)$", fields[1]) + + rowData = ',"{0}",{1},{2}'.format( + df.dateTimeFmt(mG201.group(1)), mbtG201.group(1), mbtG201.group(2) + ) + fields.pop(0) + fields.pop(0) + + else: + continue + + nodeNum = 0 + for field in fields: + nodeNum += 1 + vals = field.split(";") + stmlst.append( + commonData + + rowData + + ",{0},".format(nodeNum) + + ",".join('"{0}"'.format(d) for d in vals) + + "," + + ",".join(["null"] * (config.valueNum - len(vals))) + ) + + if config.maxInsertRow < len(stmlst): + sql.add_data(stmlst) + try: + sql.write_db() + stmlst.clear() + except: + print("errore nell'inserimento") + sys.exit(1) + + if len(stmlst) > 0: + sql.add_data(stmlst) + try: + sql.write_db() + ch.basic_ack(delivery_tag=method.delivery_tag) + except: + print("errore nell'inserimento") + sys.exit(1) + newFilename = msg[6].replace("received", "loaded") + newPath, filenameExt = os.path.split(newFilename) + try: + os.makedirs(newPath) + logging.info("PID {:>5} >> path {} created.".format(os.getpid(), newPath)) + except FileExistsError: + logging.info( + "PID {:>5} >> path {} already exists.".format(os.getpid(), newPath) + ) + try: + shutil.move(msg[6], newFilename) + logging.info( + "PID {:>5} >> {} moved into {}.".format( + os.getpid(), filenameExt, newFilename + ) + ) + except OSError: + logging.error( + "PID {:>5} >> Error to move {} into {}.".format( + os.getpid(), filenameExt, newFilename + ) + ) + + +def main(): + cfg = setting.config() + + logging.basicConfig( + format="%(asctime)s %(message)s", + filename="/var/log/" + cfg.elablog, + level=logging.INFO, + ) + + parameters = pika.URLParameters( + "amqp://" + + cfg.mquser + + ":" + + cfg.mqpass + + "@" + + cfg.mqhost + + ":" + + cfg.mqport + + "/%2F" + ) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.queue_declare(queue=cfg.csv_queue, durable=True) + + channel.basic_qos(prefetch_count=1) + channel.basic_consume( + queue=cfg.csv_queue, + on_message_callback=lambda ch, method, properties, body: callback_ase( + ch, method, properties, body, config=cfg + ), + ) + + # channel.basic_consume(queue=cfg.csv_queue, on_message_callback=callback,arguments=cfg) + try: + channel.start_consuming() + except KeyboardInterrupt: + logging.info( + "PID {0:>5} >> Info: {1}.".format( + os.getpid(), "Shutdown requested...exiting" + ) + ) + + +if __name__ == "__main__": + main() diff --git a/ase-receiver/ase-receiver/FtpCsvReceiver.py b/ase-receiver/ase-receiver/FtpCsvReceiver.py new file mode 100755 index 0000000..f9dc214 --- /dev/null +++ b/ase-receiver/ase-receiver/FtpCsvReceiver.py @@ -0,0 +1,397 @@ +#!/usr/bin/env python3 + +import sys +import os +import shutil +import ssl +import pika +import re +import logging +import sqlite3 + +from hashlib import md5 +from pathlib import Path + +from smtplib import SMTP_SSL as SMTP, SMTPException, SMTPAuthenticationError +from email.mime.text import MIMEText + +from utils.time import timestamp_fmt as ts +from utils.time import date_refmt as df +from utils.config import set_config as setting + +from pyftpdlib.handlers import FTPHandler +from pyftpdlib.servers import FTPServer +from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed + + +def send_mail(sev, msg, cfg): + msg = MIMEText(cfg.message + "\n" + msg) + msg["Subject"] = cfg.subject + " " + sev + msg["From"] = cfg.sender + msg["To"] = cfg.receivers + conn = SMTP( + host=cfg.smtphost, + port=cfg.smtpport, + local_hostname=None, + timeout=5, + source_address=None, + ) + conn.set_debuglevel(cfg.debuglevel) + try: + conn.login(cfg.sender, cfg.password) + conn.sendmail(cfg.sender, cfg.receivers, msg.as_string()) + except SMTPAuthenticationError: + logging.error( + "PID {:>5} >> Mail failed: {}.".format( + os.getpid(), "SMTP authentication error" + ) + ) + except: + logging.info( + "PID {:>5} >> Mail failed: {}.".format(os.getpid(), "CUSTOM_ERROR") + ) + finally: + conn.quit() + + +class mq: + def __init__(self, cfg): + parameters = pika.URLParameters( + "amqp://" + + cfg.mquser + + ":" + + cfg.mqpass + + "@" + + cfg.mqhost + + ":" + + cfg.mqport + + "/%2F" + ) + connection = pika.BlockingConnection(parameters) + self.channel = connection.channel() + self.channel.queue_declare(queue=cfg.csv_queue, durable=True) + + def write(self, msg, cfg): + try: + self.channel.basic_publish( + exchange="", + routing_key=cfg.csv_queue, + body=msg, + properties=pika.BasicProperties( + delivery_mode=2, # make message persistent + ), + ) + logging.info( + "PID {:>5} >> write message {} in queue".format( + os.getpid(), msg) + ) + except: + logging.error( + "PID {:>5} >> error write message {} in queue".format( + os.getpid(), msg) + ) + + def close(self): + self.channel.close() + + +class DummyMD5Authorizer(DummyAuthorizer): + def validate_authentication(self, username, password, handler): + # if sys.version_info >= (3, 0): + # password = md5(password.encode("latin1")) + hash = md5(password.encode("UTF-8")).hexdigest() + try: + if self.user_table[username]["pwd"] != hash: + raise KeyError + except KeyError: + raise AuthenticationFailed + + +class ASEHandler(FTPHandler): + + def on_file_received(self, file): + unitType = "" + unitName = "" + toolName = "" + toolType = "" + fileDate = "" + fileTime = "" + queue = "" + if not os.stat(file).st_size: + os.remove(file) + logging.info( + "PID {:>5} >> file {} was empty: removed.".format( + os.getpid(), file) + ) + else: + cfg = self.cfg + path, filenameExt = os.path.split(file) + filename, fileExtension = os.path.splitext(filenameExt) + + if m := re.match( + r"^(G\d\d\d)_(ID\d\d\d\d)_(DT\d\d\d\d)_(\d\d)(\d\d)(\d\d\d\d)(\d\d)(\d\d)(\d\d)$", + filename, + re.I, + ): + unitType = m.group(1).upper() + unitName = m.group(2).upper() + toolName = m.group(3).upper() + toolType = "N/A" + fileDate = m.group(6) + "/" + m.group(5) + "/" + m.group(4) + fileTime = m.group(7) + ":" + m.group(8) + ":" + m.group(9) + elif re.match( + r"^(\d\d_\d\d\d\d|)(DT\d\d\d\d|LOC\d\d\d\d|GD\d\d\d\d)$", filename, re.I + ): + with open(file, "r") as fileCsv: + try: + for i, line in enumerate(fileCsv.readlines(4096), 1): + if m1 := re.match( + r"^(File Creation Date:\s)?(\d*\/\d*\/\d*)\s(\d*:\d*:\d*)\;*\n?$", + line, + re.I, + ): + fileDate = m1.group(2) + fileTime = m1.group(3) + + elif m2 := re.match( + r"^(\w+\d+)\s(\w+\d+)\;*\n?$", + line, + re.I, + ): + unitType = m2.group(1).upper() + unitName = m2.group(2).upper() + + elif m3 := re.match( + r"^SD path: a:\/\w+\/(\w+)(?:\.\w+)?\/*(\w*)(?:\.\w+)?\;*\n?$", + line, + re.I, + ): + if m3.group(2): + toolType = m3.group(1).upper() + toolName = m3.group(2).upper() + else: + toolType = "".join( + re.findall("^[a-zA-Z]+", m3.group(1)) + ).upper() + toolName = m3.group(1).upper() + break + except: + logging.error( + "PID {:>5} >> Error: {}.".format( + os.getpid(), sys.exc_info()[1] + ) + ) + fileCsv.close + + logging.info( + "PID {:>5} >> {} - {} - {} - {} - {} {}.".format( + os.getpid(), + unitType, + unitName, + toolName, + toolType, + df.dateFmt(fileDate), + fileTime, + ) + ) + newPath = cfg.csvfs + self.username + "/received/" + unitName.upper() + "/" + newFilename = ( + newPath + filename + "_" + + str(ts.timestamp("tms") + fileExtension) + ) + fileRenamed = file + "_" + str(ts.timestamp("tms")) + os.rename(file, fileRenamed) + try: + os.makedirs(newPath) + logging.info( + "PID {:>5} >> path {} created.".format( + os.getpid(), newPath) + ) + except FileExistsError: + logging.info( + "PID {:>5} >> path {} already exists.".format( + os.getpid(), newPath) + ) + try: + shutil.move(fileRenamed, newFilename) + logging.info( + "PID {:>5} >> {} moved into {}.".format( + os.getpid(), filenameExt, newFilename + ) + ) + except OSError: + logging.error( + "PID {:>5} >> Error to move {} into {}.".format( + os.getpid(), filenameExt, newFilename + ) + ) + send_mail( + "Error", "OS error move " + filenameExt + " to " + newFilename, cfg + ) + + mq_message = "{};{};{};{};{};{};{}".format( + unitType, + unitName, + toolName, + toolType, + df.dateFmt(fileDate), + fileTime, + newFilename, + ) + try: + queue = mq(cfg) + queue.write(mq_message, cfg) + logging.info( + "PID {:>5} >> queue message: {}.".format( + os.getpid(), mq_message) + ) + except: + logging.error( + "PID {:>5} >> Error to put message in queue: {}.".format( + os.getpid(), mq_message + ) + ) + send_mail( + "Error", "Error to put message " + mq_message + " in queue.", cfg + ) + finally: + queue.close() + + def on_incomplete_file_received(self, file): + # remove partially uploaded files + os.remove(file) + + def ftp_SITE_ADDU(self, line): + """ + add virtual user and save virtuser cfg file + create virtuser dir in virtpath cfg path + """ + cfg = self.cfg + parms = line.split() + user = os.path.basename(parms[0]) + password = parms[1] + hash = md5(password.encode("UTF-8")).hexdigest() + + try: + Path(cfg.virtpath + user).mkdir(parents=True, exist_ok=True) + except: + self.responde('551 Error in create virtual user path.') + else: + try: + self.authorizer.add_user(str(user), + hash, cfg.virtpath + user, perm="lmw") + con = sqlite3.connect(cfg.virtusersdb) + cur = con.cursor() + cur.execute("INSERT INTO virtusers VALUES (?,?,?,?)", + (user, hash, cfg.virtpath + user, 'elmw')) + con.commit() + con.close() + self.respond('200 SITE ADDU successful.') + except: + self.respond('501 Invalid SITE ADDU format.') + + def ftp_SITE_REMU(self, line): + """ + remove virtual user and save virtuser cfg file + """ + cfg = self.cfg + parms = line.split() + user = os.path.basename(parms[0]) + try: + self.authorizer.remove_user(str(user)) + con = sqlite3.connect(cfg.virtusersdb) + cur = con.cursor() + cur.execute("DELETE FROM virtusers WHERE user = ?", (user,)) + con.commit() + con.close() + self.respond('200 SITE REMU successful.') + + except: + self.respond('501 Invalid SITE REMU format.') + + def ftp_SITE_LSTU(self, line): + """ + list virtual user + """ + cfg = self.cfg + users_list = [] + try: + con = sqlite3.connect(cfg.virtusersdb) + cur = con.cursor() + self.push("214-The following virtual users are defined:\r\n") + for row in cur.execute("SELECT * FROM virtusers").fetchall(): + users_list.append( + " Username: " + row[0] + " - Perms: " + row[3] + "\r\n") + con.close() + self.push(''.join(users_list)) + self.respond("214 LSTU SITE command successful.") + + except: + self.respond('501 list users failed.') + + +def main(): + cfg = setting.config() + + try: + proto_cmds = FTPHandler.proto_cmds.copy() + proto_cmds.update( + {'SITE ADDU': dict(perm='M', auth=True, arg=True, + help='Syntax: SITE ADDU USERNAME PASSWORD (add virtual user).')} + ) + proto_cmds.update( + {'SITE REMU': dict(perm='M', auth=True, arg=True, + help='Syntax: SITE REMU USERNAME (remove virtual user).')} + ) + proto_cmds.update( + {'SITE LSTU': dict(perm='M', auth=True, arg=None, + help='Syntax: SITE LSTU (list virtual users).')} + ) + + authorizer = DummyMD5Authorizer() + authorizer.add_user( + cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3]) + + con = sqlite3.connect(cfg.virtusersdb) + cur = con.cursor() + cur.execute( + '''CREATE TABLE IF NOT EXISTS virtusers (user text, hash text, virtpath text, perm text)''') + cur.execute( + '''CREATE INDEX IF NOT EXISTS user_idx on virtusers(user)''') + + for row in cur.execute('SELECT * FROM virtusers'): + authorizer.add_user(row[0], row[1], row[2], perm=row[3]) + con.close() + + handler = ASEHandler + handler.proto_cmds = proto_cmds + handler.cfg = cfg + handler.authorizer = authorizer + handler.masquerade_address = cfg.proxyaddr + _range = list(range(cfg.firstport, cfg.firstport + cfg.portrangewidth)) + handler.passive_ports = _range + + logging.basicConfig( + format="%(asctime)s %(message)s", + filename=cfg.logfilename, + level=logging.INFO, + ) + + server = FTPServer(("0.0.0.0", 2121), handler) + + server.serve_forever() + except KeyboardInterrupt: + logging.info( + "PID {:>5} >> Info: {}.".format( + os.getpid(), "Shutdown requested...exiting") + ) + except Exception: + print( + "{} - PID {:>5} >> Error: {}.".format( + ts.timestamp("log"), os.getpid(), sys.exc_info()[1] + ) + ) + + +if __name__ == "__main__": + main() diff --git a/ase-receiver/ase-receiver/demonize_ftpd.py b/ase-receiver/ase-receiver/demonize_ftpd.py new file mode 100644 index 0000000..4ba63e1 --- /dev/null +++ b/ase-receiver/ase-receiver/demonize_ftpd.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2007 Giampaolo Rodola' . +# Use of this source code is governed by MIT license that can be +# found in the LICENSE file. + +"""A basic unix daemon using the python-daemon library: +http://pypi.python.org/pypi/python-daemon + +Example usages: + + $ python unix_daemon.py start + $ python unix_daemon.py stop + $ python unix_daemon.py status + $ python unix_daemon.py # foreground (no daemon) + $ python unix_daemon.py --logfile /var/log/ftpd.log start + $ python unix_daemon.py --pidfile /var/run/ftpd.pid start + +This is just a proof of concept which demonstrates how to daemonize +the FTP server. +You might want to use this as an example and provide the necessary +customizations. + +Parts you might want to customize are: + - UMASK, WORKDIR, HOST, PORT constants + - get_server() function (to define users and customize FTP handler) + +Authors: + - Ben Timby - btimby gmail.com + - Giampaolo Rodola' - g.rodola gmail.com +""" + +import atexit +import errno +import optparse +import os +import signal +import sys +import time + +from pyftpdlib.authorizers import UnixAuthorizer +from pyftpdlib.filesystems import UnixFilesystem +from pyftpdlib.handlers import FTPHandler +from pyftpdlib.servers import FTPServer + + +# overridable options +HOST = "" +PORT = 21 +PID_FILE = "/var/run/pyftpdlib.pid" +LOG_FILE = "/var/log/pyftpdlib.log" +WORKDIR = os.getcwd() +UMASK = 0 + + +def pid_exists(pid): + """Return True if a process with the given PID is currently running.""" + try: + os.kill(pid, 0) + except OSError as err: + return err.errno == errno.EPERM + else: + return True + + +def get_pid(): + """Return the PID saved in the pid file if possible, else None.""" + try: + with open(PID_FILE) as f: + return int(f.read().strip()) + except IOError as err: + if err.errno != errno.ENOENT: + raise + + +def stop(): + """Keep attempting to stop the daemon for 5 seconds, first using + SIGTERM, then using SIGKILL. + """ + pid = get_pid() + if not pid or not pid_exists(pid): + sys.exit("daemon not running") + sig = signal.SIGTERM + i = 0 + while True: + sys.stdout.write('.') + sys.stdout.flush() + try: + os.kill(pid, sig) + except OSError as err: + if err.errno == errno.ESRCH: + print("\nstopped (pid %s)" % pid) + return + else: + raise + i += 1 + if i == 25: + sig = signal.SIGKILL + elif i == 50: + sys.exit("\ncould not kill daemon (pid %s)" % pid) + time.sleep(0.1) + + +def status(): + """Print daemon status and exit.""" + pid = get_pid() + if not pid or not pid_exists(pid): + print("daemon not running") + else: + print("daemon running with pid %s" % pid) + sys.exit(0) + + +def get_server(): + """Return a pre-configured FTP server instance.""" + handler = FTPHandler + handler.authorizer = UnixAuthorizer() + handler.abstracted_fs = UnixFilesystem + server = FTPServer((HOST, PORT), handler) + return server + + +def daemonize(): + """A wrapper around python-daemonize context manager.""" + def _daemonize(): + pid = os.fork() + if pid > 0: + # exit first parent + sys.exit(0) + + # decouple from parent environment + os.chdir(WORKDIR) + os.setsid() + os.umask(0) + + # do second fork + pid = os.fork() + if pid > 0: + # exit from second parent + sys.exit(0) + + # redirect standard file descriptors + sys.stdout.flush() + sys.stderr.flush() + si = open(LOG_FILE, 'r') + so = open(LOG_FILE, 'a+') + se = open(LOG_FILE, 'a+', 0) + os.dup2(si.fileno(), sys.stdin.fileno()) + os.dup2(so.fileno(), sys.stdout.fileno()) + os.dup2(se.fileno(), sys.stderr.fileno()) + + # write pidfile + pid = str(os.getpid()) + with open(PID_FILE, 'w') as f: + f.write("%s\n" % pid) + atexit.register(lambda: os.remove(PID_FILE)) + + pid = get_pid() + if pid and pid_exists(pid): + sys.exit('daemon already running (pid %s)' % pid) + # instance FTPd before daemonizing, so that in case of problems we + # get an exception here and exit immediately + server = get_server() + _daemonize() + server.serve_forever() + + +def main(): + global PID_FILE, LOG_FILE + USAGE = "python [-p PIDFILE] [-l LOGFILE]\n\n" \ + "Commands:\n - start\n - stop\n - status" + parser = optparse.OptionParser(usage=USAGE) + parser.add_option('-l', '--logfile', dest='logfile', + help='the log file location') + parser.add_option('-p', '--pidfile', dest='pidfile', default=PID_FILE, + help='file to store/retreive daemon pid') + options, args = parser.parse_args() + + if options.pidfile: + PID_FILE = options.pidfile + if options.logfile: + LOG_FILE = options.logfile + + if not args: + server = get_server() + server.serve_forever() + else: + if len(args) != 1: + sys.exit('too many commands') + elif args[0] == 'start': + daemonize() + elif args[0] == 'stop': + stop() + elif args[0] == 'restart': + try: + stop() + finally: + daemonize() + elif args[0] == 'status': + status() + else: + sys.exit('invalid command') + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/ftpcsvreceiver.ini b/ase-receiver/ase-receiver/ftpcsvreceiver.ini similarity index 59% rename from ftpcsvreceiver.ini rename to ase-receiver/ase-receiver/ftpcsvreceiver.ini index e279dd1..0897d1b 100644 --- a/ftpcsvreceiver.ini +++ b/ase-receiver/ase-receiver/ftpcsvreceiver.ini @@ -1,7 +1,12 @@ [ftpserver] - firstport = 40000 - logfilename = ftppylog.log - proxyaddr = 0.0.0.0 + firstPort = 40000 + logFilename = ./ftppylog.log + proxyAddr = 0.0.0.0 + portRangeWidth = 50 + virtusersdb = /home/aseftp/virtusers.db + virtpath = /home/aseftp/ + adminuser = admin|c8cf955bd8b8a78419013b831e627eb2|/home/aseftp/|elradfmwMT + [mailserver] hostname = smtps.aruba.it @@ -10,7 +15,7 @@ password = taylor1964NFL! receivers = alessandro.battilani@gmail.com message = prova messaggio - + bbbbb ccccc subject = ciao a domani @@ -21,20 +26,20 @@ port = 5672 user = alex password = batt1l0 - csv_queue = task_queue - elab_queue = elab_queue - + csvQueue = task_queue + elabQueue = elab_queue + [csvfs] path = /home/ [csvelab] - logfilename = csvElab.log + logFilename = csvElab.log [db] hostname = 192.168.1.241 user = root password = batt1l0 - dbname = ase - tablename = rawdata + dbName = ase + tableName = rawdata maxInsertRow = 20000 valueNum = 16 \ No newline at end of file diff --git a/ase-receiver/ase-receiver/pyproject.toml b/ase-receiver/ase-receiver/pyproject.toml new file mode 100644 index 0000000..84a1cd3 --- /dev/null +++ b/ase-receiver/ase-receiver/pyproject.toml @@ -0,0 +1,16 @@ +[tool.poetry] +name = "ase-receiver" +version = "0.1.0" +description = "" +authors = ["Alessandro Battilani "] + +[tool.poetry.dependencies] +python = "^3.9" +pika = "^1.2.0" +pyftpdlib = "^1.5.6" + +[tool.poetry.dev-dependencies] + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" diff --git a/ase-receiver/ase-receiver/utils/__init__.py b/ase-receiver/ase-receiver/utils/__init__.py new file mode 100644 index 0000000..d325317 --- /dev/null +++ b/ase-receiver/ase-receiver/utils/__init__.py @@ -0,0 +1 @@ +"""Utilità""" diff --git a/ase-receiver/ase-receiver/utils/config/__init__.py b/ase-receiver/ase-receiver/utils/config/__init__.py new file mode 100644 index 0000000..09324b5 --- /dev/null +++ b/ase-receiver/ase-receiver/utils/config/__init__.py @@ -0,0 +1 @@ +"""Config ini setting""" diff --git a/asebat/config/set_config.py b/ase-receiver/ase-receiver/utils/config/set_config.py similarity index 57% rename from asebat/config/set_config.py rename to ase-receiver/ase-receiver/utils/config/set_config.py index d68ec93..da0ab2b 100644 --- a/asebat/config/set_config.py +++ b/ase-receiver/ase-receiver/utils/config/set_config.py @@ -4,14 +4,18 @@ from configparser import ConfigParser -class config(): +class config: def __init__(self): c = ConfigParser() - c.read(["/etc/ase/ftpcsvreceiver.ini", "./ftpcsvreceiver.ini"]) + c.read(["/etc/aseftp/ftpcsvreceiver.ini", "./ftpcsvreceiver.ini"]) # FTP setting - self.firstport = c.getint("ftpserver", "firstport") - self.logfilename = c.get("ftpserver", "logfilename") - self.proxyaddr = c.get("ftpserver", "proxyaddr") + self.firstport = c.getint("ftpserver", "firstPort") + self.logfilename = c.get("ftpserver", "logFilename") + self.proxyaddr = c.get("ftpserver", "proxyAddr") + self.portrangewidth = c.getint("ftpserver", "portRangeWidth") + self.virtusersdb = c.get("ftpserver", "virtusersdb") + self.virtpath = c.get("ftpserver", "virtpath") + self.adminuser = c.get("ftpserver", "adminuser").split("|") # MAIL setting self.smtphost = c.get("mailserver", "hostname") self.smtpport = c.getint("mailserver", "port") @@ -26,17 +30,17 @@ class config(): self.mqport = c.get("mqserver", "port") self.mquser = c.get("mqserver", "user") self.mqpass = c.get("mqserver", "password") - self.csv_queue = c.get("mqserver", "csv_queue") - self.elab_queue = c.get("mqserver", "elab_queue") + self.csv_queue = c.get("mqserver", "csvQueue") + self.elab_queue = c.get("mqserver", "elabQueue") # CSV FILE setting self.csvfs = c.get("csvfs", "path") # LOADER setting - self.elablog = c.get("csvelab", "logfilename") + self.elablog = c.get("csvelab", "logFilename") # DB setting self.dbhost = c.get("db", "hostname") self.dbuser = c.get("db", "user") self.dbpass = c.get("db", "password") - self.dbname = c.get("db", "dbname") - self.table = c.get("db", "tablename") + self.dbname = c.get("db", "dbName") + self.table = c.get("db", "tableName") self.valueNum = c.getint("db", "valueNum") - self.maxInsertRow = c.getint("db", "maxInsertRow") \ No newline at end of file + self.maxInsertRow = c.getint("db", "maxInsertRow") diff --git a/ase-receiver/ase-receiver/utils/parsers/G801_parser.py b/ase-receiver/ase-receiver/utils/parsers/G801_parser.py new file mode 100644 index 0000000..18da667 --- /dev/null +++ b/ase-receiver/ase-receiver/utils/parsers/G801_parser.py @@ -0,0 +1 @@ +locals diff --git a/ase-receiver/ase-receiver/utils/parsers/__init__.py b/ase-receiver/ase-receiver/utils/parsers/__init__.py new file mode 100644 index 0000000..645f1c4 --- /dev/null +++ b/ase-receiver/ase-receiver/utils/parsers/__init__.py @@ -0,0 +1 @@ +"""Parser delle centraline""" diff --git a/ase-receiver/ase-receiver/utils/time/__init__.py b/ase-receiver/ase-receiver/utils/time/__init__.py new file mode 100644 index 0000000..71166cd --- /dev/null +++ b/ase-receiver/ase-receiver/utils/time/__init__.py @@ -0,0 +1 @@ +"""Utilità per i formati timestamp""" diff --git a/asebat/timefmt/date_refmt.py b/ase-receiver/ase-receiver/utils/time/date_refmt.py similarity index 51% rename from asebat/timefmt/date_refmt.py rename to ase-receiver/ase-receiver/utils/time/date_refmt.py index 4246d8b..c62b522 100644 --- a/asebat/timefmt/date_refmt.py +++ b/ase-receiver/ase-receiver/utils/time/date_refmt.py @@ -4,20 +4,22 @@ import datetime + def dateFmt(date): - t = date.replace("/","-") + t = date.replace("/", "-") try: - datetime.datetime.strptime(t, '%Y-%m-%d') + datetime.datetime.strptime(t, "%Y-%m-%d") return t except ValueError: - d = datetime.datetime.strptime(t, '%d-%m-%Y') + d = datetime.datetime.strptime(t, "%d-%m-%Y") return datetime.datetime.strftime(d, "%Y-%m-%d") + def dateTimeFmt(date): - t = date.replace("/","-") + t = date.replace("/", "-") try: - datetime.datetime.strptime(t, '%Y-%m-%d %H:%M:%S') + datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S") return t except ValueError: - d = datetime.datetime.strptime(t, '%d-%m-%Y %H:%M:%S') - return datetime.datetime.strftime(d, "%Y-%m-%d %H:%M:%S") \ No newline at end of file + d = datetime.datetime.strptime(t, "%d-%m-%Y %H:%M:%S") + return datetime.datetime.strftime(d, "%Y-%m-%d %H:%M:%S") diff --git a/ase-receiver/ase-receiver/utils/time/dt_convert.py b/ase-receiver/ase-receiver/utils/time/dt_convert.py new file mode 100644 index 0000000..13392e2 --- /dev/null +++ b/ase-receiver/ase-receiver/utils/time/dt_convert.py @@ -0,0 +1,25 @@ +"""Funzioni per convertire formato data + +""" + +import datetime + + +def dateFmt(date): + t = date.replace("/", "-") + try: + datetime.datetime.strptime(t, "%Y-%m-%d") + return t + except ValueError: + d = datetime.datetime.strptime(t, "%d-%m-%Y") + return datetime.datetime.strftime(d, "%Y-%m-%d") + + +def dateTimeFmt(date): + t = date.replace("/", "-") + try: + datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S") + return t + except ValueError: + d = datetime.datetime.strptime(t, "%d-%m-%Y %H:%M:%S") + return datetime.datetime.strftime(d, "%Y-%m-%d %H:%M:%S") diff --git a/ase-receiver/ase-receiver/utils/time/timestamp_fmt.py b/ase-receiver/ase-receiver/utils/time/timestamp_fmt.py new file mode 100644 index 0000000..71ab82c --- /dev/null +++ b/ase-receiver/ase-receiver/utils/time/timestamp_fmt.py @@ -0,0 +1,10 @@ +"""Funzioni per timestamp + +""" + +import datetime + + +def timestamp(t): + fmt = {"log": "%Y-%m-%d %H:%M:%S", "tms": "%Y%m%d%H%M%S"} + return datetime.datetime.now().strftime(fmt[t]) diff --git a/asebat/__init__.py b/asebat/__init__.py deleted file mode 100644 index c24b50b..0000000 --- a/asebat/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Utilità""" \ No newline at end of file diff --git a/asebat/config/__init__.py b/asebat/config/__init__.py deleted file mode 100644 index ae21170..0000000 --- a/asebat/config/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Config ini setting""" \ No newline at end of file diff --git a/asebat/parsers/G801_parser.py b/asebat/parsers/G801_parser.py deleted file mode 100644 index a1d964f..0000000 --- a/asebat/parsers/G801_parser.py +++ /dev/null @@ -1 +0,0 @@ -locals \ No newline at end of file diff --git a/asebat/parsers/__init__.py b/asebat/parsers/__init__.py deleted file mode 100644 index 6d34771..0000000 --- a/asebat/parsers/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Parser delle centraline""" \ No newline at end of file diff --git a/asebat/timefmt/__init__.py b/asebat/timefmt/__init__.py deleted file mode 100644 index 582e8ed..0000000 --- a/asebat/timefmt/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Utilità per i formati timestamp""" \ No newline at end of file diff --git a/asebat/timefmt/timestamp_fmt.py b/asebat/timefmt/timestamp_fmt.py deleted file mode 100644 index 6bf9dbf..0000000 --- a/asebat/timefmt/timestamp_fmt.py +++ /dev/null @@ -1,9 +0,0 @@ -"""Funzioni per timestamp - -""" - -import datetime - -def timestamp(t): - fmt = {'log': '%Y-%m-%d %H:%M:%S', 'tms': '%Y%m%d%H%M%S'} - return datetime.datetime.now().strftime(fmt[t]) \ No newline at end of file diff --git a/build/FtpCsvReceiver/FtpCsvReceiver b/build/FtpCsvReceiver/FtpCsvReceiver deleted file mode 100755 index 421b0f4..0000000 Binary files a/build/FtpCsvReceiver/FtpCsvReceiver and /dev/null differ