229 lines
7.9 KiB
Python
229 lines
7.9 KiB
Python
from flask import Flask, request, jsonify
|
|
import os
|
|
import psycopg2
|
|
import bcrypt
|
|
import base64
|
|
import hashlib
|
|
import logging
|
|
from cryptography.fernet import Fernet
|
|
from waitress import serve
|
|
|
|
app = Flask(__name__)
|
|
|
|
# Configurazione
|
|
hash_file = "data/master_hash.txt"
|
|
|
|
# Configurazione connessione PostgreSQL
|
|
DB_CONFIG = {
|
|
"dbname": os.getenv("DB_NAME"),
|
|
"dbschema": os.getenv("DB_SCHEMA"),
|
|
"dbtable": os.getenv("DB_TABLE"),
|
|
"user": os.getenv("DB_USER"),
|
|
"password": os.getenv("DB_PASSWORD"),
|
|
"host": os.getenv("DB_HOST"),
|
|
"port": os.getenv("DB_PORT")
|
|
}
|
|
|
|
# Configura il logging
|
|
logging.basicConfig(level=logging.INFO, format='- PID: %(process)d %(levelname)s - %(message)s')
|
|
|
|
def get_db_connection():
|
|
return psycopg2.connect(
|
|
dbname=DB_CONFIG["dbname"],
|
|
user=DB_CONFIG["user"],
|
|
password=DB_CONFIG["password"],
|
|
host=DB_CONFIG["host"],
|
|
port=DB_CONFIG["port"]
|
|
)
|
|
|
|
|
|
# Inizializza il database
|
|
def init_db():
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute(f"""
|
|
CREATE TABLE IF NOT EXISTS {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} (
|
|
id SERIAL PRIMARY KEY,
|
|
site TEXT NOT NULL,
|
|
username TEXT NOT NULL,
|
|
password TEXT NOT NULL,
|
|
client_id TEXT NOT NULL,
|
|
topic TEXT NOT NULL,
|
|
created_at timestamptz DEFAULT CURRENT_TIMESTAMP,
|
|
CONSTRAINT site_user_clientid_unique UNIQUE(site, username, client_id)
|
|
)
|
|
""")
|
|
conn.commit()
|
|
conn.close()
|
|
logging.info("Database inizializzato.")
|
|
|
|
# Salva l'hash della master password
|
|
def save_master_hash(hash):
|
|
with open(hash_file, "wb") as f:
|
|
f.write(hash)
|
|
logging.info("Hash della master password salvato.")
|
|
|
|
# Carica l'hash della master password
|
|
def load_master_hash():
|
|
if not os.path.exists(hash_file):
|
|
logging.warning("Hash della master password non trovato.")
|
|
return None
|
|
with open(hash_file, "rb") as f:
|
|
logging.info("Hash della master password caricato.")
|
|
return f.read()
|
|
|
|
# Deriva una chiave di crittografia dalla master password
|
|
def derive_key(master_password):
|
|
hash = hashlib.sha256(master_password.encode()).digest()
|
|
return base64.urlsafe_b64encode(hash)
|
|
|
|
# Autenticazione della master password
|
|
def authenticate(master_password):
|
|
master_hash = load_master_hash()
|
|
if master_hash is None:
|
|
hashed_password = bcrypt.hashpw(master_password.encode(), bcrypt.gensalt())
|
|
save_master_hash(hashed_password)
|
|
logging.info("Master password impostata per la prima volta.")
|
|
return True
|
|
auth_success = bcrypt.checkpw(master_password.encode(), master_hash)
|
|
if auth_success:
|
|
logging.info("Autenticazione riuscita.")
|
|
else:
|
|
logging.warning("Autenticazione fallita.")
|
|
return auth_success
|
|
|
|
# Aggiungi una password al database
|
|
def add_password(site, username, password, client_id, topic, cipher):
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
encrypted_password = cipher.encrypt(password.encode()).decode()
|
|
try:
|
|
cursor.execute(
|
|
f"INSERT INTO {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} (site, username, password, client_id, topic) VALUES (%s, %s, %s, %s, %s)",
|
|
(site, username, encrypted_password, client_id, topic))
|
|
conn.commit()
|
|
logging.info(f"Password aggiunta per il sito: {site}.")
|
|
except psycopg2.Error as e:
|
|
logging.error(f"Errore durante l'aggiunta della password: {e}")
|
|
finally:
|
|
conn.close()
|
|
|
|
# Recupera una password dal database
|
|
def get_password(site, cipher):
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
try:
|
|
cursor.execute(f"SELECT username, password, client_id, topic FROM {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} WHERE site = %s", (site,))
|
|
row = cursor.fetchone()
|
|
if row:
|
|
username, encrypted_password, client_id, topic = row
|
|
decrypted_password = cipher.decrypt(encrypted_password.encode()).decode()
|
|
logging.info(f"Password recuperata per il sito: {site}.")
|
|
return username, decrypted_password, client_id, topic
|
|
logging.warning(f"Sito non trovato: {site}.")
|
|
return None, None, None, None
|
|
except psycopg2.Error as e:
|
|
logging.error(f"Errore durante il recupero della password: {e}")
|
|
return None, None, None, None
|
|
finally:
|
|
conn.close()
|
|
|
|
# Cancella una password dal database
|
|
def delete_password(site):
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
try:
|
|
cursor.execute(f"DELETE FROM {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']} WHERE site = %s", (site,))
|
|
if cursor.rowcount > 0:
|
|
logging.info(f"Password cancellata per il sito: {site}.")
|
|
else:
|
|
logging.warning(f"Nessuna password trovata per il sito: {site}.")
|
|
conn.commit()
|
|
except psycopg2.Error as e:
|
|
logging.error(f"Errore durante la cancellazione della password: {e}")
|
|
finally:
|
|
conn.close()
|
|
|
|
# Ottieni la lista di tutti i siti
|
|
def list_sites():
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
try:
|
|
cursor.execute(f"SELECT site FROM {DB_CONFIG['dbschema']}.{DB_CONFIG['dbtable']}")
|
|
sites = [row[0] for row in cursor.fetchall()]
|
|
logging.info("Elenco dei siti recuperato.")
|
|
return sites
|
|
except psycopg2.Error as e:
|
|
logging.error(f"Errore durante il recupero dell'elenco dei siti: {e}")
|
|
return []
|
|
finally:
|
|
conn.close()
|
|
|
|
# Endpoint per aggiungere una password
|
|
@app.route('/add', methods=['POST'])
|
|
def add_password_api():
|
|
master_password = request.json.get('master_password')
|
|
site = request.json.get('site')
|
|
username = request.json.get('username')
|
|
password = request.json.get('password')
|
|
client_id = request.json.get('client_id')
|
|
topic = request.json.get('topic')
|
|
|
|
if not authenticate(master_password):
|
|
logging.warning("Tentativo di aggiungere una password con master password errata.")
|
|
return jsonify({"error": "Master password errata"}), 403
|
|
|
|
key = derive_key(master_password)
|
|
cipher = Fernet(key)
|
|
add_password(site, username, password, client_id, topic, cipher)
|
|
return jsonify({"message": "Password aggiunta con successo"})
|
|
|
|
# Endpoint per recuperare una password
|
|
@app.route('/get', methods=['POST'])
|
|
def get_password_api():
|
|
master_password = request.json.get('master_password')
|
|
site = request.json.get('site')
|
|
|
|
if not authenticate(master_password):
|
|
logging.warning("Tentativo di recuperare una password con master password errata.")
|
|
return jsonify({"error": "Master password errata"}), 403
|
|
|
|
key = derive_key(master_password)
|
|
cipher = Fernet(key)
|
|
username, password, client_id, topic = get_password(site, cipher)
|
|
|
|
if username is None:
|
|
return jsonify({"error": "Sito non trovato"}), 404
|
|
|
|
return jsonify({"site": site, "username": username, "password": password, "client_id": client_id, "topic": topic})
|
|
|
|
# Endpoint per cancellare una password
|
|
@app.route('/delete', methods=['POST'])
|
|
def delete_password_api():
|
|
master_password = request.json.get('master_password')
|
|
site = request.json.get('site')
|
|
|
|
if not authenticate(master_password):
|
|
logging.warning("Tentativo di cancellare una password con master password errata.")
|
|
return jsonify({"error": "Master password errata"}), 403
|
|
|
|
delete_password(site)
|
|
return jsonify({"message": "Password cancellata con successo"})
|
|
|
|
# Endpoint per listare tutti i siti
|
|
@app.route('/list', methods=['POST'])
|
|
def list_sites_api():
|
|
master_password = request.json.get('master_password')
|
|
|
|
if not authenticate(master_password):
|
|
logging.warning("Tentativo di recuperare l'elenco dei siti con master password errata.")
|
|
return jsonify({"error": "Master password errata"}), 403
|
|
|
|
sites = list_sites()
|
|
return jsonify({"sites": sites})
|
|
|
|
# Avvio dell'app
|
|
if __name__ == '__main__':
|
|
init_db()
|
|
serve(app, host='0.0.0.0', port=5000)
|