from datetime import datetime, timedelta from typing import Optional from fastapi import HTTPException, Security from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials import jwt from app.config import settings security = HTTPBearer() class AuthHandler: """JWT-based authentication handler""" def __init__(self): self.secret_key = settings.API_SECRET_KEY self.algorithm = settings.ALGORITHM self.token_expiry = settings.ACCESS_TOKEN_EXPIRE_MINUTES def create_token(self, client_id: str) -> str: """Create JWT token for authenticated clients""" expire = datetime.utcnow() + timedelta(minutes=self.token_expiry) payload = { "sub": client_id, "exp": expire, "iat": datetime.utcnow(), "type": "access" } return jwt.encode(payload, self.secret_key, algorithm=self.algorithm) def verify_token(self, credentials: HTTPAuthorizationCredentials = Security(security)) -> str: """Verify JWT token and return client_id""" token = credentials.credentials try: payload = jwt.decode( token, self.secret_key, algorithms=[self.algorithm] ) # Validate token type if payload.get("type") != "access": raise HTTPException( status_code=401, detail="Invalid token type" ) # Check expiration exp = datetime.fromtimestamp(payload.get("exp", 0)) if exp < datetime.utcnow(): raise HTTPException( status_code=401, detail="Token has expired" ) return payload.get("sub", "anonymous") except jwt.ExpiredSignatureError: raise HTTPException( status_code=401, detail="Token has expired" ) except jwt.InvalidTokenError: raise HTTPException( status_code=401, detail="Invalid token" ) auth_handler = AuthHandler()