# src/core/message_processor.py
import json
import time
import base64
import uuid
import requests
from curl_cffi import requests as curl_requests
from src.core.logger import logger # 从新的位置导入 logger
from src.core.utils import Utils # 从新的位置导入 Utils
from app import CONFIG, DEFAULT_HEADERS # 导入 CONFIG 和 DEFAULT_HEADERS
class MessageProcessor:
    """Turn upstream Grok response lines into OpenAI-style chat payloads.

    Every method is static. Per-request state (``IS_THINKING``,
    ``IS_IMG_GEN``, ``IS_IMG_GEN2``) is stored in the shared ``CONFIG`` dict
    and reset at the start of each response handler.

    NOTE(review): ``CONFIG`` is module-global, so overlapping requests would
    presumably overwrite each other's flags -- confirm how the server
    schedules requests before relying on this under concurrency.
    """

    @staticmethod
    def create_chat_response(message, model, is_stream=False):
        """Build an OpenAI-compatible chat completion payload.

        Args:
            message: Text placed in the assistant message (or stream delta).
            model: Model name echoed back in the payload.
            is_stream: When true, build a ``chat.completion.chunk`` carrying
                ``delta``; otherwise a full ``chat.completion``.

        Returns:
            dict: The payload, ready for ``json.dumps``.
        """
        base_response = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "created": int(time.time()),
            "model": model
        }

        if is_stream:
            return {
                **base_response,
                "object": "chat.completion.chunk",
                "choices": [{
                    "index": 0,
                    "delta": {
                        "content": message
                    }
                }]
            }

        return {
            **base_response,
            "object": "chat.completion",
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": message
                },
                "finish_reason": "stop"
            }],
            "usage": None
        }

    @staticmethod
    def process_model_response(response, model):
        """Extract the text token and/or image URL from one response object.

        Reads and updates ``CONFIG["IS_THINKING"]`` so that the first
        thinking token is prefixed with ``<think>`` and the first
        non-thinking token after it with ``</think>``.

        Args:
            response: The ``result.response`` dict of one upstream line.
            model: Requested model name; selects the extraction rules.

        Returns:
            dict: ``{"token": str | None, "imageUrl": str | None}``.
        """
        result = {"token": None, "imageUrl": None}

        # While an image is being generated, text tokens are suppressed and
        # only the cached image (reported once) is surfaced.
        if CONFIG["IS_IMG_GEN"]:
            cached_image = response.get("cachedImageGenerationResponse")
            if cached_image and not CONFIG["IS_IMG_GEN2"]:
                result["imageUrl"] = cached_image["imageUrl"]
            return result

        if model in ('grok-2', 'grok-3'):
            result["token"] = response.get("token")
        elif model in ('grok-2-search', 'grok-3-search'):
            if response.get("webSearchResults") and CONFIG["ISSHOW_SEARCH_RESULTS"]:
                result["token"] = f"\r\n{Utils.organize_search_results(response['webSearchResults'])}\r\n"
            else:
                result["token"] = response.get("token")
        elif model == 'grok-3-deepsearch':
            step_id = response.get("messageStepId")
            message_tag = response.get("messageTag")
            # Thinking steps are dropped entirely when hidden by config.
            if step_id and not CONFIG["SHOW_THINKING"]:
                return result
            if step_id and not CONFIG["IS_THINKING"]:
                # First thinking step: open the thinking section.
                result["token"] = "<think>" + response.get("token", "")
                CONFIG["IS_THINKING"] = True
            elif not step_id and CONFIG["IS_THINKING"] and message_tag == "final":
                # First final-answer token: close the thinking section.
                result["token"] = "</think>" + response.get("token", "")
                CONFIG["IS_THINKING"] = False
            elif (step_id and CONFIG["IS_THINKING"] and message_tag == "assistant") or message_tag == "final":
                result["token"] = response.get("token")
        elif model == 'grok-3-reasoning':
            is_thinking = response.get("isThinking")
            if is_thinking and not CONFIG["SHOW_THINKING"]:
                return result
            if is_thinking and not CONFIG["IS_THINKING"]:
                result["token"] = "<think>" + response.get("token", "")
                CONFIG["IS_THINKING"] = True
            elif not is_thinking and CONFIG["IS_THINKING"]:
                result["token"] = "</think>" + response.get("token", "")
                CONFIG["IS_THINKING"] = False
            else:
                result["token"] = response.get("token")

        return result

    @staticmethod
    def _download_image(image_url):
        """Fetch a generated image from the Grok asset host, with retries.

        Makes at most two attempts, sleeping ``RETRY_TIME`` ms times the
        attempt number between them.

        Returns:
            The HTTP response of the first attempt that answered 200.

        Raises:
            Exception: The last request error, or a status error when the
                final attempt answered with a non-200 status.
        """
        max_retries = 2
        last_error = None

        for attempt in range(1, max_retries + 1):
            try:
                image_response = curl_requests.get(
                    f"https://assets.grok.com/{image_url}",
                    headers={
                        **DEFAULT_HEADERS,
                        "Cookie": CONFIG["SERVER"]['COOKIE']
                    },
                    impersonate="chrome133a",
                    **Utils.get_proxy_options()
                )
                if image_response.status_code == 200:
                    return image_response
                last_error = Exception(f"上游服务请求失败! status: {image_response.status_code}")
            except Exception as error:
                last_error = error

            logger.error(str(last_error), "Server")
            if attempt < max_retries:
                time.sleep(CONFIG["API"]["RETRY_TIME"] / 1000 * attempt)

        # Never fall through with a failed response: its body is not an image.
        raise last_error

    @staticmethod
    def handle_image_response(image_url):
        """Download a generated image and return it as a markdown image.

        Without an image-host key the image is inlined as a base64 data URI;
        otherwise it is uploaded to PicGo (preferred) or TUMY and the hosted
        URL is used.

        Args:
            image_url: Asset path relative to ``https://assets.grok.com/``.

        Returns:
            str: Markdown image markup, or a failure message when the upload
            is rejected or its response cannot be read.

        Raises:
            Exception: When the image cannot be downloaded after retries.
        """
        image_response = MessageProcessor._download_image(image_url)
        image_buffer = image_response.content

        picgo_key = CONFIG["API"]["PICGO_KEY"]
        tumy_key = CONFIG["API"]["TUMY_KEY"]

        if not picgo_key and not tumy_key:
            base64_image = base64.b64encode(image_buffer).decode('utf-8')
            image_content_type = image_response.headers.get('content-type', 'image/jpeg')
            return f"![image](data:{image_content_type};base64,{base64_image})"

        logger.info("开始上传图床", "Server")

        if picgo_key:
            response_url = requests.post(
                "https://www.picgo.net/api/1/upload",
                files={'source': ('image.jpg', image_buffer, 'image/jpeg')},
                headers={
                    "X-API-Key": picgo_key
                }
            )
            if response_url.status_code != 200:
                return "生图失败,请查看PICGO图床密钥是否设置正确"
            try:
                # Chevereto-style API: hosted URL is at image.url.
                hosted_url = response_url.json()["image"]["url"]
            except (ValueError, KeyError, TypeError) as error:
                logger.error(str(error), "Server")
                return "生图失败,请查看PICGO图床密钥是否设置正确"
            logger.info("生图成功", "Server")
            return f"![image]({hosted_url})"

        response_url = requests.post(
            "https://tu.my/api/v1/upload",
            files={'file': ('image.jpg', image_buffer, 'image/jpeg')},
            headers={
                "Accept": "application/json",
                'Authorization': f"Bearer {tumy_key}"
            }
        )
        if response_url.status_code != 200:
            return "生图失败,请查看TUMY图床密钥是否设置正确"
        try:
            # Lsky Pro-style API: hosted URL is at data.links.url.
            hosted_url = response_url.json()["data"]["links"]["url"]
        except (ValueError, KeyError, TypeError) as error:
            logger.error(str(error), "Server")
            return "生图失败,请查看TUMY图床密钥是否设置正确"
        logger.info("生图成功", "Server")
        return f"![image]({hosted_url})"

    @staticmethod
    def handle_non_stream_response(response, model):
        """Consume the whole upstream stream and return one final string.

        Args:
            response: Upstream HTTP response exposing ``iter_lines()``.
            model: Requested model name.

        Returns:
            str: The concatenated tokens; or the markdown for a generated
            image as soon as one is reported; or a JSON error payload
            followed by a blank line when the upstream reports an error.

        Raises:
            Exception: Re-raised after logging when the stream itself fails.
        """
        try:
            logger.info("开始处理非流式响应", "Server")

            full_response = ""
            CONFIG["IS_THINKING"] = False
            CONFIG["IS_IMG_GEN"] = False
            CONFIG["IS_IMG_GEN2"] = False

            for chunk in response.iter_lines():
                if not chunk:
                    continue
                try:
                    line_json = json.loads(chunk.decode("utf-8").strip())
                    if line_json.get("error"):
                        logger.error(json.dumps(line_json, indent=2), "Server")
                        # Must be a plain return: a ``yield`` here would turn
                        # this whole function into a generator.
                        return json.dumps({"error": "RateLimitError"}) + "\n\n"

                    response_data = line_json.get("result", {}).get("response")
                    if not response_data:
                        continue

                    if response_data.get("doImgGen") or response_data.get("imageAttachmentInfo"):
                        CONFIG["IS_IMG_GEN"] = True

                    result = MessageProcessor.process_model_response(response_data, model)

                    if result["token"]:
                        full_response += result["token"]

                    if result["imageUrl"]:
                        CONFIG["IS_IMG_GEN2"] = True
                        return MessageProcessor.handle_image_response(result["imageUrl"])

                except json.JSONDecodeError:
                    # Non-JSON lines are ignored.
                    continue
                except Exception as e:
                    # Best effort: one bad line must not abort the response.
                    logger.error(f"处理流式响应行时出错: {str(e)}", "Server")
                    continue

            return full_response
        except Exception as error:
            logger.error(str(error), "Server")
            raise

    @staticmethod
    def handle_stream_response(response, model):
        """Return a generator of server-sent-event strings for the response.

        Args:
            response: Upstream HTTP response exposing ``iter_lines()``.
            model: Requested model name.

        Returns:
            A generator yielding ``data: {...}`` chunks and finally
            ``data: [DONE]``. When the upstream reports an error it yields a
            single JSON error payload and stops without the ``[DONE]`` marker.
        """
        def generate():
            logger.info("开始处理流式响应", "Server")

            CONFIG["IS_THINKING"] = False
            CONFIG["IS_IMG_GEN"] = False
            CONFIG["IS_IMG_GEN2"] = False

            for chunk in response.iter_lines():
                if not chunk:
                    continue
                try:
                    line_json = json.loads(chunk.decode("utf-8").strip())
                    if line_json.get("error"):
                        logger.error(json.dumps(line_json, indent=2), "Server")
                        yield json.dumps({"error": "RateLimitError"}) + "\n\n"
                        return

                    response_data = line_json.get("result", {}).get("response")
                    if not response_data:
                        continue

                    if response_data.get("doImgGen") or response_data.get("imageAttachmentInfo"):
                        CONFIG["IS_IMG_GEN"] = True

                    result = MessageProcessor.process_model_response(response_data, model)

                    if result["token"]:
                        yield f"data: {json.dumps(MessageProcessor.create_chat_response(result['token'], model, True))}\n\n"

                    if result["imageUrl"]:
                        CONFIG["IS_IMG_GEN2"] = True
                        image_data = MessageProcessor.handle_image_response(result["imageUrl"])
                        yield f"data: {json.dumps(MessageProcessor.create_chat_response(image_data, model, True))}\n\n"

                except json.JSONDecodeError:
                    # Non-JSON lines are ignored.
                    continue
                except Exception as e:
                    # Best effort: one bad line must not abort the stream.
                    logger.error(f"处理流式响应行时出错: {str(e)}", "Server")
                    continue

            yield "data: [DONE]\n\n"

        return generate()