This commit is contained in:
2024-11-16 15:20:50 +01:00
commit 97abdc8dfa
5 changed files with 935 additions and 0 deletions

378
ftpReceiver/FtpCsvReceiver.py Executable file
View File

@@ -0,0 +1,378 @@
#!/usr/bin/env python3
import sys
import os
import shutil
# import ssl
import pika
import re
import logging
import sqlite3
from hashlib import md5
from pathlib import Path
from datetime import datetime
from smtplib import SMTP_SSL as SMTP, SMTPException, SMTPAuthenticationError
from email.mime.text import MIMEText
from utils.time import timestamp_fmt as ts
from utils.time import date_refmt as df
from utils.config import set_config as setting
from pyftpdlib.handlers import FTPHandler, TLS_FTPHandler
from pyftpdlib.servers import FTPServer
from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed
def send_mail(sev, msg, cfg):
msg = MIMEText(cfg.message + "\n" + msg)
msg["Subject"] = cfg.subject + " " + sev
msg["From"] = cfg.sender
msg["To"] = cfg.receivers
conn = SMTP(
host=cfg.smtphost,
port=cfg.smtpport,
local_hostname=None,
timeout=5,
source_address=None,
)
conn.set_debuglevel(cfg.debuglevel)
try:
conn.login(cfg.sender, cfg.password)
conn.sendmail(cfg.sender, cfg.receivers, msg.as_string())
except SMTPAuthenticationError:
logging.error(
"Mail failed: {}.".format("SMTP authentication error")
)
except:
logging.info(
"Mail failed: {}.".format("CUSTOM_ERROR")
)
finally:
conn.quit()
class mq:
def __init__(self, cfg):
parameters = pika.URLParameters(
"amqp://"
+ cfg.mquser
+ ":"
+ cfg.mqpass
+ "@"
+ cfg.mqhost
+ ":"
+ cfg.mqport
+ "/%2F"
)
connection = pika.BlockingConnection(parameters)
self.channel = connection.channel()
self.channel.queue_declare(queue=cfg.csv_queue, durable=True)
def write(self, msg, cfg):
try:
props = pika.BasicProperties(
delivery_mode=2,
content_encoding='utf-8',
timestamp=msg["timestamp"],)
self.channel.basic_publish(
exchange="",
routing_key=cfg.csv_queue,
body=msg["payload"],
properties=props
)
logging.info(
"Write message {} in queue".format(msg))
except:
logging.error(
"Error write message {} in queue".format(msg))
def close(self):
self.channel.close()
class DummyMD5Authorizer(DummyAuthorizer):
def __init__(self, cfg):
super().__init__()
self.add_user(
cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3])
con = sqlite3.connect(cfg.virtusersdb)
cur = con.cursor()
cur.execute(
'''CREATE TABLE IF NOT EXISTS virtusers (user text, hash text, virtpath text, perm text)''')
cur.execute(
'''CREATE INDEX IF NOT EXISTS user_idx on virtusers(user)''')
for row in cur.execute('SELECT * FROM virtusers'):
self.add_user(row[0], row[1], row[2], perm=row[3])
con.close()
def validate_authentication(self, username, password, handler):
hash = md5(password.encode("UTF-8")).hexdigest()
try:
if self.user_table[username]["pwd"] != hash:
raise KeyError
except KeyError:
raise AuthenticationFailed
class ASEHandler(FTPHandler):
def __init__(self, conn, server, ioloop=None):
super().__init__(conn, server, ioloop)
self.proto_cmds = FTPHandler.proto_cmds.copy()
self.proto_cmds.update(
{'SITE ADDU': dict(perm='M', auth=True, arg=True,
help='Syntax: SITE <SP> ADDU USERNAME PASSWORD (add virtual user).')}
)
self.proto_cmds.update(
{'SITE DELU': dict(perm='M', auth=True, arg=True,
help='Syntax: SITE <SP> DELU USERNAME (remove virtual user).')}
)
self.proto_cmds.update(
{'SITE LSTU': dict(perm='M', auth=True, arg=None,
help='Syntax: SITE <SP> LSTU (list virtual users).')}
)
def on_file_received(self, file):
unitType = ""
unitName = ""
toolName = ""
toolType = ""
fileDate = ""
fileTime = ""
queue = ""
if not os.stat(file).st_size:
os.remove(file)
logging.info(
"File {} was empty: removed.".format(file))
else:
cfg = self.cfg
path, filenameExt = os.path.split(file)
filename, fileExtension = os.path.splitext(filenameExt)
if (fileExtension.upper() in (cfg.fileext)):
if m := re.match(
r"^(G\d\d\d|GFLOW)_(ID\d\d\d\d)_(DT\d\d\d\d)_(\d\d)(\d\d)(\d\d\d\d|\d\d)(\d\d)(\d\d)(\d\d)$",
filename,
re.I,
):
unitType = m.group(1).upper()
unitName = m.group(2).upper()
toolName = m.group(3).upper()
toolType = "N/A"
fileDate = m.group(6) + "/" + m.group(5) + "/" + m.group(4)
fileTime = m.group(7) + ":" + m.group(8) + ":" + m.group(9)
elif re.match(
r"^(\d\d_\d\d\d\d|)(DT\d\d\d\d|LOC\d\d\d\d|GD\d\d\d\d)$", filename, re.I
):
with open(file, "r") as fileCsv:
try:
for i, line in enumerate(fileCsv.readlines(4096), 1):
if m1 := re.match(
r"^(File Creation Date:\s)?(\d*\/\d*\/\d*)\s(\d*:\d*:\d*)\;*\n?$",
line,
re.I,
):
fileDate = m1.group(2)
fileTime = m1.group(3)
elif m2 := re.match(
r"^(\w+\d+)\s(\w+\d+)\;*\n?$",
line,
re.I,
):
unitType = m2.group(1).upper()
unitName = m2.group(2).upper()
elif m3 := re.match(
r"^SD path: a:\/\w+\/(\w+)(?:\.\w+)?\/*(\w*)(?:\.\w+)?\;*\n?$",
line,
re.I,
):
if m3.group(2):
toolType = m3.group(1).upper()
toolName = m3.group(2).upper()
else:
toolType = "".join(
re.findall(
"^[a-zA-Z]+", m3.group(1))
).upper()
toolName = m3.group(1).upper()
break
except:
logging.error(
"Error: {}.".format(sys.exc_info()[1]))
fileCsv.close
logging.info(
"{} - {} - {} - {} - {} {}.".format(
unitType,
unitName,
toolName,
toolType,
df.dateFmt(fileDate),
fileTime,
)
)
newPath = cfg.csvfs + "/" + self.username + "/received/" + \
unitName.upper() + "/"
newFilename = (
newPath + filename + "_" +
str(ts.timestamp("tms") + fileExtension)
)
fileRenamed = file + "_" + str(ts.timestamp("tms"))
os.rename(file, fileRenamed)
try:
os.makedirs(newPath)
logging.info("Path {} created.".format(newPath))
except FileExistsError:
logging.info("Path {} already exists.".format(newPath))
try:
shutil.move(fileRenamed, newFilename)
logging.info("{} moved into {}.".format(
filenameExt, newFilename))
except OSError:
logging.error("Error to move {} into {}.".format(
filenameExt, newFilename))
send_mail(
"Error", "OS error move " + filenameExt + " to " + newFilename, cfg
)
now = datetime.now()
mq_message = {"payload": "{};{};{};{};{};{};{}".format(
unitType,
unitName,
toolName,
toolType,
df.dateFmt(fileDate),
fileTime,
newFilename),
"timestamp": int(datetime.timestamp(now)*1000000)
}
try:
queue = mq(cfg)
queue.write(mq_message, cfg)
logging.info("Queue message: {}.".format(mq_message))
except:
logging.error(
"Error to put message in queue: {}.".format(mq_message))
send_mail(
"Error", "Error to put message " + mq_message + " in queue.", cfg
)
finally:
queue.close()
def on_incomplete_file_received(self, file):
# remove partially uploaded files
os.remove(file)
def ftp_SITE_ADDU(self, line):
"""
add virtual user and save virtuser cfg file
create virtuser dir in virtpath cfg path
"""
cfg = self.cfg
parms = line.split()
user = os.path.basename(parms[0])
password = parms[1]
hash = md5(password.encode("UTF-8")).hexdigest()
try:
Path(cfg.virtpath + user).mkdir(parents=True, exist_ok=True)
except:
self.responde('551 Error in create virtual user path.')
else:
try:
self.authorizer.add_user(str(user),
hash, cfg.virtpath + "/" + user, perm="lmw")
con = sqlite3.connect(cfg.virtusersdb)
cur = con.cursor()
cur.execute("INSERT INTO virtusers VALUES (?,?,?,?)",
(user, hash, cfg.virtpath + user, 'elmw'))
con.commit()
con.close()
logging.info("User {} created.".format(user))
self.respond('200 SITE ADDU successful.')
except:
self.respond('501 SITE ADDU failed.')
def ftp_SITE_DELU(self, line):
"""
remove virtual user and save virtuser cfg file
"""
cfg = self.cfg
parms = line.split()
user = os.path.basename(parms[0])
try:
self.authorizer.remove_user(str(user))
con = sqlite3.connect(cfg.virtusersdb)
cur = con.cursor()
cur.execute("DELETE FROM virtusers WHERE user = ?", (user,))
con.commit()
con.close()
logging.info("User {} deleted.".format(user))
# self.push(' The user path has not been removed!\r\n')
self.respond('200 SITE DELU successful.')
except:
self.respond('501 SITE DELU failed.')
def ftp_SITE_LSTU(self, line):
"""
list virtual user
"""
cfg = self.cfg
users_list = []
try:
con = sqlite3.connect(cfg.virtusersdb)
cur = con.cursor()
self.push("214-The following virtual users are defined:\r\n")
for row in cur.execute("SELECT * FROM virtusers").fetchall():
users_list.append(
" Username: " + row[0] + "\tPerms: " + row[3] + "\r\n")
con.close()
self.push(''.join(users_list))
self.respond("214 LSTU SITE command successful.")
except:
self.respond('501 list users failed.')
def main():
cfg = setting.config()
try:
authorizer = DummyMD5Authorizer(cfg)
handler = ASEHandler
handler.cfg = cfg
handler.authorizer = authorizer
handler.masquerade_address = cfg.proxyaddr
_range = list(range(cfg.firstport, cfg.firstport + cfg.portrangewidth))
handler.passive_ports = _range
logging.basicConfig(
format="%(asctime)s %(message)s",
filename=cfg.logfilename,
level=logging.INFO,
)
server = FTPServer(("0.0.0.0", 2121), handler)
server.serve_forever()
except KeyboardInterrupt:
logging.info(
"Info: {}.".format("Shutdown requested...exiting")
)F
except Exception:
print(
"{} - PID {:>5} >> Error: {}.".format(
ts.timestamp("log"), os.getpid(), sys.exc_info()[1]
)
)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,207 @@
#!/usr/bin/env python3
# Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>.
# Use of this source code is governed by MIT license that can be
# found in the LICENSE file.
"""A basic unix daemon using the python-daemon library:
http://pypi.python.org/pypi/python-daemon
Example usages:
$ python unix_daemon.py start
$ python unix_daemon.py stop
$ python unix_daemon.py status
$ python unix_daemon.py # foreground (no daemon)
$ python unix_daemon.py --logfile /var/log/ftpd.log start
$ python unix_daemon.py --pidfile /var/run/ftpd.pid start
This is just a proof of concept which demonstrates how to daemonize
the FTP server.
You might want to use this as an example and provide the necessary
customizations.
Parts you might want to customize are:
- UMASK, WORKDIR, HOST, PORT constants
- get_server() function (to define users and customize FTP handler)
Authors:
- Ben Timby - btimby <at> gmail.com
- Giampaolo Rodola' - g.rodola <at> gmail.com
"""
import atexit
import errno
import optparse
import os
import signal
import sys
import time
from pyftpdlib.authorizers import UnixAuthorizer
from pyftpdlib.filesystems import UnixFilesystem
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
# overridable options
HOST = ""
PORT = 21
PID_FILE = "/var/run/pyftpdlib.pid"
LOG_FILE = "/var/log/pyftpdlib.log"
WORKDIR = os.getcwd()
UMASK = 0
def pid_exists(pid):
"""Return True if a process with the given PID is currently running."""
try:
os.kill(pid, 0)
except OSError as err:
return err.errno == errno.EPERM
else:
return True
def get_pid():
"""Return the PID saved in the pid file if possible, else None."""
try:
with open(PID_FILE) as f:
return int(f.read().strip())
except IOError as err:
if err.errno != errno.ENOENT:
raise
def stop():
"""Keep attempting to stop the daemon for 5 seconds, first using
SIGTERM, then using SIGKILL.
"""
pid = get_pid()
if not pid or not pid_exists(pid):
sys.exit("daemon not running")
sig = signal.SIGTERM
i = 0
while True:
sys.stdout.write('.')
sys.stdout.flush()
try:
os.kill(pid, sig)
except OSError as err:
if err.errno == errno.ESRCH:
print("\nstopped (pid %s)" % pid)
return
else:
raise
i += 1
if i == 25:
sig = signal.SIGKILL
elif i == 50:
sys.exit("\ncould not kill daemon (pid %s)" % pid)
time.sleep(0.1)
def status():
"""Print daemon status and exit."""
pid = get_pid()
if not pid or not pid_exists(pid):
print("daemon not running")
else:
print("daemon running with pid %s" % pid)
sys.exit(0)
def get_server():
"""Return a pre-configured FTP server instance."""
handler = FTPHandler
handler.authorizer = UnixAuthorizer()
handler.abstracted_fs = UnixFilesystem
server = FTPServer((HOST, PORT), handler)
return server
def daemonize():
"""A wrapper around python-daemonize context manager."""
def _daemonize():
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
# decouple from parent environment
os.chdir(WORKDIR)
os.setsid()
os.umask(0)
# do second fork
pid = os.fork()
if pid > 0:
# exit from second parent
sys.exit(0)
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = open(LOG_FILE, 'r')
so = open(LOG_FILE, 'a+')
se = open(LOG_FILE, 'a+', 0)
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
# write pidfile
pid = str(os.getpid())
with open(PID_FILE, 'w') as f:
f.write("%s\n" % pid)
atexit.register(lambda: os.remove(PID_FILE))
pid = get_pid()
if pid and pid_exists(pid):
sys.exit('daemon already running (pid %s)' % pid)
# instance FTPd before daemonizing, so that in case of problems we
# get an exception here and exit immediately
server = get_server()
_daemonize()
server.serve_forever()
def main():
global PID_FILE, LOG_FILE
USAGE = "python [-p PIDFILE] [-l LOGFILE]\n\n" \
"Commands:\n - start\n - stop\n - status"
parser = optparse.OptionParser(usage=USAGE)
parser.add_option('-l', '--logfile', dest='logfile',
help='the log file location')
parser.add_option('-p', '--pidfile', dest='pidfile', default=PID_FILE,
help='file to store/retreive daemon pid')
options, args = parser.parse_args()
if options.pidfile:
PID_FILE = options.pidfile
if options.logfile:
LOG_FILE = options.logfile
if not args:
server = get_server()
server.serve_forever()
else:
if len(args) != 1:
sys.exit('too many commands')
elif args[0] == 'start':
daemonize()
elif args[0] == 'stop':
stop()
elif args[0] == 'restart':
try:
stop()
finally:
daemonize()
elif args[0] == 'status':
status()
else:
sys.exit('invalid command')
if __name__ == '__main__':
sys.exit(main())

View File

@@ -0,0 +1,51 @@
# to generete adminuser password hash:
# python3 -c 'from hashlib import md5;print(md5("????admin-password???".encode("UTF-8")).hexdigest())'
[ftpserver]
firstPort = 40000
logFilename = ./ftppylog.log
proxyAddr = 0.0.0.0
portRangeWidth = 500
virtusersdb = /home/alex/aseftp/virtusers.db
virtpath = /home/alex/aseftp/
adminuser = admin|c8cf955bd8b8a78419013b831e627eb2|/home/alex/aseftp/|elradfmwMT
servertype = FTPHandler
certfile = /home/alex/aseftp/keycert.pem
fileext = .CSV|.txt
#servertype = FTPHandler/TLS_FTPHandler
[mailserver]
hostname = smtps.aruba.it
port = 465
sender = alessandro.battilani@aseltd.eu
password = taylor1964NFL!
receivers = alessandro.battilani@gmail.com
message = prova messaggio
bbbbb
ccccc
subject = ciao a domani
debug = 0
[mqserver]
hostname = galera1
port = 5672
user = asemq
password = Ase2021
csvQueue = task_queue
elabQueue = elab_queue
[csvfs]
path = /home/alex/aseftp/csvfs/
[csvelab]
logFilename = csvElab.log
[db]
hostname = 192.168.1.241
user = root
password = batt1l0
dbName = ase
tableName = rawdata
maxInsertRow = 20000
valueNum = 16