"""Django views for MQTT publishing, Mosquitto client management and a JSON password API.

The HTML views manage ``PasswordEntry`` rows together with the matching
client on the Mosquitto broker (through ``MosquittoDynamicSecurity``).
The ``*_api`` views are CSRF-exempt JSON endpoints that gate access with
the master password sent in the request body.
"""

import json

from django.shortcuts import render, redirect, get_object_or_404
from django.contrib import messages
from django.http import Http404, JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
from dotenv import dotenv_values

from .models import PasswordEntry
from .utils import authenticate, derive_key, encrypt_password, decrypt_password
from .mqtt_service import MosquittoDynamicSecurity

# Settings are loaded once, at import time, from the local ``.env`` file.
config = dotenv_values(".env")
# Runs once at import; the return value is discarded.
# NOTE(review): presumably validates/initialises the master password --
# confirm against ``utils.authenticate``.
authenticate(config['MASTER_PASSWORD'])


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _parse_json_object(request):
    """Decode the request body as a JSON object.

    Returns:
        A ``(data, error)`` pair. On success ``data`` is a dict and ``error``
        is ``None``; otherwise ``data`` is ``None`` and ``error`` is a 400
        ``JsonResponse`` ready to be returned by the view.
    """
    try:
        data = json.loads(request.body)
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError (both subclass
        # ValueError), i.e. an empty, malformed or badly encoded body.
        return None, JsonResponse({"error": "JSON non valido"}, status=400)
    if not isinstance(data, dict):
        # The views below all use key lookups, so only objects are accepted.
        return None, JsonResponse({"error": "JSON non valido"}, status=400)
    return data, None


def _read_json_post(request):
    """Like ``_parse_json_object`` but additionally rejects non-POST methods with 405."""
    if request.method != 'POST':
        not_allowed = JsonResponse({"error": "Metodo non consentito"}, status=405)
        not_allowed["Allow"] = "POST"
        return None, not_allowed
    return _parse_json_object(request)


def _send_dynsec_command(command_name, **params):
    """Send a single command through ``MosquittoDynamicSecurity`` and return its reply.

    Args:
        command_name: Value for the ``"command"`` key (e.g. ``"createClient"``).
        **params: Extra key/value pairs placed next to ``"command"``.

    Returns:
        Whatever ``send_command`` returns. The callers in this module read
        ``reply["responses"][0]`` and look for an ``"error"`` key in it.
    """
    payload = {"commands": [{"command": command_name, **params}]}
    return MosquittoDynamicSecurity().send_command(payload)


def _get_entry_or_404(slug):
    """Return the first ``PasswordEntry`` with this slug or raise ``Http404``."""
    entry = PasswordEntry.objects.filter(slug=slug).first()
    if entry is None:
        raise Http404("Utente non trovato")
    return entry


# ---------------------------------------------------------------------------
# HTML views
# ---------------------------------------------------------------------------

def home_view(request):
    """Render the landing page."""
    return render(request, 'home.html')


@csrf_exempt
def publish_message(request):
    """Publish ``msg`` (JSON-encoded) on ``topic`` using the broker settings from ``.env``.

    Expects a JSON object body with the keys ``topic`` and ``msg``. Responds
    with the parsed body echoed under ``request_data``, or with a 400 error
    when the body is not a JSON object or a key is missing.

    NOTE(review): this endpoint is CSRF-exempt and performs no authentication
    of its own, yet publishes with the server's broker credentials -- confirm
    it is protected elsewhere (URL conf, middleware, network).
    """
    request_data, error = _parse_json_object(request)
    if error is not None:
        return error

    missing = [name for name in ('topic', 'msg') if name not in request_data]
    if missing:
        return JsonResponse(
            {"error": f"Campi mancanti: {', '.join(missing)}"}, status=400
        )

    publish.single(
        topic=request_data['topic'],
        payload=json.dumps(request_data['msg']),
        hostname=config['MQTT_HOST'],
        port=int(config['MQTT_PORT']),
        keepalive=int(config['MQTT_KEEPALIVE']),
        auth={'username': config['MQTT_USER'], 'password': config['MQTT_PASSWORD']},
        protocol=mqtt.MQTTv5,
    )
    return JsonResponse({'request_data': request_data})


def list_users(request):
    """Render the list of all stored ``PasswordEntry`` rows."""
    users = PasswordEntry.objects.all()
    return render(request, 'wallet_api/list_users.html', {'users': users})


def create_user(request):
    """Show the creation form (non-POST) or create a broker client plus its entry (POST).

    On POST the client is first created on the broker; only if the broker's
    first response carries no ``"error"`` is the password encrypted with a key
    derived from the configured master password and stored. On broker error
    the form is re-rendered with the raw reply in the context.
    """
    if request.method != 'POST':
        # Not a form submission: just show the empty form.
        return render(request, 'wallet_api/create_user.html')

    form = request.POST
    username = form.get('username')
    password = form.get('password')

    # Create the client on the broker before touching the database.
    response = _send_dynsec_command("createClient", username=username, password=password)
    outcome = response["responses"][0]
    if "error" in outcome:
        messages.warning(
            request, f"Errore durante la creazione dell'utente: {outcome['error']}"
        )
        # Stay on the same page so the user can correct the input.
        return render(request, 'wallet_api/create_user.html', {'response': response})

    key = derive_key(config['MASTER_PASSWORD'])
    PasswordEntry.objects.create(
        site=form.get('site'),
        username=username,
        password=encrypt_password(password, key),
        client_id=form.get('clientId'),
        topic=form.get('topic'),
        status='enabled',
    )
    messages.success(request, 'Utente creato con successo!')
    return redirect('list_users')


def edit_user(request, slug):
    """Render the edit page for the entry identified by ``slug`` (404 if absent)."""
    user = _get_entry_or_404(slug)
    return render(request, 'wallet_api/edit_user.html', {'user': user})


def disable_user(request, slug):
    """Disable the broker client for ``slug`` and mark that entry as ``disabled``.

    Raises ``Http404`` when no entry has this slug. For htmx requests the
    ``partials/enable_user.html`` fragment is returned on success; otherwise
    the view redirects to the user list.
    """
    entry = _get_entry_or_404(slug)

    outcome = _send_dynsec_command("disableClient", username=entry.username)["responses"][0]
    if "error" in outcome:
        messages.warning(
            request,
            f"Errore durante la disabilitazione dell'utente: {outcome['error']}",
        )
        return redirect('list_users')

    # Restrict the update to this entry; an unfiltered update() would change
    # the status of every row in the table.
    PasswordEntry.objects.filter(pk=entry.pk).update(status='disabled')
    messages.success(request, 'Utente disabilitato con successo!')

    # ``request.htmx`` only exists when the django-htmx middleware is active.
    if getattr(request, 'htmx', False):
        return render(request, 'partials/enable_user.html')
    return redirect('list_users')


def enable_user(request, slug):
    """Enable the broker client for ``slug`` and mark that entry as ``enabled``.

    Raises ``Http404`` when no entry has this slug. Always redirects to the
    user list, with a success or warning message attached.
    """
    entry = _get_entry_or_404(slug)

    outcome = _send_dynsec_command("enableClient", username=entry.username)["responses"][0]
    if "error" in outcome:
        messages.warning(
            request,
            f"Errore durante la abilitazione dell'utente: {outcome['error']}",
        )
        return redirect('list_users')

    # Restrict the update to this entry (see disable_user).
    PasswordEntry.objects.filter(pk=entry.pk).update(status='enabled')
    messages.success(request, 'Utente abilitato con successo!')
    return redirect('list_users')


def view_role(request, role):
    """Render the role information page for ``role``."""
    return render(request, 'wallet_api/role_info.html', {'role': role})


# ---------------------------------------------------------------------------
# JSON API (CSRF-exempt, gated by the master password in the body)
# ---------------------------------------------------------------------------

@csrf_exempt
def get_password_api(request):
    """Return the decrypted credentials stored for ``site``.

    Body: JSON object with ``master_password`` and ``site``.
    Responses: 200 with the entry, 400 for a bad body, 403 when
    ``authenticate`` rejects the master password, 404 when no entry matches
    ``site``, 405 for non-POST methods.
    """
    data, error = _read_json_post(request)
    if error is not None:
        return error

    master_password = data.get('master_password')
    if not authenticate(master_password):
        return JsonResponse({"error": "Master password errata"}, status=403)

    try:
        entry = PasswordEntry.objects.get(site=data.get('site'))
    except PasswordEntry.DoesNotExist:
        return JsonResponse({"error": "Sito non trovato"}, status=404)

    key = derive_key(master_password)
    return JsonResponse({
        "site": entry.site,
        "username": entry.username,
        "password": decrypt_password(entry.password, key),
        "client_id": entry.client_id,
        "topic": entry.topic,
    })


@csrf_exempt
def list_sites_api(request):
    """Return the ``site`` value of every stored entry.

    Body: JSON object with ``master_password``.
    Responses: 200 with ``{"sites": [...]}``, 400 for a bad body, 403 when
    ``authenticate`` rejects the master password, 405 for non-POST methods.
    """
    data, error = _read_json_post(request)
    if error is not None:
        return error

    if not authenticate(data.get('master_password')):
        return JsonResponse({"error": "Master password errata"}, status=403)

    sites = PasswordEntry.objects.values_list('site', flat=True)
    return JsonResponse({"sites": list(sites)})


@csrf_exempt
def add_password_api(request):
    """Encrypt and store a new entry.

    Body: JSON object with ``master_password``, ``site``, ``username``,
    ``password``, ``client_id`` and ``topic``. The password is encrypted with
    a key derived from the supplied master password.
    Responses: 200 with a confirmation message, 400 for a bad body, 403 when
    ``authenticate`` rejects the master password, 405 for non-POST methods.
    """
    data, error = _read_json_post(request)
    if error is not None:
        return error

    master_password = data.get('master_password')
    if not authenticate(master_password):
        return JsonResponse({"error": "Master password errata"}, status=403)

    key = derive_key(master_password)
    PasswordEntry.objects.create(
        site=data.get('site'),
        username=data.get('username'),
        password=encrypt_password(data.get('password'), key),
        client_id=data.get('client_id'),
        topic=data.get('topic'),
    )
    return JsonResponse({"message": "Password aggiunta con successo"})