""" SFTP Server implementation using asyncssh. Shares the same authentication system and file handling logic as the FTP server. """ import asyncio import logging from pathlib import Path import asyncssh from utils.connect import file_management from utils.database.connection import connetti_db_async logger = logging.getLogger(__name__) class ASESFTPServer(asyncssh.SFTPServer): """Custom SFTP server that handles file uploads with the same logic as FTP server.""" def __init__(self, chan): """Initialize SFTP server with channel.""" super().__init__(chan) # Get config from connection (set during authentication) self.cfg = chan.get_connection()._cfg async def close(self): """Called when SFTP session is closed.""" logger.info(f"SFTP session closed for user: {self._chan.get_connection().get_extra_info('username')}") await super().close() class ASESSHServer(asyncssh.SSHServer): """Custom SSH server for SFTP authentication using database.""" def __init__(self, cfg): """Initialize SSH server with configuration.""" self.cfg = cfg super().__init__() def connection_made(self, conn): """Called when connection is established.""" # Store config in connection for later use conn._cfg = self.cfg logger.info(f"SSH connection from {conn.get_extra_info('peername')[0]}") def connection_lost(self, exc): """Called when connection is lost.""" if exc: logger.error(f"SSH connection lost: {exc}") async def validate_password(self, username, password): """ Validate user credentials against database. Same logic as DatabaseAuthorizer for FTP. """ from hashlib import sha256 # Hash the provided password password_hash = sha256(password.encode("UTF-8")).hexdigest() # Check if user is admin if username == self.cfg.adminuser[0]: if self.cfg.adminuser[1] == password_hash: logger.info(f"Admin user '{username}' authenticated successfully") return True else: logger.warning(f"Failed admin login attempt for user: {username}") return False # For regular users, check database try: conn = await connetti_db_async(self.cfg) cur = await conn.cursor() # Query user from database await cur.execute( f"SELECT ftpuser, hash, virtpath, perm, disabled_at FROM {self.cfg.dbname}.{self.cfg.dbusertable} WHERE ftpuser = %s", (username,) ) result = await cur.fetchone() await cur.close() conn.close() if not result: logger.warning(f"SFTP login attempt for non-existent user: {username}") return False ftpuser, stored_hash, virtpath, perm, disabled_at = result # Check if user is disabled if disabled_at is not None: logger.warning(f"SFTP login attempt for disabled user: {username}") return False # Verify password if stored_hash != password_hash: logger.warning(f"Invalid password for SFTP user: {username}") return False # Authentication successful - ensure user directory exists try: Path(virtpath).mkdir(parents=True, exist_ok=True) except Exception as e: logger.error(f"Failed to create directory for user {username}: {e}") return False logger.info(f"Successful SFTP login for user: {username}") return True except Exception as e: logger.error(f"Database error during SFTP authentication for user {username}: {e}", exc_info=True) return False def password_auth_supported(self): """Enable password authentication.""" return True def begin_auth(self, username): """Called when authentication begins.""" logger.debug(f"Authentication attempt for user: {username}") return True class SFTPFileHandler(asyncssh.SFTPServer): """Extended SFTP server with file upload handling.""" def __init__(self, chan): super().__init__(chan) self.cfg = chan.get_connection()._cfg async def close(self): """Handle session close.""" await super().close() # Override file operations to add custom handling async def rename(self, oldpath, newpath): """ Handle file rename/move - called when upload completes. This is where we trigger the CSV processing like in FTP. """ result = await super().rename(oldpath, newpath) # Check if it's a CSV file that was uploaded if newpath.lower().endswith('.csv'): try: # Trigger file processing (same as FTP on_file_received) logger.info(f"CSV file uploaded via SFTP: {newpath}") # Create a mock handler object with required attributes mock_handler = type('obj', (object,), { 'cfg': self.cfg, 'username': self._chan.get_connection().get_extra_info('username') })() # Call the same file_management function used by FTP file_management.on_file_received(mock_handler, newpath) except Exception as e: logger.error(f"Error processing SFTP uploaded file {newpath}: {e}", exc_info=True) return result async def start_sftp_server(cfg, host='0.0.0.0', port=22): """ Start SFTP server. Args: cfg: Configuration object host: Host to bind to port: Port to bind to Returns: asyncssh server object """ logger.info(f"Starting SFTP server on {host}:{port}") # Create SSH server ssh_server = ASESSHServer(cfg) # Start asyncssh server server = await asyncssh.create_server( lambda: ssh_server, host, port, server_host_keys=['/app/ssh_host_key'], # You'll need to generate this sftp_factory=SFTPFileHandler, ) logger.info(f"SFTP server started successfully on {host}:{port}") logger.info(f"Database connection: {cfg.dbuser}@{cfg.dbhost}:{cfg.dbport}/{cfg.dbname}") return server