fix + test api with resl client

This commit is contained in:
2025-02-06 15:10:51 +01:00
parent 2d172a3620
commit 861a0e58e5
20 changed files with 376 additions and 254 deletions

View File

@@ -9,6 +9,7 @@ urlpatterns = [
path('user/list/', views.list_users, name='list_users'),
path('user/create_user/', views.create_user, name='create_user'),
path('user/edit/<slug:slug>', views.edit_user, name='edit_user'),
path('user/delete/<slug:slug>', views.delete_user, name='delete_user'),
path('user/disable/<slug:slug>', views.disable_user, name='disable_user'),
path('user/enable/<slug:slug>', views.enable_user, name='enable_user'),
path('user/role/<str:role>', views.view_role, name='view_role'),

View File

@@ -1,76 +0,0 @@
import bcrypt
import base64
import hashlib
import json
from cryptography.fernet import Fernet
from .models import MasterHash
from django.http import JsonResponse
# 1. Salva l'hash della master password e la chiave principale cifrata
def save_master_data(hashed_password, encrypted_data_key):
entry, created = MasterHash.objects.get_or_create(id="1")
entry.hash = hashed_password
entry.encrypted_data_key = encrypted_data_key
entry.save()
# 2. Carica i dati della master password
def load_master_data():
try:
entry = MasterHash.objects.get(id="1")
return entry.hash, entry.encrypted_data_key
except Exception:
return None, None
# 3. Autenticazione della master password
def authenticate(master_password):
stored_hash, encrypted_data_key = load_master_data()
if stored_hash is None:
hashed_password = bcrypt.hashpw(master_password.encode(), bcrypt.gensalt())
key = Fernet.generate_key() # Genera una chiave principale
derived_key = derive_key(master_password) # Deriva la chiave dalla master password
encrypted_data_key = encrypt_password(key.decode(), derived_key) # Cifra la chiave principale
save_master_data(hashed_password, encrypted_data_key)
return True, key
# Controlla se la password inserita è corretta
if bcrypt.checkpw(master_password.encode(), stored_hash.tobytes()):
derived_key = derive_key(master_password)
decrypted_data_key = decrypt_password(encrypted_data_key, derived_key) # Decifra la chiave principale
return True, decrypted_data_key.encode()
return False, None
# 4. Funzione per cambiare la master password
def change_master_password(old_password, new_password):
authenticated, data_key = authenticate(old_password)
if not authenticated:
return False # Fallisce se la vecchia password non è corretta
# Deriviamo la nuova chiave dalla nuova master password
new_derived_key = derive_key(new_password)
# Cifriamo la chiave principale con la nuova chiave derivata
new_encrypted_data_key = encrypt_password(data_key.decode(), new_derived_key)
# Creiamo il nuovo hash della nuova master password
new_hashed_password = bcrypt.hashpw(new_password.encode(), bcrypt.gensalt())
# Aggiorniamo il database con i nuovi dati
save_master_data(new_hashed_password, new_encrypted_data_key)
return True # Cambio password riuscito
# 5. Deriva una chiave da una password
def derive_key(master_password):
hash = hashlib.sha256(master_password.encode()).digest()
return base64.urlsafe_b64encode(hash)
# 6. Cifra una password con una chiave
def encrypt_password(password, key):
cipher = Fernet(key)
return cipher.encrypt(password.encode()).decode()
# 7. Decifra una password con una chiave
def decrypt_password(encrypted_password, key):
cipher = Fernet(key)
return cipher.decrypt(encrypted_password.encode()).decode()

View File

@@ -1,8 +1,8 @@
from django.shortcuts import render, redirect, get_object_or_404
from django.contrib import messages
from django.http import JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.contrib.auth.decorators import login_required, permission_required
from django.views.decorators.csrf import csrf_exempt, csrf_protect
from django.contrib.auth.decorators import login_required
import json
import paho.mqtt.client as mqtt
@@ -16,13 +16,51 @@ from .mqtt_service import MosquittoDynamicSecurity
config = dotenv_values(".env")
# authenticate(config['MASTER_PASSWORD'])
@csrf_exempt
def list_sites_api(request):
if request.method == 'POST':
data = json.loads(request.body)
master_password = data.get('master_password')
if not authenticate(master_password):
return JsonResponse({"error": "Master password errata"}, status=403)
sites = PasswordEntry.objects.values_list('site', flat=True)
return JsonResponse({"sites": list(sites)})
return JsonResponse({"error": "Richiesta non valida"}, status=400)
@csrf_exempt
def get_password_api(request):
if request.method == 'POST':
data = json.loads(request.body)
master_password = data.get('master_password')
site = data.get('site')
if not authenticate(master_password):
return JsonResponse({"error": "Master password errata"}, status=403)
key = derive_key(master_password)
try:
entry = PasswordEntry.objects.get(site=site)
decrypted_password = decrypt_password(entry.password, key)
return JsonResponse({
"site": entry.site,
"username": entry.username,
"password": decrypted_password,
"client_id": entry.client_id,
"topic": entry.topic
})
except PasswordEntry.DoesNotExist:
return JsonResponse({"error": "Sito non trovato"}, status=404)
return JsonResponse({"error": "Richiesta non valida"}, status=400)
@login_required
def home_view(request):
return redirect('list_users') # Reindirizza alla lista degli utenti
#return render(request, 'home.html')
@csrf_exempt
@login_required
def publish_message(request):
request_data = json.loads(request.body)
publish.single(topic=request_data['topic'],
@@ -58,6 +96,57 @@ def create_user(request):
"username": username,
"password": password
}
],
"commands": [
{
"command": "createRole",
"rolename": f'{username}_role',
"textname": "",
"textdescription": "",
"acls": [
{ "acltype": "publishClientSend", "topic": topic, "priority": 0, "allow": True }
]
},
{
"command": "createRole",
"rolename": f'{username}_ase_role',
"textname": "",
"textdescription": "",
"acls": [
{ "acltype": "publishClientSend", "topic": topic, "priority": 0, "allow": True },
{ "acltype": "publishClientReceive", "topic": topic, "priority": 0, "allow": True },
{ "acltype": "subscribeLiteral", "topic": topic, "priority": 0, "allow": True },
{ "acltype": "subscribePattern", "topic": topic, "priority": 0, "allow": True },
{ "acltype": "unsubscribeLiteral", "topic": topic, "priority": 0, "allow": True },
{ "acltype": "unsubscribePattern", "topic": topic, "priority": 0, "allow": True }
]
},
{
"command": "createClient",
"username": username,
"password": password,
"clientid": client_id,
"textname": f'{username} subscriber',
"textdescription": f'{username} subscriber',
"groups": [
],
"roles": [
{ "rolename": f'{username}_role', "priority": 0 }
]
},
{
"command": "createClient",
"username": f'{username}_ase',
"password": f'{password}{config["MQTT_PWDX"]}',
"clientid": f'{client_id}_ase',
"textname": f'{username} ASE subscriber',
"textdescription": f'{username} ASE subscriber',
"groups": [
],
"roles": [
{ "rolename": f'{username}_ase_role', "priority": 0 }
]
}
]
}
@@ -68,6 +157,7 @@ def create_user(request):
key = derive_key(config['MASTER_PASSWORD'])
encrypted_password = encrypt_password(password, key)
encrypted_password_ase = encrypt_password(f'{password}_ase', key)
PasswordEntry.objects.create(
site=site,
@@ -78,6 +168,15 @@ def create_user(request):
status='enabled'
)
PasswordEntry.objects.create(
site=f'{site}_ase',
username=f'{username}_ase',
password=encrypted_password_ase,
client_id=f'{client_id}_ase',
topic=topic,
status='enabled'
)
messages.success(request, 'Utente creato con successo!') # Messaggio di successo
return redirect('list_users') # Reindirizza alla lista degli utenti
else:
@@ -96,9 +195,39 @@ def edit_user(request, slug):
user = PasswordEntry.objects.filter(slug=slug).first()
return render(request, 'wallet_api/edit_user.html', {'user': user})
else:
messages.success(request, 'Non hai i permessi per creare utenti MQTT!') # Messaggio di successo
messages.success(request, 'Non hai i permessi per cancellare utenti MQTT!') # Messaggio di successo
return redirect('list_users') # Reindirizza alla lista degli utenti
@login_required
def delete_user(request, slug):
if request.user.groups.filter(name='ase_admin').exists():
user = PasswordEntry.objects.filter(slug=slug).first()
command = {
"commands":
[
{
"command": "deleteClient",
"username": user.username
},
{
"command": "deleteRole",
"rolename": f'{user.username}_role'
}
]
}
# Invia il comando a Mosquitto
mqtt_service = MosquittoDynamicSecurity()
response = mqtt_service.send_command(command)
if "error" not in response["responses"][0]:
result = PasswordEntry.objects.filter(id=user.id).delete()
print(result)
messages.success(request, f'Utente {user.username} eliminato!') # Messaggio di successo
return redirect('list_users') # Reindirizza alla lista degli utenti
else:
messages.success(request, 'Non hai i permessi per cancellare utenti MQTT!') # Messaggio di successo
return redirect('list_users') # Reindirizza alla lista degli utenti
@login_required
def disable_user(request, slug):
if request.user.groups.filter(name='ase_admin').exists():
@@ -164,43 +293,9 @@ def enable_user(request, slug):
messages.success(request, 'Non hai i permessi per disabilitare utenti MQTT!') # Messaggio di successo
return redirect('list_users') # Reindirizza alla lista degli utenti
@login_required
def view_role(request, role):
return render(request, 'wallet_api/role_info.html', {'role': role })
@csrf_exempt
def get_password_api(request):
if request.method == 'POST':
data = json.loads(request.body)
master_password = data.get('master_password')
site = data.get('site')
if not authenticate(master_password):
return JsonResponse({"error": "Master password errata"}, status=403)
key = derive_key(master_password)
try:
entry = PasswordEntry.objects.get(site=site)
decrypted_password = decrypt_password(entry.password, key)
return JsonResponse({
"site": entry.site,
"username": entry.username,
"password": decrypted_password,
"client_id": entry.client_id,
"topic": entry.topic
})
except PasswordEntry.DoesNotExist:
return JsonResponse({"error": "Sito non trovato"}, status=404)
return JsonResponse({"error": "Richiesta non valida"}, status=400)
@csrf_exempt
def list_sites_api(request):
if request.method == 'POST':
data = json.loads(request.body)
master_password = data.get('master_password')
if not authenticate(master_password):
return JsonResponse({"error": "Master password errata"}, status=403)
sites = PasswordEntry.objects.values_list('site', flat=True)
return JsonResponse({"sites": list(sites)})
return JsonResponse({"error": "Richiesta non valida"}, status=400)