File size: 3,745 Bytes
d3a7520
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""่ฎค่ฏไธŽ Session ็ฎก็†ๆจกๅ—ใ€‚"""

from __future__ import annotations

import logging
import os
import secrets
import uuid
from typing import Optional

import bcrypt
from fastapi import HTTPException, Request
from itsdangerous import BadSignature, SignatureExpired, TimestampSigner

# ไฟๆŒไธŽๅŽŸๅฎž็Žฐไธ€่‡ด็š„ๅธธ้‡ๅ’Œๆ—ฅๅฟ—ๅ็งฐ
logger = logging.getLogger("gateway")


# โ”€โ”€ Session ้…็ฝฎ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
SESSION_COOKIE = "gw_session"
SESSION_MAX_AGE = 86400  # 24 hours

SECRET_KEY = os.environ.get("SESSION_SECRET") or secrets.token_hex(32)
signer = TimestampSigner(SECRET_KEY)

INTERNAL_KEY_SALT = (os.environ.get("INTERNAL_KEY_SALT") or SECRET_KEY).strip()


# โ”€โ”€ ็”จๆˆทๅŠ ่ฝฝไธŽ่ฎค่ฏ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def _load_users() -> dict[str, str]:
    """ไปŽ BASIC_AUTH_USERS ๅŠ ่ฝฝ็”จๆˆทๅๅฏ†็ ใ€‚"""
    raw = os.environ.get("BASIC_AUTH_USERS", "").replace("\\n", "\n")
    users: dict[str, str] = {}
    for line in raw.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if ":" not in line:
            logger.warning("Skipping invalid BASIC_AUTH_USERS line (no colon)")
            continue
        username, password = line.split(":", 1)
        username = username.strip()
        password = password.strip()
        if username and password:
            users[username] = password
    if not users:
        logger.error("No valid users found โ€” authentication will always fail")
    return users


USERS = _load_users()
INTERNAL_KEY_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, INTERNAL_KEY_SALT)


def _make_internal_api_key(username: str) -> str:
    """ๅŸบไบŽ็”จๆˆทๅ็”Ÿๆˆ็จณๅฎšๅ†…้ƒจ Key๏ผˆไป…ๆœๅŠก็ซฏไฝฟ็”จ๏ผ‰ใ€‚"""
    value = uuid.uuid5(INTERNAL_KEY_NAMESPACE, username)
    return f"sk-{value}"


INTERNAL_KEY_TO_USER = {
    _make_internal_api_key(username): username for username in USERS.keys()
}


def _verify_credentials(username: str, password: str) -> bool:
    """้ชŒ่ฏ็”จๆˆทๅๅฏ†็ ๏ผŒๆ”ฏๆŒๆ˜Žๆ–‡ไธŽ bcryptใ€‚"""
    stored = USERS.get(username)
    if stored is None:
        return False
    if stored.startswith("$2"):
        return bcrypt.checkpw(password.encode(), stored.encode())
    return secrets.compare_digest(stored, password)


# โ”€โ”€ Session โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def _make_session(username: str) -> str:
    """็”Ÿๆˆ็ญพๅ็š„ Session tokenใ€‚"""
    return signer.sign(username).decode()


def _verify_session(token: str) -> Optional[str]:
    """้ชŒ่ฏ Session token๏ผŒ่ฟ”ๅ›ž็”จๆˆทๅๆˆ– Noneใ€‚"""
    try:
        username = signer.unsign(token, max_age=SESSION_MAX_AGE).decode()
    except (BadSignature, SignatureExpired):
        return None
    if username not in USERS:
        return None
    return username


def _get_session_user(request: Request) -> Optional[str]:
    """ไปŽ่ฏทๆฑ‚ Cookie ไธญ่งฃๆžๅฝ“ๅ‰็”จๆˆทใ€‚"""
    token = request.cookies.get(SESSION_COOKIE)
    return _verify_session(token) if token else None


def _require_user(request: Request) -> str:
    """FastAPI ไพ่ต–๏ผš่ฆๆฑ‚็”จๆˆทๅทฒ็™ปๅฝ•๏ผŒๅฆๅˆ™ๆŠ›ๅ‡บ 401ใ€‚"""
    username = _get_session_user(request)
    if not username:
        raise HTTPException(status_code=401, detail="Unauthorized")
    return username