Spaces:
No application file
No application file
File size: 2,239 Bytes
28a1786 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | from datetime import datetime, timedelta
from typing import Optional
from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from app.config import settings
security = HTTPBearer()
class AuthHandler:
"""JWT-based authentication handler"""
def __init__(self):
self.secret_key = settings.API_SECRET_KEY
self.algorithm = settings.ALGORITHM
self.token_expiry = settings.ACCESS_TOKEN_EXPIRE_MINUTES
def create_token(self, client_id: str) -> str:
"""Create JWT token for authenticated clients"""
expire = datetime.utcnow() + timedelta(minutes=self.token_expiry)
payload = {
"sub": client_id,
"exp": expire,
"iat": datetime.utcnow(),
"type": "access"
}
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def verify_token(self, credentials: HTTPAuthorizationCredentials = Security(security)) -> str:
"""Verify JWT token and return client_id"""
token = credentials.credentials
try:
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm]
)
# Validate token type
if payload.get("type") != "access":
raise HTTPException(
status_code=401,
detail="Invalid token type"
)
# Check expiration
exp = datetime.fromtimestamp(payload.get("exp", 0))
if exp < datetime.utcnow():
raise HTTPException(
status_code=401,
detail="Token has expired"
)
return payload.get("sub", "anonymous")
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=401,
detail="Token has expired"
)
except jwt.InvalidTokenError:
raise HTTPException(
status_code=401,
detail="Invalid token"
)
auth_handler = AuthHandler() |