"""Handlers for the custom FTP ``SITE`` commands that manage virtual users.

Each handler takes the FTP handler instance as ``self`` and uses its ``cfg``,
``authorizer``, ``respond`` and ``push`` attributes.
"""

import logging
import os
from contextlib import contextmanager
from hashlib import sha256
from pathlib import Path

import mysql.connector

from utils.database.connection import connetti_db

logger = logging.getLogger(__name__)


def _user_table(cfg: object) -> str:
    """Return the qualified ``<dbname>.<dbusertable>`` name of the users table.

    The two identifiers come from the configuration object, not from the FTP
    client, and SQL identifiers cannot be bound as query parameters, so they
    are interpolated directly.
    """
    return f"{cfg.dbname}.{cfg.dbusertable}"


@contextmanager
def _db_session(cfg: object):
    """Open a database connection and yield ``(connection, cursor)``.

    The connection is always closed on exit, including when the body raises.
    A connection failure is logged and re-raised so that the calling handler
    reports it through its own error reply.

    Args:
        cfg: Configuration object handed to ``connetti_db``.

    Yields:
        A ``(connection, cursor)`` tuple.
    """
    # NOTE(review): connetti_db is assumed to return a MySQL Connector
    # connection (the module catches mysql.connector.Error and the queries use
    # MySQL's now()); the "%s" placeholders used by the handlers rely on that.
    try:
        conn = connetti_db(cfg)
    except mysql.connector.Error as e:
        logger.error("%s", e)
        raise
    try:
        yield conn, conn.cursor()
    finally:
        conn.close()


def ftp_SITE_ADDU(self: object, line: str) -> None:
    """
    Adds a virtual user, creates their directory, and saves their details to the database.

    Replies 501 when arguments are missing or the username is unusable, 551
    when the home directory cannot be created, 501 when registering or storing
    the user fails, and 200 on success.

    Args:
        line (str): A string containing the username and password separated by a space.
    """
    cfg = self.cfg
    try:
        parms = line.split()
        user = os.path.basename(parms[0])  # Strip any directory components
        password = parms[1]
    except IndexError:
        self.respond("501 SITE ADDU failed. Command needs 2 arguments")
        return

    # basename() can yield "", "." or "..", which would resolve to the virtual
    # root itself or to its parent directory.
    if user in ("", ".", ".."):
        self.respond("501 SITE ADDU failed. Invalid username")
        return

    password_hash = sha256(password.encode("UTF-8")).hexdigest()
    # One path for the directory, the authorizer and the database row, so the
    # three cannot disagree when cfg.virtpath lacks a trailing separator.
    home = Path(cfg.virtpath) / user

    try:
        home.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        self.respond(f"551 Error in create virtual user path: {e}")
        return

    try:
        self.authorizer.add_user(str(user), password_hash, str(home), perm=cfg.defperm)
        with _db_session(cfg) as (conn, cur):
            # Values are bound as parameters: the username comes from the client.
            cur.execute(
                f"INSERT INTO {_user_table(cfg)} (ftpuser, hash, virtpath, perm) "
                "VALUES (%s, %s, %s, %s)",
                (user, password_hash, str(home), cfg.defperm),
            )
            conn.commit()
        logger.info("User %s created.", user)
        self.respond("200 SITE ADDU successful.")
    except Exception as e:
        self.respond(f"501 SITE ADDU failed: {e}.")
        logger.error("SITE ADDU failed for user %s: %s", user, e)


def ftp_SITE_DISU(self: object, line: str) -> None:
    """
    Removes a virtual user from the authorizer and marks them as deleted in the database.

    The database row is kept; only its ``disabled_at`` column is set.
    Replies 501 on a missing argument or any failure, 200 on success.

    Args:
        line (str): A string containing the username to be disabled.
    """
    cfg = self.cfg
    parms = line.split()
    if not parms:
        self.respond("501 SITE DISU failed. Command needs 1 argument")
        return
    user = os.path.basename(parms[0])  # Strip any directory components

    try:
        self.authorizer.remove_user(str(user))
        with _db_session(cfg) as (conn, cur):
            cur.execute(
                f"UPDATE {_user_table(cfg)} SET disabled_at = now() WHERE ftpuser = %s",
                (user,),
            )
            conn.commit()
        logger.info("User %s deleted.", user)
        self.respond("200 SITE DISU successful.")
    except Exception as e:
        self.respond("501 SITE DISU failed.")
        logger.error("SITE DISU failed for user %s: %s", user, e)


def ftp_SITE_ENAU(self: object, line: str) -> None:
    """
    Restores a virtual user by updating their status in the database and adding them back to the authorizer.

    Replies 501 on a missing argument, an unknown user or any other failure,
    551 when the home directory cannot be created, and 200 on success.

    Args:
        line (str): A string containing the username to be enabled.
    """
    cfg = self.cfg
    parms = line.split()
    if not parms:
        self.respond("501 SITE ENAU failed. Command needs 1 argument")
        return
    user = os.path.basename(parms[0])  # Strip any directory components

    try:
        with _db_session(cfg) as (conn, cur):
            try:
                cur.execute(
                    f"UPDATE {_user_table(cfg)} SET disabled_at = null WHERE ftpuser = %s",
                    (user,),
                )
                conn.commit()
            except Exception as e:
                # Best effort, as before: a failed update is logged and the
                # stored record is still read and re-registered below.
                logger.error("Update DB failed: %s", e)
            cur.execute(
                f"SELECT ftpuser, hash, virtpath, perm FROM {_user_table(cfg)} "
                "WHERE ftpuser = %s",
                (user,),
            )
            row = cur.fetchone()

        if row is None:
            raise LookupError(f"user {user!r} not found in database")
        ftpuser, password_hash, virtpath, perm = row

        # Create the home directory before registering the user, and create
        # the same path that is handed to the authorizer.
        try:
            Path(virtpath).mkdir(parents=True, exist_ok=True)
        except Exception as e:
            self.respond(f"551 Error in create virtual user path: {e}")
            return

        self.authorizer.add_user(ftpuser, password_hash, virtpath, perm)
        logger.info("User %s restored.", user)
        self.respond("200 SITE ENAU successful.")
    except Exception as e:
        self.respond("501 SITE ENAU failed.")
        logger.error("SITE ENAU failed for user %s: %s", user, e)


def ftp_SITE_LSTU(self: object, line: str) -> None:
    """
    Lists all virtual users from the database.

    Sends a multi-line 214 reply with one line per user (username, permissions
    and ``disabled_at`` value), or a 501 reply if the query fails.

    Args:
        line (str): An empty string (no arguments needed for this command).
    """
    cfg = self.cfg
    try:
        # Run the query before sending anything, so a database failure cannot
        # produce a 501 after the "214-" header has already gone out.
        with _db_session(cfg) as (_conn, cur):
            cur.execute(f"SELECT ftpuser, perm, disabled_at FROM {_user_table(cfg)}")
            rows = cur.fetchall()

        users_list = [
            f"Username: {ftpuser}\tPerms: {perm}\tDisabled: {disabled_at}\r\n"
            for ftpuser, perm, disabled_at in rows
        ]
        self.push("214-The following virtual users are defined:\r\n")
        self.push("".join(users_list))
        self.respond("214 LSTU SITE command successful.")
    except Exception as e:
        self.respond(f"501 list users failed: {e}")