Files
web-app-python/app/api/auth.py
2025-10-20 19:10:08 +02:00

131 lines
4.2 KiB
Python

from datetime import timedelta
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.database import get_db
from app.core.security import verify_password, create_access_token, decode_access_token
from app.models import Utente
from app.schemas.auth import Token, LoginRequest, RegisterFCMToken
# Router collecting the authentication endpoints; every route declared on it
# is served under the "/auth" prefix and tagged "Authentication".
router = APIRouter(prefix="/auth", tags=["Authentication"])
# Bearer-token extractor used as a dependency by get_current_user. The
# tokenUrl is the path of the form-based login route declared on this router
# ("/token" under the "/auth" prefix).
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: Session = Depends(get_db)
) -> Utente:
    """Dependency that resolves the bearer token to the current user.

    The token is decoded with ``decode_access_token``; its ``sub`` claim is
    used as the e-mail address to look the user up in the database.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or no user has that e-mail; 403 when the user
            exists but its ``attivo`` flag is falsy.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Credenziali non valide",
        headers={"WWW-Authenticate": "Bearer"},
    )

    claims = decode_access_token(token)
    # A missing payload and a missing "sub" claim are rejected the same way.
    subject = None if claims is None else claims.get("sub")
    if subject is None:
        raise unauthorized

    # NOTE(review): ``db`` is annotated as a synchronous SQLAlchemy Session, so
    # this query appears to block inside an ``async def`` -- confirm intent.
    by_email = db.query(Utente).filter(Utente.email == subject)
    account = by_email.first()
    if account is None:
        raise unauthorized

    if account.attivo:
        return account

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Utente non attivo"
    )
@router.post("/token", response_model=Token)
async def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: Session = Depends(get_db)
):
    """Log in through the OAuth2 password flow and return a bearer token.

    The form's ``username`` field is matched against the user's e-mail.

    Raises:
        HTTPException: 401 when no user matches or the password does not
            verify; 403 when the user's ``attivo`` flag is falsy.
    """
    account = db.query(Utente).filter(Utente.email == form_data.username).first()

    # The password is only verified when a user row was actually found.
    credentials_ok = bool(account) and verify_password(
        form_data.password, account.password_hash
    )
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Email o password non corretti",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not account.attivo:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Utente non attivo"
        )

    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {"sub": account.email, "cliente_id": account.cliente_id}
    jwt_token = create_access_token(data=claims, expires_delta=lifetime)
    return {"access_token": jwt_token, "token_type": "bearer"}
@router.post("/login", response_model=Token)
async def login_json(login_data: LoginRequest, db: Session = Depends(get_db)):
    """Log in with a JSON body and return a bearer access token.

    The user is looked up by ``login_data.email`` and ``login_data.password``
    is verified against the stored password hash.

    Raises:
        HTTPException: 401 when no user matches or the password does not
            verify; 403 when the user's ``attivo`` flag is falsy.
    """
    user = db.query(Utente).filter(Utente.email == login_data.email).first()
    if not user or not verify_password(login_data.password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Email o password non corretti",
            # A 401 response must carry a WWW-Authenticate challenge; this
            # also matches what the form-based "/token" endpoint sends.
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not user.attivo:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Utente non attivo"
        )
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.email, "cliente_id": user.cliente_id},
        expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
@router.post("/register-fcm-token")
async def register_fcm_token(
    token_data: RegisterFCMToken,
    current_user: Annotated[Utente, Depends(get_current_user)],
    db: Session = Depends(get_db)
):
    """Store or replace the FCM token of the authenticated user.

    The value from ``token_data.fcm_token`` is written to the current user's
    ``fcm_token`` attribute and the session is committed.
    """
    new_token = token_data.fcm_token
    current_user.fcm_token = new_token
    db.commit()

    confirmation = {"message": "FCM token registrato con successo"}
    return confirmation
@router.get("/me")
async def get_me(current_user: Annotated[Utente, Depends(get_current_user)]):
    """Return a fixed set of attributes of the authenticated user.

    The response is a plain dict with the keys ``id``, ``email``, ``nome``,
    ``cognome``, ``ruolo`` and ``cliente_id``, each read from the attribute
    of the same name on the current user.
    """
    exposed_fields = ("id", "email", "nome", "cognome", "ruolo", "cliente_id")
    return {field: getattr(current_user, field) for field in exposed_fields}