Files
ase_mqtt/password_wallet_api.py
2024-12-30 11:30:30 +01:00

227 lines
7.7 KiB
Python

from flask import Flask, request, jsonify
import os
import psycopg2
import bcrypt
import base64
import hashlib
import logging
from cryptography.fernet import Fernet
from waitress import serve
app = Flask(__name__)
# Configurazione
hash_file = "data/master_hash.txt"
# Configurazione connessione PostgreSQL
DB_CONFIG = {
"dbname": os.getenv("DB_NAME"),
"dbschema": os.getenv("DB_SCHEMA"),
"dbtable": os.getenv("DB_TABLE"),
"user": os.getenv("DB_USER"),
"password": os.getenv("DB_PASSWORD"),
"host": os.getenv("DB_HOST"),
"port": os.getenv("DB_PORT")
}
# Configura il logging
logging.basicConfig(level=logging.INFO, format='- PID: %(process)d %(levelname)s - %(message)s')
def get_db_connection():
return psycopg2.connect(
dbname=DB_CONFIG["dbname"],
user=DB_CONFIG["user"],
password=DB_CONFIG["password"],
host=DB_CONFIG["host"],
port=DB_CONFIG["port"]
)
# Inizializza il database
def init_db():
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} (
id SERIAL PRIMARY KEY,
site TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
client_id TEXT NULL,
created_at timestamptz DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT site_user_clientid_unique UNIQUE(site, username, client_id)
)
""")
conn.commit()
conn.close()
logging.info("Database inizializzato.")
# Salva l'hash della master password
def save_master_hash(hash):
with open(hash_file, "wb") as f:
f.write(hash)
logging.info("Hash della master password salvato.")
# Carica l'hash della master password
def load_master_hash():
if not os.path.exists(hash_file):
logging.warning("Hash della master password non trovato.")
return None
with open(hash_file, "rb") as f:
logging.info("Hash della master password caricato.")
return f.read()
# Deriva una chiave di crittografia dalla master password
def derive_key(master_password):
hash = hashlib.sha256(master_password.encode()).digest()
return base64.urlsafe_b64encode(hash)
# Autenticazione della master password
def authenticate(master_password):
master_hash = load_master_hash()
if master_hash is None:
hashed_password = bcrypt.hashpw(master_password.encode(), bcrypt.gensalt())
save_master_hash(hashed_password)
logging.info("Master password impostata per la prima volta.")
return True
auth_success = bcrypt.checkpw(master_password.encode(), master_hash)
if auth_success:
logging.info("Autenticazione riuscita.")
else:
logging.warning("Autenticazione fallita.")
return auth_success
# Aggiungi una password al database
def add_password(site, username, password, client_id, cipher):
conn = get_db_connection()
cursor = conn.cursor()
encrypted_password = cipher.encrypt(password.encode()).decode()
try:
cursor.execute(
f"INSERT INTO {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} (site, username, password, client_id) VALUES (%s, %s, %s, %s)",
(site, username, encrypted_password, client_id))
conn.commit()
logging.info(f"Password aggiunta per il sito: {site}.")
except psycopg2.Error as e:
logging.error(f"Errore durante l'aggiunta della password: {e}")
finally:
conn.close()
# Recupera una password dal database
def get_password(site, cipher):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(f"SELECT username, password, client_id FROM {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} WHERE site = %s", (site,))
row = cursor.fetchone()
if row:
username, encrypted_password, client_id = row
decrypted_password = cipher.decrypt(encrypted_password.encode()).decode()
logging.info(f"Password recuperata per il sito: {site}.")
return username, decrypted_password, client_id
logging.warning(f"Sito non trovato: {site}.")
return None, None, None
except psycopg2.Error as e:
logging.error(f"Errore durante il recupero della password: {e}")
return None, None, None
finally:
conn.close()
# Cancella una password dal database
def delete_password(site):
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(f"DELETE FROM {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} WHERE site = %s", (site,))
if cursor.rowcount > 0:
logging.info(f"Password cancellata per il sito: {site}.")
else:
logging.warning(f"Nessuna password trovata per il sito: {site}.")
conn.commit()
except psycopg2.Error as e:
logging.error(f"Errore durante la cancellazione della password: {e}")
finally:
conn.close()
# Ottieni la lista di tutti i siti
def list_sites():
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute(f"SELECT site FROM {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}")
sites = [row[0] for row in cursor.fetchall()]
logging.info("Elenco dei siti recuperato.")
return sites
except psycopg2.Error as e:
logging.error(f"Errore durante il recupero dell'elenco dei siti: {e}")
return []
finally:
conn.close()
# Endpoint per aggiungere una password
@app.route('/add', methods=['POST'])
def add_password_api():
master_password = request.json.get('master_password')
site = request.json.get('site')
username = request.json.get('username')
password = request.json.get('password')
client_id = request.json.get('client_id')
if not authenticate(master_password):
logging.warning("Tentativo di aggiungere una password con master password errata.")
return jsonify({"error": "Master password errata"}), 403
key = derive_key(master_password)
cipher = Fernet(key)
add_password(site, username, password, client_id, cipher)
return jsonify({"message": "Password aggiunta con successo"})
# Endpoint per recuperare una password
@app.route('/get', methods=['POST'])
def get_password_api():
master_password = request.json.get('master_password')
site = request.json.get('site')
if not authenticate(master_password):
logging.warning("Tentativo di recuperare una password con master password errata.")
return jsonify({"error": "Master password errata"}), 403
key = derive_key(master_password)
cipher = Fernet(key)
username, password, client_id = get_password(site, cipher)
if username is None:
return jsonify({"error": "Sito non trovato"}), 404
return jsonify({"site": site, "username": username, "password": password, "client_id": client_id})
# Endpoint per cancellare una password
@app.route('/delete', methods=['POST'])
def delete_password_api():
master_password = request.json.get('master_password')
site = request.json.get('site')
if not authenticate(master_password):
logging.warning("Tentativo di cancellare una password con master password errata.")
return jsonify({"error": "Master password errata"}), 403
delete_password(site)
return jsonify({"message": "Password cancellata con successo"})
# Endpoint per listare tutti i siti
@app.route('/list', methods=['POST'])
def list_sites_api():
master_password = request.json.get('master_password')
if not authenticate(master_password):
logging.warning("Tentativo di recuperare l'elenco dei siti con master password errata.")
return jsonify({"error": "Master password errata"}), 403
sites = list_sites()
return jsonify({"sites": sites})
# Avvio dell'app
if __name__ == '__main__':
init_db()
serve(app, host='0.0.0.0', port=5000)