Spaces:
No application file
No application file
| from datetime import datetime, timedelta | |
| from typing import Optional | |
| from fastapi import HTTPException, Security | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| import jwt | |
| from app.config import settings | |
| security = HTTPBearer() | |
| class AuthHandler: | |
| """JWT-based authentication handler""" | |
| def __init__(self): | |
| self.secret_key = settings.API_SECRET_KEY | |
| self.algorithm = settings.ALGORITHM | |
| self.token_expiry = settings.ACCESS_TOKEN_EXPIRE_MINUTES | |
| def create_token(self, client_id: str) -> str: | |
| """Create JWT token for authenticated clients""" | |
| expire = datetime.utcnow() + timedelta(minutes=self.token_expiry) | |
| payload = { | |
| "sub": client_id, | |
| "exp": expire, | |
| "iat": datetime.utcnow(), | |
| "type": "access" | |
| } | |
| return jwt.encode(payload, self.secret_key, algorithm=self.algorithm) | |
| def verify_token(self, credentials: HTTPAuthorizationCredentials = Security(security)) -> str: | |
| """Verify JWT token and return client_id""" | |
| token = credentials.credentials | |
| try: | |
| payload = jwt.decode( | |
| token, | |
| self.secret_key, | |
| algorithms=[self.algorithm] | |
| ) | |
| # Validate token type | |
| if payload.get("type") != "access": | |
| raise HTTPException( | |
| status_code=401, | |
| detail="Invalid token type" | |
| ) | |
| # Check expiration | |
| exp = datetime.fromtimestamp(payload.get("exp", 0)) | |
| if exp < datetime.utcnow(): | |
| raise HTTPException( | |
| status_code=401, | |
| detail="Token has expired" | |
| ) | |
| return payload.get("sub", "anonymous") | |
| except jwt.ExpiredSignatureError: | |
| raise HTTPException( | |
| status_code=401, | |
| detail="Token has expired" | |
| ) | |
| except jwt.InvalidTokenError: | |
| raise HTTPException( | |
| status_code=401, | |
| detail="Invalid token" | |
| ) | |
| auth_handler = AuthHandler() |