#!.venv/bin/python
"""FTP server with custom SITE commands for managing virtual users and handling CSV file uploads."""

import os
import hmac
import logging
from hashlib import sha256
from pathlib import Path

from utils.config import loader
from utils.database.connection import connetti_db
from utils.ftp import user_admin, file_management

from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed

# Handlers/format for this logger are installed by logging.basicConfig() in main().
logger = logging.getLogger(__name__)


class DummySha256Authorizer(DummyAuthorizer):
    """Authorizer that compares SHA-256 password digests and loads its users from a database."""

    def __init__(self, cfg: object) -> None:
        """Register the admin user and every enabled user found in the database.

        Args:
            cfg: The configuration object. This method reads ``adminuser``
                (username, password hash, home directory, permissions),
                ``dbname``, ``dbusertable`` and ``virtpath`` from it.

        Raises:
            ValueError: Propagated from ``add_user`` (for example when a
                user's home directory does not exist or the user is a
                duplicate).
        """
        super().__init__()
        self.add_user(
            cfg.adminuser[0],
            cfg.adminuser[1],
            cfg.adminuser[2],
            perm=cfg.adminuser[3],
        )

        # Fetch all rows up front so the cursor and the connection can be
        # released before touching the filesystem.
        conn = connetti_db(cfg)
        try:
            cur = conn.cursor()
            try:
                # Schema and table names come from the configuration file, not
                # from client input; SQL identifiers cannot be bound as
                # query parameters.
                cur.execute(
                    f'SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL'
                )
                rows = cur.fetchall()
            finally:
                cur.close()
        finally:
            conn.close()

        for ftpuser, pwd_hash, virtpath, perm in rows:
            # Create the user's directory if it does not exist. This has to
            # happen before add_user(), which rejects a missing home directory.
            try:
                Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
            except OSError as e:
                # There is no FTP client to respond to at start-up: log instead.
                logger.error("Error in create virtual user path for %s: %s", ftpuser, e)
            self.add_user(ftpuser, pwd_hash, virtpath, perm)

    def validate_authentication(self, username: str, password: str, handler: object) -> None:
        """Check the supplied password against the stored SHA-256 hex digest.

        Args:
            username: The login name supplied by the client.
            password: The clear-text password supplied by the client.
            handler: The handler instance serving the client (unused here).

        Raises:
            AuthenticationFailed: If the user is unknown or the digest of
                ``password`` does not match the stored one.
        """
        digest = sha256(password.encode("UTF-8")).hexdigest()
        try:
            stored = self.user_table[username]["pwd"]
        except KeyError:
            # Unknown user: report the same failure as a wrong password.
            raise AuthenticationFailed from None
        # Constant-time comparison; bytes are used because compare_digest()
        # rejects non-ASCII str arguments.
        if not hmac.compare_digest(str(stored).encode("UTF-8"), digest.encode("UTF-8")):
            raise AuthenticationFailed


class ASEHandler(FTPHandler):
    """FTP handler that adds SITE commands for virtual users and delegates upload handling."""

    def __init__(self, conn: object, server: object, ioloop=None) -> None:
        """Initialize the handler and register the custom SITE commands.

        Args:
            conn: The connection object.
            server: The FTP server object.
            ioloop: The I/O loop object.
        """
        super().__init__(conn, server, ioloop)
        # Work on a copy so the command table of the base class is not mutated.
        self.proto_cmds = FTPHandler.proto_cmds.copy()
        # Custom FTP commands for managing virtual users.
        self.proto_cmds.update(
            {
                'SITE ADDU': dict(perm='M', auth=True, arg=True,
                                  help='Syntax: SITE ADDU USERNAME PASSWORD (add virtual user).'),
                'SITE DISU': dict(perm='M', auth=True, arg=True,
                                  help='Syntax: SITE DISU USERNAME (disable virtual user).'),
                'SITE ENAU': dict(perm='M', auth=True, arg=True,
                                  help='Syntax: SITE ENAU USERNAME (enable virtual user).'),
                'SITE LSTU': dict(perm='M', auth=True, arg=None,
                                  help='Syntax: SITE LSTU (list virtual users).'),
            }
        )

    def on_file_received(self, file: str) -> None:
        """Delegate handling of a completely uploaded file to ``file_management``.

        Args:
            file: The path to the received file.
        """
        return file_management.on_file_received(self, file)

    def on_incomplete_file_received(self, file: str) -> None:
        """Remove a partially uploaded file.

        Args:
            file: The path to the incomplete file.
        """
        try:
            os.remove(file)
        except OSError as e:
            # The file may already be gone or not removable; a failed cleanup
            # should be reported, not raised out of the callback.
            logger.warning("Could not remove incomplete file %s: %s", file, e)

    def ftp_SITE_ADDU(self, line: str) -> None:
        """Handle ``SITE ADDU`` by delegating to ``user_admin``."""
        return user_admin.ftp_SITE_ADDU(self, line)

    def ftp_SITE_DISU(self, line: str) -> None:
        """Handle ``SITE DISU`` by delegating to ``user_admin``."""
        return user_admin.ftp_SITE_DISU(self, line)

    def ftp_SITE_ENAU(self, line: str) -> None:
        """Handle ``SITE ENAU`` by delegating to ``user_admin``."""
        return user_admin.ftp_SITE_ENAU(self, line)

    def ftp_SITE_LSTU(self, line: str) -> None:
        """Handle ``SITE LSTU`` by delegating to ``user_admin``."""
        return user_admin.ftp_SITE_LSTU(self, line)


def main():
    """Load the configuration, set up logging and run the FTP server until it stops."""
    # Load the configuration settings
    cfg = loader.Config()

    try:
        # Configure logging first, so that failures while building the
        # authorizer end up in the configured log file as well.
        logging.basicConfig(
            format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
            # Use cfg.logfilename directly without checking its existence
            filename=cfg.logfilename,
            level=logging.INFO,
        )

        # Initialize the authorizer and handler
        authorizer = DummySha256Authorizer(cfg)
        handler = ASEHandler
        handler.cfg = cfg
        handler.authorizer = authorizer
        handler.masquerade_address = cfg.proxyaddr
        # Set the range of passive ports for the FTP server
        handler.passive_ports = list(range(cfg.firstport, cfg.firstport + cfg.portrangewidth))

        # Create and start the FTP server
        server = FTPServer(("0.0.0.0", 2121), handler)
        server.serve_forever()
    except Exception as e:
        # Top-level boundary: record the failure together with its traceback.
        logger.error("Exit with error: %s.", e, exc_info=True)


if __name__ == "__main__":
    main()