commit 97abdc8dfa3ba84e916722a55c1565c99eefe13f Author: Alessandro Battilani Date: Sat Nov 16 15:20:50 2024 +0100 initial diff --git a/csvLoader/CsvLoader.py b/csvLoader/CsvLoader.py new file mode 100755 index 0000000..3d0087f --- /dev/null +++ b/csvLoader/CsvLoader.py @@ -0,0 +1,197 @@ +#!/usr/bin/python3 + +import sys +import os +import pika +import logging +import csv +import re +import mariadb +import shutil + +from utils.timefmt import timestamp_fmt as ts +from utils.timefmt import date_refmt as df +from utils.config import set_config as setting + + +class sqlraw: + def __init__(self, cfg): + self.config = {"host": cfg.dbhost, "user": cfg.dbuser, "password": cfg.dbpass} + self.dbname = cfg.dbname + self.table = cfg.table + self.sql_head = ( + "INSERT IGNORE INTO " + + self.dbname + + "." + + self.table + + " (`UnitName`,`ToolName`,`eventDT`,`BatteryLevel`,`Temperature`,`NodeNum`," + + "`Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`," + + "`Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`) VALUES " + ) + + def add_data(self, values): + self.sql = self.sql_head + "(" + "),(".join(values) + ");" + + def write_db(self): + try: + conn = mariadb.connect(**self.config, database=self.dbname) + except mariadb.Error as err: + logging.error( + "PID {:>5} >> Error to connet to DB {} - System error {}.".format( + os.getpid(), self.dbname, err + ) + ) + sys.exit(1) + + cur = conn.cursor() + try: + cur.execute(self.sql) + except mariadb.ProgrammingError as err: + logging.error( + "PID {:>5} >> Error write into DB {} - System error {}.".format( + os.getpid(), self.dbname, err + ) + ) + print(err) + sys.exit(1) + + finally: + conn.close() + + +def callback_ase(ch, method, properties, body, config): # body รจ di tipo byte + logging.info( + "PID {0:>5} >> Read message {1}".format(os.getpid(), body.decode("utf-8")) + ) + msg = body.decode("utf-8").split(";") + sql = sqlraw(config) + stmlst = [] + commonData = '"{0}","{1}"'.format(msg[1], msg[2]) + tooltype = msg[3] + with open(msg[6], "r") as csvfile: + lines = csvfile.read().splitlines() + for line in lines: + + fields = line.split(";|;") + + if mG501 := re.match( + r"^(\d\d\d\d\/\d\d\/\d\d\s\d\d:\d\d:\d\d);(.+);(.+)$", fields[0] + ): + rowData = ',"{0}",{1},{2}'.format( + mG501.group(1), mG501.group(2), mG501.group(3) + ) + fields.pop(0) + + elif mG201 := re.match( + r"^(\d\d\/\d\d\/\d\d\d\d\s\d\d:\d\d:\d\d)$", fields[0] + ): + mbtG201 = re.match(r"^(.+);(.+)$", fields[1]) + + rowData = ',"{0}",{1},{2}'.format( + df.dateTimeFmt(mG201.group(1)), mbtG201.group(1), mbtG201.group(2) + ) + fields.pop(0) + fields.pop(0) + + else: + continue + + nodeNum = 0 + for field in fields: + nodeNum += 1 + vals = field.split(";") + stmlst.append( + commonData + + rowData + + ",{0},".format(nodeNum) + + ",".join('"{0}"'.format(d) for d in vals) + + "," + + ",".join(["null"] * (config.valueNum - len(vals))) + ) + + if config.maxInsertRow < len(stmlst): + sql.add_data(stmlst) + try: + sql.write_db() + stmlst.clear() + except: + print("errore nell'inserimento") + sys.exit(1) + + if len(stmlst) > 0: + sql.add_data(stmlst) + try: + sql.write_db() + ch.basic_ack(delivery_tag=method.delivery_tag) + except: + print("errore nell'inserimento") + sys.exit(1) + newFilename = msg[6].replace("received", "loaded") + newPath, filenameExt = os.path.split(newFilename) + try: + os.makedirs(newPath) + logging.info("PID {:>5} >> path {} created.".format(os.getpid(), newPath)) + except FileExistsError: + logging.info( + "PID {:>5} >> path {} already exists.".format(os.getpid(), newPath) + ) + try: + shutil.move(msg[6], newFilename) + logging.info( + "PID {:>5} >> {} moved into {}.".format( + os.getpid(), filenameExt, newFilename + ) + ) + except OSError: + logging.error( + "PID {:>5} >> Error to move {} into {}.".format( + os.getpid(), filenameExt, newFilename + ) + ) + + +def main(): + cfg = setting.config() + + logging.basicConfig( + format="%(asctime)s %(message)s", + filename="/var/log/" + cfg.elablog, + level=logging.INFO, + ) + + parameters = pika.URLParameters( + "amqp://" + + cfg.mquser + + ":" + + cfg.mqpass + + "@" + + cfg.mqhost + + ":" + + cfg.mqport + + "/%2F" + ) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.queue_declare(queue=cfg.csv_queue, durable=True) + + channel.basic_qos(prefetch_count=1) + channel.basic_consume( + queue=cfg.csv_queue, + on_message_callback=lambda ch, method, properties, body: callback_ase( + ch, method, properties, body, config=cfg + ), + ) + + # channel.basic_consume(queue=cfg.csv_queue, on_message_callback=callback,arguments=cfg) + try: + channel.start_consuming() + except KeyboardInterrupt: + logging.info( + "PID {0:>5} >> Info: {1}.".format( + os.getpid(), "Shutdown requested...exiting" + ) + ) + + +if __name__ == "__main__": + main() diff --git a/csvLoader/transform_file.py b/csvLoader/transform_file.py new file mode 100644 index 0000000..fa23794 --- /dev/null +++ b/csvLoader/transform_file.py @@ -0,0 +1,102 @@ +import json +import psycopg2 +from sqlalchemy import create_engine, text + +# Configura la connessione al database PostgreSQL +engine = create_engine('postgresql://asepg:batt1l0@10.211.114.101:5432/asedb') + +def write_db(engine, records): + with engine.connect() as conn: + conn.execute(text(""" + INSERT INTO dataraw (nome_unit, tipo_centralina, nome_tool, tipo_tool, ip_centralina, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb) + VALUES + """ + ",".join([ + f"(:{i}_nome_unit, :{i}_tipo_centralina, :{i}_nome_tool, :{i}_tipo_tool, :{i}_ip_centralina, :{i}_ip_gateway, :{i}_event_timestamp, :{i}_battery_level, :{i}_temperature, :{i}_nodes_jsonb)" + for i in range(len(records)) + ]) + """ + ON CONFLICT ON CONSTRAINT dataraw_unique + DO UPDATE SET + tipo_centralina = EXCLUDED.tipo_centralina, + tipo_tool = EXCLUDED.tipo_tool, + ip_centralina = EXCLUDED.ip_centralina, + ip_gateway = EXCLUDED.ip_gateway, + battery_level = EXCLUDED.battery_level, + temperature = EXCLUDED.temperature, + nodes_jsonb = EXCLUDED.nodes_jsonb; + """), {f"{i}_{key}": value for i, record in enumerate(records) for key, value in record.items()}) + + conn.commit() + + +# Leggi il file intero e separa l'intestazione dal resto dei dati +with open('DT0029_20241106044856.csv', 'r') as file: + lines = file.readlines() + +# Estrarre le informazioni dalle prime 7 righe +if len(lines) >= 7: + tipo_centralina = lines[1].split()[0] # Prima stringa nella seconda riga + nome_unit = lines[1].split()[1] # Seconda stringa nella seconda riga + ip_centralina = lines[2].split()[1] # IP della centralina dalla terza riga + ip_gateway = lines[4].split()[1] # IP del gateway dalla quinta riga + path_tool = lines[5].strip() # Path completo dalla sesta riga + nome_tool = path_tool.split('/')[-1].replace('.csv', '') # Ultima parte del percorso senza estensione + tipo_tool = path_tool.split('/')[-2] # Parte precedente al nome_tool + +else: + raise ValueError("Il file non contiene abbastanza righe per estrarre i dati richiesti.") + +records = [] +# Elabora le righe dei dati a partire dalla riga 8 in poi +for line in lines[7:]: + # Rimuovi spazi bianchi o caratteri di nuova riga + input_data = line.strip() + + # Suddividi la stringa in sezioni usando ";|;" come separatore + parts = input_data.split(';|;') + + # Verifica che ci siano almeno tre parti (timestamp, misure e nodi) + if len(parts) < 3: + print(f"Riga non valida: {input_data}") + continue + + # Estrai la data/ora e le prime misurazioni + timestamp = parts[0] + measurements = parts[1] + + # Estrai i valori di ciascun nodo e formatta i dati come JSON + nodes = parts[2:] + node_list = [] + for i, node_data in enumerate(nodes, start=1): + node_dict = {"num": i} + # Dividi ogni nodo in valori separati da ";" + node_values = node_data.split(';') + for j, value in enumerate(node_values, start=0): + # Imposta i valori a -9999 se trovi "Dis." + node_dict['val' + str(j)] = -9999 if value == "Dis." else float(value) + node_list.append(node_dict) + + # Prepara i dati per l'inserimento/aggiornamento + record = { + "nome_unit": nome_unit.upper(), + "tipo_centralina": tipo_centralina, + "nome_tool": nome_tool.upper(), + "tipo_tool": tipo_tool, + "ip_centralina": ip_centralina, + "ip_gateway": ip_gateway, + "event_timestamp": timestamp, + "battery_level": float(measurements.split(';')[0]), + "temperature": float(measurements.split(';')[1]), + "nodes_jsonb": json.dumps(node_list) # Converti la lista di dizionari in una stringa JSON + } + + records.append(record) + + # Se abbiamo raggiunto 1000 record, esegui l'inserimento in batch + if len(records) >= 500: + print("raggiunti 500 record scrivo sul db") + write_db(engine, records) + records = [] + +write_db(engine, records) + +print("Tutte le righe del file sono state caricate con successo nella tabella PostgreSQL!") \ No newline at end of file diff --git a/ftpReceiver/FtpCsvReceiver.py b/ftpReceiver/FtpCsvReceiver.py new file mode 100755 index 0000000..7e9d76c --- /dev/null +++ b/ftpReceiver/FtpCsvReceiver.py @@ -0,0 +1,378 @@ +#!/usr/bin/env python3 + +import sys +import os +import shutil +# import ssl +import pika +import re +import logging +import sqlite3 + +from hashlib import md5 +from pathlib import Path +from datetime import datetime + +from smtplib import SMTP_SSL as SMTP, SMTPException, SMTPAuthenticationError +from email.mime.text import MIMEText + +from utils.time import timestamp_fmt as ts +from utils.time import date_refmt as df +from utils.config import set_config as setting + +from pyftpdlib.handlers import FTPHandler, TLS_FTPHandler +from pyftpdlib.servers import FTPServer +from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed + + +def send_mail(sev, msg, cfg): + msg = MIMEText(cfg.message + "\n" + msg) + msg["Subject"] = cfg.subject + " " + sev + msg["From"] = cfg.sender + msg["To"] = cfg.receivers + conn = SMTP( + host=cfg.smtphost, + port=cfg.smtpport, + local_hostname=None, + timeout=5, + source_address=None, + ) + conn.set_debuglevel(cfg.debuglevel) + try: + conn.login(cfg.sender, cfg.password) + conn.sendmail(cfg.sender, cfg.receivers, msg.as_string()) + except SMTPAuthenticationError: + logging.error( + "Mail failed: {}.".format("SMTP authentication error") + ) + except: + logging.info( + "Mail failed: {}.".format("CUSTOM_ERROR") + ) + finally: + conn.quit() + + +class mq: + def __init__(self, cfg): + parameters = pika.URLParameters( + "amqp://" + + cfg.mquser + + ":" + + cfg.mqpass + + "@" + + cfg.mqhost + + ":" + + cfg.mqport + + "/%2F" + ) + connection = pika.BlockingConnection(parameters) + self.channel = connection.channel() + self.channel.queue_declare(queue=cfg.csv_queue, durable=True) + + def write(self, msg, cfg): + try: + props = pika.BasicProperties( + delivery_mode=2, + content_encoding='utf-8', + timestamp=msg["timestamp"],) + self.channel.basic_publish( + exchange="", + routing_key=cfg.csv_queue, + body=msg["payload"], + properties=props + ) + logging.info( + "Write message {} in queue".format(msg)) + except: + logging.error( + "Error write message {} in queue".format(msg)) + + def close(self): + self.channel.close() + + +class DummyMD5Authorizer(DummyAuthorizer): + def __init__(self, cfg): + super().__init__() + self.add_user( + cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3]) + + con = sqlite3.connect(cfg.virtusersdb) + cur = con.cursor() + cur.execute( + '''CREATE TABLE IF NOT EXISTS virtusers (user text, hash text, virtpath text, perm text)''') + cur.execute( + '''CREATE INDEX IF NOT EXISTS user_idx on virtusers(user)''') + + for row in cur.execute('SELECT * FROM virtusers'): + self.add_user(row[0], row[1], row[2], perm=row[3]) + con.close() + + def validate_authentication(self, username, password, handler): + hash = md5(password.encode("UTF-8")).hexdigest() + try: + if self.user_table[username]["pwd"] != hash: + raise KeyError + except KeyError: + raise AuthenticationFailed + + +class ASEHandler(FTPHandler): + + def __init__(self, conn, server, ioloop=None): + super().__init__(conn, server, ioloop) + self.proto_cmds = FTPHandler.proto_cmds.copy() + self.proto_cmds.update( + {'SITE ADDU': dict(perm='M', auth=True, arg=True, + help='Syntax: SITE ADDU USERNAME PASSWORD (add virtual user).')} + ) + self.proto_cmds.update( + {'SITE DELU': dict(perm='M', auth=True, arg=True, + help='Syntax: SITE DELU USERNAME (remove virtual user).')} + ) + self.proto_cmds.update( + {'SITE LSTU': dict(perm='M', auth=True, arg=None, + help='Syntax: SITE LSTU (list virtual users).')} + ) + + def on_file_received(self, file): + unitType = "" + unitName = "" + toolName = "" + toolType = "" + fileDate = "" + fileTime = "" + queue = "" + if not os.stat(file).st_size: + os.remove(file) + logging.info( + "File {} was empty: removed.".format(file)) + else: + cfg = self.cfg + path, filenameExt = os.path.split(file) + filename, fileExtension = os.path.splitext(filenameExt) + if (fileExtension.upper() in (cfg.fileext)): + if m := re.match( + r"^(G\d\d\d|GFLOW)_(ID\d\d\d\d)_(DT\d\d\d\d)_(\d\d)(\d\d)(\d\d\d\d|\d\d)(\d\d)(\d\d)(\d\d)$", + filename, + re.I, + ): + unitType = m.group(1).upper() + unitName = m.group(2).upper() + toolName = m.group(3).upper() + toolType = "N/A" + fileDate = m.group(6) + "/" + m.group(5) + "/" + m.group(4) + fileTime = m.group(7) + ":" + m.group(8) + ":" + m.group(9) + + elif re.match( + r"^(\d\d_\d\d\d\d|)(DT\d\d\d\d|LOC\d\d\d\d|GD\d\d\d\d)$", filename, re.I + ): + with open(file, "r") as fileCsv: + try: + for i, line in enumerate(fileCsv.readlines(4096), 1): + if m1 := re.match( + r"^(File Creation Date:\s)?(\d*\/\d*\/\d*)\s(\d*:\d*:\d*)\;*\n?$", + line, + re.I, + ): + fileDate = m1.group(2) + fileTime = m1.group(3) + + elif m2 := re.match( + r"^(\w+\d+)\s(\w+\d+)\;*\n?$", + line, + re.I, + ): + unitType = m2.group(1).upper() + unitName = m2.group(2).upper() + + elif m3 := re.match( + r"^SD path: a:\/\w+\/(\w+)(?:\.\w+)?\/*(\w*)(?:\.\w+)?\;*\n?$", + line, + re.I, + ): + if m3.group(2): + toolType = m3.group(1).upper() + toolName = m3.group(2).upper() + else: + toolType = "".join( + re.findall( + "^[a-zA-Z]+", m3.group(1)) + ).upper() + toolName = m3.group(1).upper() + break + except: + logging.error( + "Error: {}.".format(sys.exc_info()[1])) + fileCsv.close + + logging.info( + "{} - {} - {} - {} - {} {}.".format( + unitType, + unitName, + toolName, + toolType, + df.dateFmt(fileDate), + fileTime, + ) + ) + newPath = cfg.csvfs + "/" + self.username + "/received/" + \ + unitName.upper() + "/" + newFilename = ( + newPath + filename + "_" + + str(ts.timestamp("tms") + fileExtension) + ) + fileRenamed = file + "_" + str(ts.timestamp("tms")) + os.rename(file, fileRenamed) + try: + os.makedirs(newPath) + logging.info("Path {} created.".format(newPath)) + except FileExistsError: + logging.info("Path {} already exists.".format(newPath)) + try: + shutil.move(fileRenamed, newFilename) + logging.info("{} moved into {}.".format( + filenameExt, newFilename)) + except OSError: + logging.error("Error to move {} into {}.".format( + filenameExt, newFilename)) + send_mail( + "Error", "OS error move " + filenameExt + " to " + newFilename, cfg + ) + now = datetime.now() + + mq_message = {"payload": "{};{};{};{};{};{};{}".format( + unitType, + unitName, + toolName, + toolType, + df.dateFmt(fileDate), + fileTime, + newFilename), + "timestamp": int(datetime.timestamp(now)*1000000) + } + try: + queue = mq(cfg) + queue.write(mq_message, cfg) + logging.info("Queue message: {}.".format(mq_message)) + except: + logging.error( + "Error to put message in queue: {}.".format(mq_message)) + send_mail( + "Error", "Error to put message " + mq_message + " in queue.", cfg + ) + finally: + queue.close() + + def on_incomplete_file_received(self, file): + # remove partially uploaded files + os.remove(file) + + def ftp_SITE_ADDU(self, line): + """ + add virtual user and save virtuser cfg file + create virtuser dir in virtpath cfg path + """ + cfg = self.cfg + parms = line.split() + user = os.path.basename(parms[0]) + password = parms[1] + hash = md5(password.encode("UTF-8")).hexdigest() + + try: + Path(cfg.virtpath + user).mkdir(parents=True, exist_ok=True) + except: + self.responde('551 Error in create virtual user path.') + else: + try: + self.authorizer.add_user(str(user), + hash, cfg.virtpath + "/" + user, perm="lmw") + con = sqlite3.connect(cfg.virtusersdb) + cur = con.cursor() + cur.execute("INSERT INTO virtusers VALUES (?,?,?,?)", + (user, hash, cfg.virtpath + user, 'elmw')) + con.commit() + con.close() + logging.info("User {} created.".format(user)) + self.respond('200 SITE ADDU successful.') + except: + self.respond('501 SITE ADDU failed.') + + def ftp_SITE_DELU(self, line): + """ + remove virtual user and save virtuser cfg file + """ + cfg = self.cfg + parms = line.split() + user = os.path.basename(parms[0]) + try: + self.authorizer.remove_user(str(user)) + con = sqlite3.connect(cfg.virtusersdb) + cur = con.cursor() + cur.execute("DELETE FROM virtusers WHERE user = ?", (user,)) + con.commit() + con.close() + logging.info("User {} deleted.".format(user)) + # self.push(' The user path has not been removed!\r\n') + self.respond('200 SITE DELU successful.') + + except: + self.respond('501 SITE DELU failed.') + + def ftp_SITE_LSTU(self, line): + """ + list virtual user + """ + cfg = self.cfg + users_list = [] + try: + con = sqlite3.connect(cfg.virtusersdb) + cur = con.cursor() + self.push("214-The following virtual users are defined:\r\n") + for row in cur.execute("SELECT * FROM virtusers").fetchall(): + users_list.append( + " Username: " + row[0] + "\tPerms: " + row[3] + "\r\n") + con.close() + self.push(''.join(users_list)) + self.respond("214 LSTU SITE command successful.") + + except: + self.respond('501 list users failed.') + + +def main(): + cfg = setting.config() + + try: + authorizer = DummyMD5Authorizer(cfg) + handler = ASEHandler + handler.cfg = cfg + handler.authorizer = authorizer + handler.masquerade_address = cfg.proxyaddr + _range = list(range(cfg.firstport, cfg.firstport + cfg.portrangewidth)) + handler.passive_ports = _range + + logging.basicConfig( + format="%(asctime)s %(message)s", + filename=cfg.logfilename, + level=logging.INFO, + ) + + server = FTPServer(("0.0.0.0", 2121), handler) + + server.serve_forever() + except KeyboardInterrupt: + logging.info( + "Info: {}.".format("Shutdown requested...exiting") + )F + except Exception: + print( + "{} - PID {:>5} >> Error: {}.".format( + ts.timestamp("log"), os.getpid(), sys.exc_info()[1] + ) + ) + + +if __name__ == "__main__": + main() diff --git a/ftpReceiver/demonize_ftpd.py b/ftpReceiver/demonize_ftpd.py new file mode 100644 index 0000000..a972169 --- /dev/null +++ b/ftpReceiver/demonize_ftpd.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2007 Giampaolo Rodola' . +# Use of this source code is governed by MIT license that can be +# found in the LICENSE file. + +"""A basic unix daemon using the python-daemon library: +http://pypi.python.org/pypi/python-daemon + +Example usages: + + $ python unix_daemon.py start + $ python unix_daemon.py stop + $ python unix_daemon.py status + $ python unix_daemon.py # foreground (no daemon) + $ python unix_daemon.py --logfile /var/log/ftpd.log start + $ python unix_daemon.py --pidfile /var/run/ftpd.pid start + +This is just a proof of concept which demonstrates how to daemonize +the FTP server. +You might want to use this as an example and provide the necessary +customizations. + +Parts you might want to customize are: + - UMASK, WORKDIR, HOST, PORT constants + - get_server() function (to define users and customize FTP handler) + +Authors: + - Ben Timby - btimby gmail.com + - Giampaolo Rodola' - g.rodola gmail.com + +""" + +import atexit +import errno +import optparse +import os +import signal +import sys +import time + +from pyftpdlib.authorizers import UnixAuthorizer +from pyftpdlib.filesystems import UnixFilesystem +from pyftpdlib.handlers import FTPHandler +from pyftpdlib.servers import FTPServer + + +# overridable options +HOST = "" +PORT = 21 +PID_FILE = "/var/run/pyftpdlib.pid" +LOG_FILE = "/var/log/pyftpdlib.log" +WORKDIR = os.getcwd() +UMASK = 0 + + +def pid_exists(pid): + """Return True if a process with the given PID is currently running.""" + try: + os.kill(pid, 0) + except OSError as err: + return err.errno == errno.EPERM + else: + return True + + +def get_pid(): + """Return the PID saved in the pid file if possible, else None.""" + try: + with open(PID_FILE) as f: + return int(f.read().strip()) + except IOError as err: + if err.errno != errno.ENOENT: + raise + + +def stop(): + """Keep attempting to stop the daemon for 5 seconds, first using + SIGTERM, then using SIGKILL. + """ + pid = get_pid() + if not pid or not pid_exists(pid): + sys.exit("daemon not running") + sig = signal.SIGTERM + i = 0 + while True: + sys.stdout.write('.') + sys.stdout.flush() + try: + os.kill(pid, sig) + except OSError as err: + if err.errno == errno.ESRCH: + print("\nstopped (pid %s)" % pid) + return + else: + raise + i += 1 + if i == 25: + sig = signal.SIGKILL + elif i == 50: + sys.exit("\ncould not kill daemon (pid %s)" % pid) + time.sleep(0.1) + + +def status(): + """Print daemon status and exit.""" + pid = get_pid() + if not pid or not pid_exists(pid): + print("daemon not running") + else: + print("daemon running with pid %s" % pid) + sys.exit(0) + + +def get_server(): + """Return a pre-configured FTP server instance.""" + handler = FTPHandler + handler.authorizer = UnixAuthorizer() + handler.abstracted_fs = UnixFilesystem + server = FTPServer((HOST, PORT), handler) + return server + + +def daemonize(): + """A wrapper around python-daemonize context manager.""" + def _daemonize(): + pid = os.fork() + if pid > 0: + # exit first parent + sys.exit(0) + + # decouple from parent environment + os.chdir(WORKDIR) + os.setsid() + os.umask(0) + + # do second fork + pid = os.fork() + if pid > 0: + # exit from second parent + sys.exit(0) + + # redirect standard file descriptors + sys.stdout.flush() + sys.stderr.flush() + si = open(LOG_FILE, 'r') + so = open(LOG_FILE, 'a+') + se = open(LOG_FILE, 'a+', 0) + os.dup2(si.fileno(), sys.stdin.fileno()) + os.dup2(so.fileno(), sys.stdout.fileno()) + os.dup2(se.fileno(), sys.stderr.fileno()) + + # write pidfile + pid = str(os.getpid()) + with open(PID_FILE, 'w') as f: + f.write("%s\n" % pid) + atexit.register(lambda: os.remove(PID_FILE)) + + pid = get_pid() + if pid and pid_exists(pid): + sys.exit('daemon already running (pid %s)' % pid) + # instance FTPd before daemonizing, so that in case of problems we + # get an exception here and exit immediately + server = get_server() + _daemonize() + server.serve_forever() + + +def main(): + global PID_FILE, LOG_FILE + USAGE = "python [-p PIDFILE] [-l LOGFILE]\n\n" \ + "Commands:\n - start\n - stop\n - status" + parser = optparse.OptionParser(usage=USAGE) + parser.add_option('-l', '--logfile', dest='logfile', + help='the log file location') + parser.add_option('-p', '--pidfile', dest='pidfile', default=PID_FILE, + help='file to store/retreive daemon pid') + options, args = parser.parse_args() + + if options.pidfile: + PID_FILE = options.pidfile + if options.logfile: + LOG_FILE = options.logfile + + if not args: + server = get_server() + server.serve_forever() + else: + if len(args) != 1: + sys.exit('too many commands') + elif args[0] == 'start': + daemonize() + elif args[0] == 'stop': + stop() + elif args[0] == 'restart': + try: + stop() + finally: + daemonize() + elif args[0] == 'status': + status() + else: + sys.exit('invalid command') + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/ftpReceiver/ftpcsvreceiver.ini b/ftpReceiver/ftpcsvreceiver.ini new file mode 100644 index 0000000..c44d6aa --- /dev/null +++ b/ftpReceiver/ftpcsvreceiver.ini @@ -0,0 +1,51 @@ +# to generete adminuser password hash: +# python3 -c 'from hashlib import md5;print(md5("????admin-password???".encode("UTF-8")).hexdigest())' +[ftpserver] + firstPort = 40000 + logFilename = ./ftppylog.log + proxyAddr = 0.0.0.0 + portRangeWidth = 500 + virtusersdb = /home/alex/aseftp/virtusers.db + virtpath = /home/alex/aseftp/ + adminuser = admin|c8cf955bd8b8a78419013b831e627eb2|/home/alex/aseftp/|elradfmwMT + servertype = FTPHandler + certfile = /home/alex/aseftp/keycert.pem + fileext = .CSV|.txt + #servertype = FTPHandler/TLS_FTPHandler + + +[mailserver] + hostname = smtps.aruba.it + port = 465 + sender = alessandro.battilani@aseltd.eu + password = taylor1964NFL! + receivers = alessandro.battilani@gmail.com + message = prova messaggio + + bbbbb + ccccc + subject = ciao a domani + debug = 0 + +[mqserver] + hostname = galera1 + port = 5672 + user = asemq + password = Ase2021 + csvQueue = task_queue + elabQueue = elab_queue + +[csvfs] + path = /home/alex/aseftp/csvfs/ + +[csvelab] + logFilename = csvElab.log + +[db] + hostname = 192.168.1.241 + user = root + password = batt1l0 + dbName = ase + tableName = rawdata + maxInsertRow = 20000 + valueNum = 16 \ No newline at end of file