302 lines
12 KiB
Python
302 lines
12 KiB
Python
from django.shortcuts import render, redirect, get_object_or_404
|
|
from django.contrib import messages
|
|
from django.http import JsonResponse, HttpResponse
|
|
from django.views.decorators.csrf import csrf_exempt, csrf_protect
|
|
from django.contrib.auth.decorators import login_required
|
|
|
|
import json
|
|
import paho.mqtt.client as mqtt
|
|
import paho.mqtt.publish as publish
|
|
|
|
from dotenv import dotenv_values
|
|
|
|
from .models import PasswordEntry
|
|
from .utils import authenticate, derive_key, encrypt_password, decrypt_password
|
|
from .mqtt_service import MosquittoDynamicSecurity
|
|
|
|
|
|
# Settings read from the ".env" file. The path is relative, so it is resolved
# against the process working directory. The views below read the MQTT_HOST,
# MQTT_PORT, MQTT_KEEPALIVE, MQTT_USER, MQTT_PASSWORD, MQTT_PWDX and
# MASTER_PASSWORD keys from this mapping.
config = dotenv_values(".env")
|
|
|
|
@csrf_exempt
def list_sites_api(request):
    """Return the names of all stored sites as JSON.

    Expects a POST whose body is a JSON object with a ``master_password``
    entry.

    Responses:
        200: ``{"sites": [...]}`` listing every ``PasswordEntry.site`` value.
        400: the request is not a POST, or the body is not a JSON object.
        403: ``authenticate`` rejected the supplied master password.
    """
    if request.method != 'POST':
        return JsonResponse({"error": "Richiesta non valida"}, status=400)

    # The body comes from an untrusted client: malformed JSON (or bytes that
    # are not valid text) must yield a 400, not an unhandled exception.
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        data = json.loads(request.body)
    except ValueError:
        return JsonResponse({"error": "Richiesta non valida"}, status=400)

    # Valid JSON that is not an object (list, string, number...) has no .get().
    if not isinstance(data, dict):
        return JsonResponse({"error": "Richiesta non valida"}, status=400)

    if not authenticate(data.get('master_password')):
        return JsonResponse({"error": "Master password errata"}, status=403)

    sites = PasswordEntry.objects.values_list('site', flat=True)
    return JsonResponse({"sites": list(sites)})
|
|
|
|
@csrf_exempt
def get_password_api(request):
    """Return the decrypted credentials stored for one site as JSON.

    Expects a POST whose body is a JSON object with ``master_password`` and
    ``site`` entries.

    Responses:
        200: ``site``, ``username``, ``password`` (decrypted), ``client_id``
            and ``topic`` of the matching ``PasswordEntry``.
        400: the request is not a POST, or the body is not a JSON object.
        403: ``authenticate`` rejected the supplied master password.
        404: no ``PasswordEntry`` exists for the requested site.
    """
    if request.method != 'POST':
        return JsonResponse({"error": "Richiesta non valida"}, status=400)

    # The body comes from an untrusted client: malformed JSON (or bytes that
    # are not valid text) must yield a 400, not an unhandled exception.
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        data = json.loads(request.body)
    except ValueError:
        return JsonResponse({"error": "Richiesta non valida"}, status=400)

    # Valid JSON that is not an object (list, string, number...) has no .get().
    if not isinstance(data, dict):
        return JsonResponse({"error": "Richiesta non valida"}, status=400)

    master_password = data.get('master_password')
    site = data.get('site')

    if not authenticate(master_password):
        return JsonResponse({"error": "Master password errata"}, status=403)

    # Keep the try body limited to the lookup that can raise DoesNotExist.
    try:
        entry = PasswordEntry.objects.get(site=site)
    except PasswordEntry.DoesNotExist:
        return JsonResponse({"error": "Sito non trovato"}, status=404)

    key = derive_key(master_password)
    return JsonResponse({
        "site": entry.site,
        "username": entry.username,
        "password": decrypt_password(entry.password, key),
        "client_id": entry.client_id,
        "topic": entry.topic,
    })
|
|
|
|
@login_required
def home_view(request):
    """Send the authenticated visitor to the user list page."""
    destination = 'list_users'
    return redirect(destination)
|
|
|
|
@login_required
def publish_message(request):
    """Publish one message to the MQTT broker described by ``config``.

    The request body must be a JSON object with a ``topic`` entry and a
    ``msg`` entry; ``msg`` is JSON-encoded and sent as the payload.

    Returns:
        200 with ``{"request_data": <parsed body>}`` after publishing, or
        400 when the body is not valid JSON or lacks ``topic`` / ``msg``.
    """
    # ValueError: malformed JSON or undecodable bytes.
    # KeyError: a required entry is missing.
    # TypeError: the JSON value is not an object.
    try:
        request_data = json.loads(request.body)
        topic = request_data['topic']
        message = request_data['msg']
    except (ValueError, KeyError, TypeError):
        return JsonResponse({"error": "Richiesta non valida"}, status=400)

    publish.single(
        topic=topic,
        payload=json.dumps(message),
        hostname=config['MQTT_HOST'],
        port=int(config['MQTT_PORT']),
        keepalive=int(config['MQTT_KEEPALIVE']),
        auth={'username': config['MQTT_USER'], 'password': config['MQTT_PASSWORD']},
        protocol=mqtt.MQTTv5,
    )
    return JsonResponse({'request_data': request_data})
|
|
|
|
@login_required
def list_users(request):
    """Render the page listing every stored ``PasswordEntry``.

    The template context also carries ``ase_adm_group``, which is true when
    the requesting user belongs to the ``ase_admin`` group.
    """
    context = {
        'users': PasswordEntry.objects.all(),
        'ase_adm_group': request.user.groups.filter(name='ase_admin').exists(),
    }
    return render(request, 'wallet_api/list_users.html', context)
|
|
|
|
def _build_create_user_command(username, password, ase_password, client_id, topic):
    """Build the dynamic-security payload creating two roles and two clients.

    The payload creates, in order: the ``<username>_role`` role (publish only),
    the ``<username>_ase_role`` role (publish, receive, subscribe and
    unsubscribe), the ``<username>`` client and the ``<username>_ase`` client.
    All ACLs apply to ``topic``.
    """
    def acl(acltype):
        return {"acltype": acltype, "topic": topic, "priority": 0, "allow": True}

    role = f'{username}_role'
    ase_role = f'{username}_ase_role'

    return {
        "commands": [
            {
                "command": "createRole",
                "rolename": role,
                "textname": "",
                "textdescription": "",
                "acls": [acl("publishClientSend")],
            },
            {
                "command": "createRole",
                "rolename": ase_role,
                "textname": "",
                "textdescription": "",
                "acls": [
                    acl("publishClientSend"),
                    acl("publishClientReceive"),
                    acl("subscribeLiteral"),
                    acl("subscribePattern"),
                    acl("unsubscribeLiteral"),
                    acl("unsubscribePattern"),
                ],
            },
            {
                "command": "createClient",
                "username": username,
                "password": password,
                "clientid": client_id,
                "textname": f'{username} subscriber',
                "textdescription": f'{username} subscriber',
                "groups": [],
                "roles": [{"rolename": role, "priority": 0}],
            },
            {
                "command": "createClient",
                "username": f'{username}_ase',
                "password": ase_password,
                "clientid": f'{client_id}_ase',
                "textname": f'{username} ASE subscriber',
                "textdescription": f'{username} ASE subscriber',
                "groups": [],
                "roles": [{"rolename": ase_role, "priority": 0}],
            },
        ]
    }


@login_required
def create_user(request):
    """Create an MQTT client pair on the broker and store their credentials.

    Only members of the ``ase_admin`` group may use this view; everyone else
    is redirected to ``list_users`` with a warning.

    GET renders the creation form. POST reads ``site``, ``username``,
    ``clientId``, ``topic`` and ``password`` from the form, sends the
    create-role / create-client commands to Mosquitto and, when the broker
    reports no error, stores two encrypted ``PasswordEntry`` rows (the user
    and its ``_ase`` companion) before redirecting to ``list_users``. When
    the broker reports an error the form is rendered again with the broker
    response in the context.
    """
    if not request.user.groups.filter(name='ase_admin').exists():
        # A denial is not a success: use the warning level like other errors.
        messages.warning(request, 'Non hai i permessi per creare utenti MQTT!')
        return redirect('list_users')

    if request.method != 'POST':
        return render(request, 'wallet_api/create_user.html')

    site = request.POST.get('site')
    username = request.POST.get('username')
    client_id = request.POST.get('clientId')
    topic = request.POST.get('topic')
    password = request.POST.get('password')

    # Without these two values the f-strings below would silently produce
    # names such as "None_role" and a password derived from "None".
    if not username or not password:
        messages.warning(request, 'Username e password sono obbligatori!')
        return render(request, 'wallet_api/create_user.html')

    # Computed once so the password registered on the broker and the one
    # stored in the wallet are guaranteed to be the same value.
    ase_password = f'{password}{config["MQTT_PWDX"]}'

    command = _build_create_user_command(username, password, ase_password, client_id, topic)

    # Send the command to Mosquitto.
    mqtt_service = MosquittoDynamicSecurity()
    response = mqtt_service.send_command(command)

    # Several commands are sent, so any of the responses may carry the error.
    first_error = next(
        (item["error"] for item in response["responses"] if "error" in item),
        None,
    )
    if first_error is not None:
        messages.warning(request, f"Errore durante la creazione dell'utente: {first_error}")
        return render(request, 'wallet_api/create_user.html', {'response': response})

    key = derive_key(config['MASTER_PASSWORD'])

    PasswordEntry.objects.create(
        site=site,
        username=username,
        password=encrypt_password(password, key),
        client_id=client_id,
        topic=topic,
        status='enabled',
    )
    PasswordEntry.objects.create(
        site=f'{site}_ase',
        username=f'{username}_ase',
        password=encrypt_password(ase_password, key),
        client_id=f'{client_id}_ase',
        topic=topic,
        status='enabled',
    )

    messages.success(request, 'Utente creato con successo!')
    return redirect('list_users')
|
|
|
|
@login_required
def edit_user(request, slug):
    """Render the edit page for the ``PasswordEntry`` identified by ``slug``.

    Only members of the ``ase_admin`` group may use this view. Users without
    permission, and requests for a slug that matches no entry, are redirected
    to ``list_users`` with a warning.
    """
    if not request.user.groups.filter(name='ase_admin').exists():
        # The original text talked about deleting users (copy-paste from the
        # delete view) and was flashed at the success level.
        messages.warning(request, 'Non hai i permessi per modificare utenti MQTT!')
        return redirect('list_users')

    user = PasswordEntry.objects.filter(slug=slug).first()
    if user is None:
        # Avoid rendering the edit form around a missing entry.
        messages.warning(request, 'Utente non trovato!')
        return redirect('list_users')

    return render(request, 'wallet_api/edit_user.html', {'user': user})
|
|
|
|
@login_required
def delete_user(request, slug):
    """Delete an MQTT client and its role on the broker, then its stored entry.

    Only members of the ``ase_admin`` group may use this view. The
    ``PasswordEntry`` row is removed only when the first broker response
    carries no error. Every path ends with a redirect to ``list_users`` and a
    flash message describing the outcome.
    """
    if not request.user.groups.filter(name='ase_admin').exists():
        # A denial is not a success: use the warning level like other errors.
        messages.warning(request, 'Non hai i permessi per cancellare utenti MQTT!')
        return redirect('list_users')

    user = PasswordEntry.objects.filter(slug=slug).first()
    if user is None:
        # Previously this fell through to user.username and raised.
        messages.warning(request, 'Utente non trovato!')
        return redirect('list_users')

    command = {
        "commands": [
            {
                "command": "deleteClient",
                "username": user.username,
            },
            {
                "command": "deleteRole",
                "rolename": f'{user.username}_role',
            },
        ]
    }

    # Send the command to Mosquitto.
    mqtt_service = MosquittoDynamicSecurity()
    response = mqtt_service.send_command(command)

    first_response = response["responses"][0]
    if "error" in first_response:
        # The original returned None here, which Django rejects with a
        # ValueError; report the broker error and go back to the list.
        messages.warning(
            request,
            f"Errore durante l'eliminazione dell'utente: {first_response['error']}",
        )
        return redirect('list_users')

    PasswordEntry.objects.filter(id=user.id).delete()
    messages.success(request, f'Utente {user.username} eliminato!')
    return redirect('list_users')
|
|
|
|
@login_required
def disable_user(request, slug):
    """Disable an MQTT client on the broker and mark its entry as disabled.

    Only members of the ``ase_admin`` group may use this view. On success the
    matching ``PasswordEntry`` gets ``status='disabled'``; htmx requests then
    receive the ``partials/enable_user.html`` fragment, all other requests are
    redirected to ``list_users``. Failures redirect to ``list_users`` with a
    warning.
    """
    if not request.user.groups.filter(name='ase_admin').exists():
        # A denial is not a success: use the warning level like other errors.
        messages.warning(request, 'Non hai i permessi per disabilitare utenti MQTT!')
        return redirect('list_users')

    # Fetch the row once; the original re-ran the query on every .first()
    # call and crashed with a TypeError when the slug matched nothing.
    entry = PasswordEntry.objects.filter(slug=slug).values('id', 'username').first()
    if entry is None:
        messages.warning(request, 'Utente non trovato!')
        return redirect('list_users')

    # Command that disables the client on the broker.
    command = {
        "commands": [
            {
                "command": "disableClient",
                "username": entry["username"],
            }
        ]
    }

    # Send the command to Mosquitto.
    mqtt_service = MosquittoDynamicSecurity()
    response = mqtt_service.send_command(command)

    first_response = response["responses"][0]
    if "error" in first_response:
        messages.warning(
            request,
            f"Errore durante la disabilitazione dell'utente: {first_response['error']}",
        )
        return redirect('list_users')

    PasswordEntry.objects.filter(id=entry["id"]).update(status='disabled')
    messages.success(request, 'Utente disabilitato con successo!')

    if request.htmx:
        return render(request, 'partials/enable_user.html')
    return redirect('list_users')
|
|
|
|
@login_required
def enable_user(request, slug):
    """Enable an MQTT client on the broker and mark its entry as enabled.

    Only members of the ``ase_admin`` group may use this view. On success the
    matching ``PasswordEntry`` gets ``status='enabled'``. Every path ends with
    a redirect to ``list_users`` and a flash message describing the outcome.
    """
    if not request.user.groups.filter(name='ase_admin').exists():
        # The original text said "disabilitare" (copy-paste from the disable
        # view) and was flashed at the success level.
        messages.warning(request, 'Non hai i permessi per abilitare utenti MQTT!')
        return redirect('list_users')

    # Fetch the row once; the original re-ran the query on every .first()
    # call and crashed with a TypeError when the slug matched nothing.
    entry = PasswordEntry.objects.filter(slug=slug).values('id', 'username').first()
    if entry is None:
        messages.warning(request, 'Utente non trovato!')
        return redirect('list_users')

    # Command that enables the client on the broker.
    command = {
        "commands": [
            {
                "command": "enableClient",
                "username": entry["username"],
            }
        ]
    }

    # Send the command to Mosquitto.
    mqtt_service = MosquittoDynamicSecurity()
    response = mqtt_service.send_command(command)

    first_response = response["responses"][0]
    if "error" in first_response:
        messages.warning(
            request,
            f"Errore durante la abilitazione dell'utente: {first_response['error']}",
        )
        return redirect('list_users')

    PasswordEntry.objects.filter(id=entry["id"]).update(status='enabled')
    messages.success(request, 'Utente abilitato con successo!')
    return redirect('list_users')
|
|
|
|
@login_required
def view_role(request, role):
    """Render the role information page, passing ``role`` to the template."""
    context = {'role': role}
    return render(request, 'wallet_api/role_info.html', context)
|
|
|
|
|
|
|