"""Small password-manager HTTP API backed by PostgreSQL.

Every endpoint receives the master password in the JSON request body. The
master password is verified against a bcrypt hash stored in ``hash_file``,
and a Fernet key derived from it encrypts/decrypts the stored site passwords.
"""
import base64
import hashlib
import logging
import os
from contextlib import closing
from typing import List, Optional, Tuple

import bcrypt
import psycopg2
from cryptography.fernet import Fernet
from flask import Flask, jsonify, request
from waitress import serve

# Path of the file holding the bcrypt hash of the master password.
hash_file = "data/master_hash.txt"

# PostgreSQL connection settings, read from the environment.
DB_CONFIG = {
    "dbname": os.getenv("DB_NAME"),
    "dbschema": os.getenv("DB_SCHEMA"),
    "dbtable": os.getenv("DB_TABLE"),
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "host": os.getenv("DB_HOST"),
    "port": os.getenv("DB_PORT"),
}

app = Flask(__name__)

# Configure logging.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - PID: %(process)d %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)


def get_db_connection():
    """Open and return a new psycopg2 connection built from ``DB_CONFIG``."""
    return psycopg2.connect(
        dbname=DB_CONFIG["dbname"],
        user=DB_CONFIG["user"],
        password=DB_CONFIG["password"],
        host=DB_CONFIG["host"],
        port=DB_CONFIG["port"],
    )


def _qualified_table() -> str:
    """Return the ``schema.table`` name used by every query.

    NOTE: both parts come from environment configuration and are interpolated
    into the SQL text verbatim (unquoted), so they must be trusted, plain
    identifiers. They are deliberately left unquoted to keep PostgreSQL's
    case-folding behaviour unchanged for existing deployments.
    """
    return f"{DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}"


def _json_payload() -> dict:
    """Return the request's JSON object, or ``{}`` if the body is not one.

    Using ``silent=True`` avoids an unhandled error when the body is missing,
    malformed, or not a JSON object; callers then see missing fields as
    ``None``.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def init_db() -> None:
    """Create the passwords table if it does not exist yet."""
    # ``closing`` guarantees the connection is released even if the DDL fails.
    with closing(get_db_connection()) as conn:
        with conn.cursor() as cursor:
            cursor.execute(f"""
                CREATE TABLE IF NOT EXISTS {_qualified_table()} (
                    id SERIAL PRIMARY KEY,
                    site TEXT NOT NULL,
                    username TEXT NOT NULL,
                    password TEXT NOT NULL
                )
            """)
        conn.commit()
    logger.info("Database inizializzato.")


def save_master_hash(hash: bytes) -> None:
    """Write the master-password hash (bytes) to ``hash_file``.

    The parent directory is created when missing so the first run does not
    fail on a fresh checkout.
    """
    parent = os.path.dirname(hash_file)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(hash_file, "wb") as f:
        f.write(hash)
    logger.info("Hash della master password salvato.")


def load_master_hash() -> Optional[bytes]:
    """Return the stored master-password hash, or ``None`` if no file exists."""
    try:
        # EAFP: open directly instead of checking existence first.
        with open(hash_file, "rb") as f:
            logger.info("Hash della master password caricato.")
            return f.read()
    except FileNotFoundError:
        logger.warning("Hash della master password non trovato.")
        return None


def derive_key(master_password: str) -> bytes:
    """Derive a Fernet-compatible key from the master password.

    The key is the URL-safe base64 encoding of the SHA-256 digest.

    NOTE(review): a single unsalted SHA-256 is a weak key-derivation function.
    It is kept as-is because changing it would make already-stored passwords
    undecryptable; migrating to a salted KDF needs a re-encryption step.
    """
    digest = hashlib.sha256(master_password.encode()).digest()
    return base64.urlsafe_b64encode(digest)


def authenticate(master_password) -> bool:
    """Check ``master_password`` against the stored bcrypt hash.

    If no hash has been stored yet, the supplied password becomes the master
    password and the call succeeds (first-use bootstrap). A value that is not
    a string (e.g. a missing JSON field) is rejected instead of crashing.
    """
    if not isinstance(master_password, str):
        logger.warning("Autenticazione fallita.")
        return False

    encoded = master_password.encode()
    master_hash = load_master_hash()
    if master_hash is None:
        save_master_hash(bcrypt.hashpw(encoded, bcrypt.gensalt()))
        logger.info("Master password impostata per la prima volta.")
        return True

    auth_success = bcrypt.checkpw(encoded, master_hash)
    if auth_success:
        logger.info("Autenticazione riuscita.")
    else:
        logger.warning("Autenticazione fallita.")
    return auth_success


def add_password(site: str, username: str, password: str, cipher: Fernet) -> bool:
    """Encrypt ``password`` and insert a row for ``site``.

    Returns:
        ``True`` if the row was committed, ``False`` if the database raised
        an error (which is logged).
    """
    # Encrypt before connecting so an encryption error cannot leak a connection.
    encrypted_password = cipher.encrypt(password.encode()).decode()
    with closing(get_db_connection()) as conn:
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"INSERT INTO {_qualified_table()} (site, username, password) "
                    "VALUES (%s, %s, %s)",
                    (site, username, encrypted_password),
                )
            conn.commit()
        except psycopg2.Error as e:
            logger.error("Errore durante l'aggiunta della password: %s", e)
            return False
    logger.info("Password aggiunta per il sito: %s.", site)
    return True


def get_password(site: str, cipher: Fernet) -> Tuple[Optional[str], Optional[str]]:
    """Fetch and decrypt the credentials stored for ``site``.

    Returns:
        ``(username, password)`` for the first matching row, or
        ``(None, None)`` if no row matches or the query fails.
    """
    with closing(get_db_connection()) as conn:
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"SELECT username, password FROM {_qualified_table()} "
                    "WHERE site = %s",
                    (site,),
                )
                row = cursor.fetchone()
        except psycopg2.Error as e:
            logger.error("Errore durante il recupero della password: %s", e)
            return None, None

    if row is None:
        logger.warning("Sito non trovato: %s.", site)
        return None, None

    username, encrypted_password = row
    decrypted_password = cipher.decrypt(encrypted_password.encode()).decode()
    logger.info("Password recuperata per il sito: %s.", site)
    return username, decrypted_password


def delete_password(site: str) -> Optional[int]:
    """Delete every row stored for ``site``.

    Returns:
        The number of rows deleted (``0`` when the site was not present), or
        ``None`` if the database raised an error (which is logged).
    """
    with closing(get_db_connection()) as conn:
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"DELETE FROM {_qualified_table()} WHERE site = %s",
                    (site,),
                )
                deleted = cursor.rowcount
            if deleted > 0:
                logger.info("Password cancellata per il sito: %s.", site)
            else:
                logger.warning("Nessuna password trovata per il sito: %s.", site)
            conn.commit()
        except psycopg2.Error as e:
            logger.error("Errore durante la cancellazione della password: %s", e)
            return None
    return deleted


def list_sites() -> List[str]:
    """Return the ``site`` value of every stored row (``[]`` on query error)."""
    with closing(get_db_connection()) as conn:
        try:
            with conn.cursor() as cursor:
                cursor.execute(f"SELECT site FROM {_qualified_table()}")
                sites = [row[0] for row in cursor.fetchall()]
        except psycopg2.Error as e:
            logger.error("Errore durante il recupero dell'elenco dei siti: %s", e)
            return []
    logger.info("Elenco dei siti recuperato.")
    return sites


@app.route('/add', methods=['POST'])
def add_password_api():
    """POST /add: store an encrypted password for a site."""
    payload = _json_payload()
    master_password = payload.get('master_password')

    if not authenticate(master_password):
        logger.warning("Tentativo di aggiungere una password con master password errata.")
        return jsonify({"error": "Master password errata"}), 403

    site = payload.get('site')
    username = payload.get('username')
    password = payload.get('password')
    # All three columns are NOT NULL text; reject bad input with a clear 400.
    if not all(isinstance(value, str) for value in (site, username, password)):
        return jsonify({"error": "Campi obbligatori mancanti o non validi"}), 400

    cipher = Fernet(derive_key(master_password))
    if not add_password(site, username, password, cipher):
        # Do not claim success when the insert failed.
        return jsonify({"error": "Errore durante l'aggiunta della password"}), 500
    return jsonify({"message": "Password aggiunta con successo"})


@app.route('/get', methods=['POST'])
def get_password_api():
    """POST /get: return the decrypted credentials for a site."""
    payload = _json_payload()
    master_password = payload.get('master_password')
    site = payload.get('site')

    if not authenticate(master_password):
        logger.warning("Tentativo di recuperare una password con master password errata.")
        return jsonify({"error": "Master password errata"}), 403

    cipher = Fernet(derive_key(master_password))
    username, password = get_password(site, cipher)
    if username is None:
        return jsonify({"error": "Sito non trovato"}), 404
    return jsonify({"site": site, "username": username, "password": password})


@app.route('/delete', methods=['POST'])
def delete_password_api():
    """POST /delete: remove the stored credentials for a site."""
    payload = _json_payload()
    master_password = payload.get('master_password')
    site = payload.get('site')

    if not authenticate(master_password):
        logger.warning("Tentativo di cancellare una password con master password errata.")
        return jsonify({"error": "Master password errata"}), 403

    deleted = delete_password(site)
    if deleted is None:
        return jsonify({"error": "Errore durante la cancellazione della password"}), 500
    if deleted == 0:
        return jsonify({"error": "Sito non trovato"}), 404
    return jsonify({"message": "Password cancellata con successo"})


@app.route('/list', methods=['POST'])
def list_sites_api():
    """POST /list: return the names of all stored sites."""
    master_password = _json_payload().get('master_password')

    if not authenticate(master_password):
        logger.warning("Tentativo di recuperare l'elenco dei siti con master password errata.")
        return jsonify({"error": "Master password errata"}), 403

    return jsonify({"sites": list_sites()})


# Application entry point: create the table, then serve with waitress.
if __name__ == '__main__':
    init_db()
    serve(app, host='0.0.0.0', port=5000)