This commit is contained in:
2024-11-16 16:57:30 +01:00
parent 968d1c3985
commit 21493d2fa3

View File

@@ -4,7 +4,7 @@ import sys
import os import os
import shutil import shutil
# import ssl # import ssl
import pika
import re import re
import logging import logging
import sqlite3 import sqlite3
@@ -13,9 +13,6 @@ from hashlib import md5
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
from smtplib import SMTP_SSL as SMTP, SMTPException, SMTPAuthenticationError
from email.mime.text import MIMEText
from utils.time import timestamp_fmt as ts from utils.time import timestamp_fmt as ts
from utils.time import date_refmt as df from utils.time import date_refmt as df
from utils.config import set_config as setting from utils.config import set_config as setting
@@ -24,92 +21,30 @@ from pyftpdlib.handlers import FTPHandler, TLS_FTPHandler
from pyftpdlib.servers import FTPServer from pyftpdlib.servers import FTPServer
from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed
def send_mail(sev, msg, cfg):
msg = MIMEText(cfg.message + "\n" + msg)
msg["Subject"] = cfg.subject + " " + sev
msg["From"] = cfg.sender
msg["To"] = cfg.receivers
conn = SMTP(
host=cfg.smtphost,
port=cfg.smtpport,
local_hostname=None,
timeout=5,
source_address=None,
)
conn.set_debuglevel(cfg.debuglevel)
try:
conn.login(cfg.sender, cfg.password)
conn.sendmail(cfg.sender, cfg.receivers, msg.as_string())
except SMTPAuthenticationError:
logging.error(
"Mail failed: {}.".format("SMTP authentication error")
)
except:
logging.info(
"Mail failed: {}.".format("CUSTOM_ERROR")
)
finally:
conn.quit()
class mq:
def __init__(self, cfg):
parameters = pika.URLParameters(
"amqp://"
+ cfg.mquser
+ ":"
+ cfg.mqpass
+ "@"
+ cfg.mqhost
+ ":"
+ cfg.mqport
+ "/%2F"
)
connection = pika.BlockingConnection(parameters)
self.channel = connection.channel()
self.channel.queue_declare(queue=cfg.csv_queue, durable=True)
def write(self, msg, cfg):
try:
props = pika.BasicProperties(
delivery_mode=2,
content_encoding='utf-8',
timestamp=msg["timestamp"],)
self.channel.basic_publish(
exchange="",
routing_key=cfg.csv_queue,
body=msg["payload"],
properties=props
)
logging.info(
"Write message {} in queue".format(msg))
except:
logging.error(
"Error write message {} in queue".format(msg))
def close(self):
self.channel.close()
class DummyMD5Authorizer(DummyAuthorizer): class DummyMD5Authorizer(DummyAuthorizer):
def __init__(self, cfg): def __init__(self, cfg):
# Initialize the DummyAuthorizer and add the admin user
super().__init__() super().__init__()
self.add_user( self.add_user(
cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3]) cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3])
# Connect to the SQLite database for virtual users
con = sqlite3.connect(cfg.virtusersdb) con = sqlite3.connect(cfg.virtusersdb)
cur = con.cursor() cur = con.cursor()
# Create the virtusers table if it doesn't exist
cur.execute( cur.execute(
'''CREATE TABLE IF NOT EXISTS virtusers (user text, hash text, virtpath text, perm text)''') '''CREATE TABLE IF NOT EXISTS virtusers (user text, hash text, virtpath text, perm text)''')
# Create an index on the user column for faster lookups
cur.execute( cur.execute(
'''CREATE INDEX IF NOT EXISTS user_idx on virtusers(user)''') '''CREATE INDEX IF NOT EXISTS user_idx on virtusers(user)''')
# Load existing virtual users from the database
for row in cur.execute('SELECT * FROM virtusers'): for row in cur.execute('SELECT * FROM virtusers'):
self.add_user(row[0], row[1], row[2], perm=row[3]) self.add_user(row[0], row[1], row[2], perm=row[3])
con.close() con.close()
def validate_authentication(self, username, password, handler): def validate_authentication(self, username, password, handler):
# Validate the user's password against the stored hash
hash = md5(password.encode("UTF-8")).hexdigest() hash = md5(password.encode("UTF-8")).hexdigest()
try: try:
if self.user_table[username]["pwd"] != hash: if self.user_table[username]["pwd"] != hash:
@@ -121,8 +56,10 @@ class DummyMD5Authorizer(DummyAuthorizer):
class ASEHandler(FTPHandler): class ASEHandler(FTPHandler):
def __init__(self, conn, server, ioloop=None): def __init__(self, conn, server, ioloop=None):
# Initialize the FTPHandler and add custom commands
super().__init__(conn, server, ioloop) super().__init__(conn, server, ioloop)
self.proto_cmds = FTPHandler.proto_cmds.copy() self.proto_cmds = FTPHandler.proto_cmds.copy()
# Add custom FTP commands for managing virtual users - command in lowercase
self.proto_cmds.update( self.proto_cmds.update(
{'SITE ADDU': dict(perm='M', auth=True, arg=True, {'SITE ADDU': dict(perm='M', auth=True, arg=True,
help='Syntax: SITE <SP> ADDU USERNAME PASSWORD (add virtual user).')} help='Syntax: SITE <SP> ADDU USERNAME PASSWORD (add virtual user).')}
@@ -137,6 +74,7 @@ class ASEHandler(FTPHandler):
) )
def on_file_received(self, file): def on_file_received(self, file):
# Handle the event when a file is received
unitType = "" unitType = ""
unitName = "" unitName = ""
toolName = "" toolName = ""
@@ -144,6 +82,7 @@ class ASEHandler(FTPHandler):
fileDate = "" fileDate = ""
fileTime = "" fileTime = ""
queue = "" queue = ""
# Check if the file is empty and remove it if so
if not os.stat(file).st_size: if not os.stat(file).st_size:
os.remove(file) os.remove(file)
logging.info( logging.info(
@@ -152,7 +91,9 @@ class ASEHandler(FTPHandler):
cfg = self.cfg cfg = self.cfg
path, filenameExt = os.path.split(file) path, filenameExt = os.path.split(file)
filename, fileExtension = os.path.splitext(filenameExt) filename, fileExtension = os.path.splitext(filenameExt)
# Process the file if it has a valid extension
if (fileExtension.upper() in (cfg.fileext)): if (fileExtension.upper() in (cfg.fileext)):
# Match the filename against a specific pattern
if m := re.match( if m := re.match(
r"^(G\d\d\d|GFLOW)_(ID\d\d\d\d)_(DT\d\d\d\d)_(\d\d)(\d\d)(\d\d\d\d|\d\d)(\d\d)(\d\d)(\d\d)$", r"^(G\d\d\d|GFLOW)_(ID\d\d\d\d)_(DT\d\d\d\d)_(\d\d)(\d\d)(\d\d\d\d|\d\d)(\d\d)(\d\d)(\d\d)$",
filename, filename,
@@ -165,12 +106,15 @@ class ASEHandler(FTPHandler):
fileDate = m.group(6) + "/" + m.group(5) + "/" + m.group(4) fileDate = m.group(6) + "/" + m.group(5) + "/" + m.group(4)
fileTime = m.group(7) + ":" + m.group(8) + ":" + m.group(9) fileTime = m.group(7) + ":" + m.group(8) + ":" + m.group(9)
# Match against another pattern for different file formats
elif re.match( elif re.match(
r"^(\d\d_\d\d\d\d|)(DT\d\d\d\d|LOC\d\d\d\d|GD\d\d\d\d)$", filename, re.I r"^(\d\d_\d\d\d\d|)(DT\d\d\d\d|LOC\d\d\d\d|GD\d\d\d\d)$", filename, re.I
): ):
with open(file, "r") as fileCsv: with open(file, "r") as fileCsv:
try: try:
# Read the CSV file line by line
for i, line in enumerate(fileCsv.readlines(4096), 1): for i, line in enumerate(fileCsv.readlines(4096), 1):
# Extract creation date and time
if m1 := re.match( if m1 := re.match(
r"^(File Creation Date:\s)?(\d*\/\d*\/\d*)\s(\d*:\d*:\d*)\;*\n?$", r"^(File Creation Date:\s)?(\d*\/\d*\/\d*)\s(\d*:\d*:\d*)\;*\n?$",
line, line,
@@ -179,6 +123,7 @@ class ASEHandler(FTPHandler):
fileDate = m1.group(2) fileDate = m1.group(2)
fileTime = m1.group(3) fileTime = m1.group(3)
# Extract unit type and name
elif m2 := re.match( elif m2 := re.match(
r"^(\w+\d+)\s(\w+\d+)\;*\n?$", r"^(\w+\d+)\s(\w+\d+)\;*\n?$",
line, line,
@@ -187,6 +132,7 @@ class ASEHandler(FTPHandler):
unitType = m2.group(1).upper() unitType = m2.group(1).upper()
unitName = m2.group(2).upper() unitName = m2.group(2).upper()
# Extract tool type and name from the path
elif m3 := re.match( elif m3 := re.match(
r"^SD path: a:\/\w+\/(\w+)(?:\.\w+)?\/*(\w*)(?:\.\w+)?\;*\n?$", r"^SD path: a:\/\w+\/(\w+)(?:\.\w+)?\/*(\w*)(?:\.\w+)?\;*\n?$",
line, line,
@@ -207,6 +153,7 @@ class ASEHandler(FTPHandler):
"Error: {}.".format(sys.exc_info()[1])) "Error: {}.".format(sys.exc_info()[1]))
fileCsv.close fileCsv.close
# Log the extracted information
logging.info( logging.info(
"{} - {} - {} - {} - {} {}.".format( "{} - {} - {} - {} - {} {}.".format(
unitType, unitType,
@@ -217,6 +164,7 @@ class ASEHandler(FTPHandler):
fileTime, fileTime,
) )
) )
# Prepare the new path for the received file
newPath = cfg.csvfs + "/" + self.username + "/received/" + \ newPath = cfg.csvfs + "/" + self.username + "/received/" + \
unitName.upper() + "/" unitName.upper() + "/"
newFilename = ( newFilename = (
@@ -224,70 +172,50 @@ class ASEHandler(FTPHandler):
str(ts.timestamp("tms") + fileExtension) str(ts.timestamp("tms") + fileExtension)
) )
fileRenamed = file + "_" + str(ts.timestamp("tms")) fileRenamed = file + "_" + str(ts.timestamp("tms"))
# Rename the original file to avoid conflicts
os.rename(file, fileRenamed) os.rename(file, fileRenamed)
try: try:
# Create the new directory for the user if it doesn't exist
os.makedirs(newPath) os.makedirs(newPath)
logging.info("Path {} created.".format(newPath)) logging.info("Path {} created.".format(newPath))
except FileExistsError: except FileExistsError:
logging.info("Path {} already exists.".format(newPath)) logging.info("Path {} already exists.".format(newPath))
try: try:
# Move the renamed file to the new location
shutil.move(fileRenamed, newFilename) shutil.move(fileRenamed, newFilename)
logging.info("{} moved into {}.".format( logging.info("{} moved into {}.".format(
filenameExt, newFilename)) filenameExt, newFilename))
except OSError: except OSError:
logging.error("Error to move {} into {}.".format( logging.error("Error to move {} into {}.".format(
filenameExt, newFilename)) filenameExt, newFilename))
send_mail(
"Error", "OS error move " + filenameExt + " to " + newFilename, cfg
)
now = datetime.now() now = datetime.now()
mq_message = {"payload": "{};{};{};{};{};{};{}".format(
unitType,
unitName,
toolName,
toolType,
df.dateFmt(fileDate),
fileTime,
newFilename),
"timestamp": int(datetime.timestamp(now)*1000000)
}
try:
queue = mq(cfg)
queue.write(mq_message, cfg)
logging.info("Queue message: {}.".format(mq_message))
except:
logging.error(
"Error to put message in queue: {}.".format(mq_message))
send_mail(
"Error", "Error to put message " + mq_message + " in queue.", cfg
)
finally:
queue.close()
def on_incomplete_file_received(self, file): def on_incomplete_file_received(self, file):
# remove partially uploaded files # Remove partially uploaded files
os.remove(file) os.remove(file)
def ftp_SITE_ADDU(self, line): def ftp_SITE_ADDU(self, line):
""" """
add virtual user and save virtuser cfg file Add a virtual user and save the virtuser configuration file.
create virtuser dir in virtpath cfg path Create a directory for the virtual user in the specified virtpath.
""" """
cfg = self.cfg cfg = self.cfg
parms = line.split() parms = line.split()
user = os.path.basename(parms[0]) user = os.path.basename(parms[0]) # Extract the username
password = parms[1] password = parms[1] # Get the password
hash = md5(password.encode("UTF-8")).hexdigest() hash = md5(password.encode("UTF-8")).hexdigest() # Hash the password
try: try:
# Create the user's directory
Path(cfg.virtpath + user).mkdir(parents=True, exist_ok=True) Path(cfg.virtpath + user).mkdir(parents=True, exist_ok=True)
except: except:
self.responde('551 Error in create virtual user path.') self.respond('551 Error in create virtual user path.')
else: else:
try: try:
# Add the user to the authorizer
self.authorizer.add_user(str(user), self.authorizer.add_user(str(user),
hash, cfg.virtpath + "/" + user, perm="lmw") hash, cfg.virtpath + "/" + user, perm="lmw")
# Save the user to the SQLite database
con = sqlite3.connect(cfg.virtusersdb) con = sqlite3.connect(cfg.virtusersdb)
cur = con.cursor() cur = con.cursor()
cur.execute("INSERT INTO virtusers VALUES (?,?,?,?)", cur.execute("INSERT INTO virtusers VALUES (?,?,?,?)",
@@ -301,20 +229,21 @@ class ASEHandler(FTPHandler):
def ftp_SITE_DELU(self, line): def ftp_SITE_DELU(self, line):
""" """
remove virtual user and save virtuser cfg file Remove a virtual user and save the virtuser configuration file.
""" """
cfg = self.cfg cfg = self.cfg
parms = line.split() parms = line.split()
user = os.path.basename(parms[0]) user = os.path.basename(parms[0]) # Extract the username
try: try:
# Remove the user from the authorizer
self.authorizer.remove_user(str(user)) self.authorizer.remove_user(str(user))
# Delete the user from the SQLite database
con = sqlite3.connect(cfg.virtusersdb) con = sqlite3.connect(cfg.virtusersdb)
cur = con.cursor() cur = con.cursor()
cur.execute("DELETE FROM virtusers WHERE user = ?", (user,)) cur.execute("DELETE FROM virtusers WHERE user = ?", (user,))
con.commit() con.commit()
con.close() con.close()
logging.info("User {} deleted.".format(user)) logging.info("User {} deleted.".format(user))
# self.push(' The user path has not been removed!\r\n')
self.respond('200 SITE DELU successful.') self.respond('200 SITE DELU successful.')
except: except:
@@ -322,14 +251,16 @@ class ASEHandler(FTPHandler):
def ftp_SITE_LSTU(self, line): def ftp_SITE_LSTU(self, line):
""" """
list virtual user List all virtual users.
""" """
cfg = self.cfg cfg = self.cfg
users_list = [] users_list = []
try: try:
# Connect to the SQLite database to fetch users
con = sqlite3.connect(cfg.virtusersdb) con = sqlite3.connect(cfg.virtusersdb)
cur = con.cursor() cur = con.cursor()
self.push("214-The following virtual users are defined:\r\n") self.push("214-The following virtual users are defined:\r\n")
# Fetch and list all virtual users
for row in cur.execute("SELECT * FROM virtusers").fetchall(): for row in cur.execute("SELECT * FROM virtusers").fetchall():
users_list.append( users_list.append(
" Username: " + row[0] + "\tPerms: " + row[3] + "\r\n") " Username: " + row[0] + "\tPerms: " + row[3] + "\r\n")
@@ -342,30 +273,34 @@ class ASEHandler(FTPHandler):
def main(): def main():
# Load the configuration settings
cfg = setting.config() cfg = setting.config()
try: try:
# Initialize the authorizer and handler
authorizer = DummyMD5Authorizer(cfg) authorizer = DummyMD5Authorizer(cfg)
handler = ASEHandler handler = ASEHandler
handler.cfg = cfg handler.cfg = cfg
handler.authorizer = authorizer handler.authorizer = authorizer
handler.masquerade_address = cfg.proxyaddr handler.masquerade_address = cfg.proxyaddr
# Set the range of passive ports for the FTP server
_range = list(range(cfg.firstport, cfg.firstport + cfg.portrangewidth)) _range = list(range(cfg.firstport, cfg.firstport + cfg.portrangewidth))
handler.passive_ports = _range handler.passive_ports = _range
# Configure logging
logging.basicConfig( logging.basicConfig(
format="%(asctime)s %(message)s", format="%(asctime)s %(message)s",
filename=cfg.logfilename, filename=cfg.logfilename,
level=logging.INFO, level=logging.INFO,
) )
# Create and start the FTP server
server = FTPServer(("0.0.0.0", 2121), handler) server = FTPServer(("0.0.0.0", 2121), handler)
server.serve_forever() server.serve_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info( logging.info(
"Info: {}.".format("Shutdown requested...exiting") "Info: {}.".format("Shutdown requested...exiting")
)F )
except Exception: except Exception:
print( print(
"{} - PID {:>5} >> Error: {}.".format( "{} - PID {:>5} >> Error: {}.".format(