File size: 3,745 Bytes
d3a7520 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 | """่ฎค่ฏไธ Session ็ฎก็ๆจกๅใ"""
from __future__ import annotations
import logging
import os
import secrets
import uuid
from typing import Optional
import bcrypt
from fastapi import HTTPException, Request
from itsdangerous import BadSignature, SignatureExpired, TimestampSigner
# ไฟๆไธๅๅฎ็ฐไธ่ด็ๅธธ้ๅๆฅๅฟๅ็งฐ
logger = logging.getLogger("gateway")
# โโ Session ้
็ฝฎ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
SESSION_COOKIE = "gw_session"
SESSION_MAX_AGE = 86400 # 24 hours
SECRET_KEY = os.environ.get("SESSION_SECRET") or secrets.token_hex(32)
signer = TimestampSigner(SECRET_KEY)
INTERNAL_KEY_SALT = (os.environ.get("INTERNAL_KEY_SALT") or SECRET_KEY).strip()
# โโ ็จๆทๅ ่ฝฝไธ่ฎค่ฏ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
def _load_users() -> dict[str, str]:
"""ไป BASIC_AUTH_USERS ๅ ่ฝฝ็จๆทๅๅฏ็ ใ"""
raw = os.environ.get("BASIC_AUTH_USERS", "").replace("\\n", "\n")
users: dict[str, str] = {}
for line in raw.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if ":" not in line:
logger.warning("Skipping invalid BASIC_AUTH_USERS line (no colon)")
continue
username, password = line.split(":", 1)
username = username.strip()
password = password.strip()
if username and password:
users[username] = password
if not users:
logger.error("No valid users found โ authentication will always fail")
return users
USERS = _load_users()
INTERNAL_KEY_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, INTERNAL_KEY_SALT)
def _make_internal_api_key(username: str) -> str:
"""ๅบไบ็จๆทๅ็ๆ็จณๅฎๅ
้จ Key๏ผไป
ๆๅก็ซฏไฝฟ็จ๏ผใ"""
value = uuid.uuid5(INTERNAL_KEY_NAMESPACE, username)
return f"sk-{value}"
INTERNAL_KEY_TO_USER = {
_make_internal_api_key(username): username for username in USERS.keys()
}
def _verify_credentials(username: str, password: str) -> bool:
"""้ช่ฏ็จๆทๅๅฏ็ ๏ผๆฏๆๆๆไธ bcryptใ"""
stored = USERS.get(username)
if stored is None:
return False
if stored.startswith("$2"):
return bcrypt.checkpw(password.encode(), stored.encode())
return secrets.compare_digest(stored, password)
# โโ Session โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
def _make_session(username: str) -> str:
"""็ๆ็ญพๅ็ Session tokenใ"""
return signer.sign(username).decode()
def _verify_session(token: str) -> Optional[str]:
"""้ช่ฏ Session token๏ผ่ฟๅ็จๆทๅๆ Noneใ"""
try:
username = signer.unsign(token, max_age=SESSION_MAX_AGE).decode()
except (BadSignature, SignatureExpired):
return None
if username not in USERS:
return None
return username
def _get_session_user(request: Request) -> Optional[str]:
"""ไป่ฏทๆฑ Cookie ไธญ่งฃๆๅฝๅ็จๆทใ"""
token = request.cookies.get(SESSION_COOKIE)
return _verify_session(token) if token else None
def _require_user(request: Request) -> str:
"""FastAPI ไพ่ต๏ผ่ฆๆฑ็จๆทๅทฒ็ปๅฝ๏ผๅฆๅๆๅบ 401ใ"""
username = _get_session_user(request)
if not username:
raise HTTPException(status_code=401, detail="Unauthorized")
return username
|