import json

from django.core.management.base import BaseCommand, CommandError

from wallet_api.models import PasswordEntry


class Command(BaseCommand):
    """Compare Mosquitto dynamic-security JSON files against ``PasswordEntry`` rows.

    For every file given on the command line, each client username and each
    role name found in the file (except the ones literally named ``admin``)
    is looked up in the database and a success or warning line is written to
    the command's stdout. For roles that exist, every ACL type listed in the
    file is additionally checked against the ACL types stored in the database.
    """

    help = "Check data alignament across DB and Dynamic Security mosquitto file"

    def add_arguments(self, parser):
        """Register the positional ``dynsecjson`` argument (one or more paths)."""
        parser.add_argument(
            "dynsecjson",
            nargs="+",
            type=str,
            help="Mosquitto dynamic security json file",
        )

    def handle(self, *args, **options):
        """Run the client and role checks for every file in ``dynsecjson``.

        A failure while processing one file (unreadable file, invalid JSON,
        missing ``clients``/``roles`` key, database error, ...) is reported as
        an error line and processing continues with the next file.
        """
        for dynsec in options["dynsecjson"]:
            try:
                # JSON interchange files are UTF-8; do not depend on the locale.
                with open(dynsec, "r", encoding="utf-8") as dynsecconfig:
                    config = json.load(dynsecconfig)
                self._check_clients(config['clients'])
                self._check_roles(config['roles'])
            except Exception as e:
                # Deliberate per-file boundary: report the problem and move on
                # to the next file instead of aborting the whole command.
                self.stdout.write(self.style.ERROR(f'Check data alignament across DB and Dynamic Security mosquitto file {dynsec} - {e}'))

    def _check_clients(self, clients):
        """Report, for each non-admin client, whether its username is in the DB."""
        for client in clients:
            user_name = client['username']
            if user_name == 'admin':
                continue
            # exists() instead of get(): duplicate rows for one username must
            # not raise MultipleObjectsReturned and abort the remaining checks.
            if PasswordEntry.objects.filter(username=user_name).exists():
                self.stdout.write(self.style.SUCCESS(f'username {user_name} exists in the database.'))
            else:
                self.stdout.write(self.style.WARNING(f'Username {user_name} does not exist in the database.'))

    def _check_roles(self, roles):
        """Report, for each non-admin role, whether it and its ACL types are in the DB."""
        for role in roles:
            role_name = role['rolename']
            if role_name == 'admin':
                continue
            # filter() instead of get(): several entries may share one role,
            # which previously raised MultipleObjectsReturned and aborted the
            # processing of the whole file.
            entries = list(PasswordEntry.objects.filter(role=role_name))
            if not entries:
                self.stdout.write(self.style.WARNING(f'Role {role_name} does not exist in the database.'))
                continue
            self.stdout.write(self.style.SUCCESS(f'Role {role_name} exists in the database.'))
            # Build the known ACL types once per role. When several entries
            # share the role, an ACL type counts as present if any of them has
            # it. Assumes ``acls`` is a list of dicts with a string 'acltype'
            # key (as the original lookup did); an empty/None value is treated
            # as "no ACLs" -- TODO confirm against the model definition.
            known_acltypes = {
                acl['acltype']
                for entry in entries
                for acl in (entry.acls or [])
            }
            for acltype_json in role.get('acls', []):
                if acltype_json['acltype'] not in known_acltypes:
                    self.stdout.write(self.style.WARNING(f'ACL type {acltype_json["acltype"]} for role {role_name} does not exist in the database.'))