# Source header (non-Python page text, commented out so the file parses):
# g2api-test / src/core/message_processor.py
# Author: misonL — "Fix: Refactor image handling and improve table styling" (commit 98a3470)
# src/core/message_processor.py
import json
import time
import base64
import uuid
import requests
from curl_cffi import requests as curl_requests
from src.core.logger import logger # import logger from its new location
from src.core.utils import Utils # import Utils from its new location
from app import CONFIG, DEFAULT_HEADERS # shared runtime config and default HTTP headers
class MessageProcessor:
    @staticmethod
    def create_chat_response(message, model, is_stream=False):
        """Build an OpenAI-compatible chat completion payload.

        Args:
            message: Text content to place in the response.
            model: Model identifier echoed back to the client.
            is_stream: When True, build a streaming ``chat.completion.chunk``
                with a ``delta``; otherwise build a full ``chat.completion``.

        Returns:
            dict shaped like the OpenAI /chat/completions response body.
        """
        payload = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "created": int(time.time()),
            "model": model,
        }
        if is_stream:
            payload["object"] = "chat.completion.chunk"
            payload["choices"] = [{"index": 0, "delta": {"content": message}}]
            return payload
        payload["object"] = "chat.completion"
        payload["choices"] = [{
            "index": 0,
            "message": {"role": "assistant", "content": message},
            "finish_reason": "stop",
        }]
        payload["usage"] = None
        return payload
def process_model_response(response, model):
result = {"token": None, "imageUrl": None}
if CONFIG["IS_IMG_GEN"]:
if response.get("cachedImageGenerationResponse") and not CONFIG["IS_IMG_GEN2"]:
result["imageUrl"] = response["cachedImageGenerationResponse"]["imageUrl"]
return result
if model == 'grok-2':
result["token"] = response.get("token")
elif model in ['grok-2-search', 'grok-3-search']:
if response.get("webSearchResults") and CONFIG["ISSHOW_SEARCH_RESULTS"]:
result["token"] = f"\r\n{Utils.organize_search_results(response['webSearchResults'])}\r\n"
else:
result["token"] = response.get("token")
elif model == 'grok-3':
result["token"] = response.get("token")
elif model == 'grok-3-deepsearch':
if response.get("messageStepId") and not CONFIG["SHOW_THINKING"]:
return result
if response.get("messageStepId") and not CONFIG["IS_THINKING"]:
result["token"] = "" + response.get("token", "")
CONFIG["IS_THINKING"] = True
elif not response.get("messageStepId") and CONFIG["IS_THINKING"] and response.get("messageTag") == "final":
result["token"] = "" + response.get("token", "")
CONFIG["IS_THINKING"] = False
elif (response.get("messageStepId") and CONFIG["IS_THINKING"] and response.get("messageTag") == "assistant") or response.get("messageTag") == "final":
result["token"] = response.get("token")
elif model == 'grok-3-reasoning':
if response.get("isThinking") and not CONFIG["SHOW_THINKING"]:
return result
if response.get("isThinking") and not CONFIG["IS_THINKING"]:
result["token"] = "" + response.get("token", "")
CONFIG["IS_THINKING"] = True
elif not response.get("isThinking") and CONFIG["IS_THINKING"]:
result["token"] = "" + response.get("token", "")
CONFIG["IS_THINKING"] = False
else:
result["token"] = response.get("token")
return result
@staticmethod # 将方法改为静态方法
def handle_image_response(image_url):
max_retries = 2
retry_count = 0
image_base64_response = None
while retry_count < max_retries:
try:
proxy_options = Utils.get_proxy_options()
image_base64_response = curl_requests.get(
f"https://assets.grok.com/{image_url}",
headers={
**DEFAULT_HEADERS,
"Cookie":CONFIG["SERVER"]['COOKIE']
},
impersonate="chrome133a",
**proxy_options
)
if image_base64_response.status_code == 200:
break
retry_count += 1
if retry_count == max_retries:
raise Exception(f"上游服务请求失败! status: {image_base64_response.status_code}")
time.sleep(CONFIG["API"]["RETRY_TIME"] / 1000 * retry_count)
except Exception as error:
logger.error(str(error), "Server")
retry_count += 1
if retry_count == max_retries:
raise
time.sleep(CONFIG["API"]["RETRY_TIME"] / 1000 * retry_count)
image_buffer = image_base64_response.content
if not CONFIG["API"]["PICGO_KEY"] and not CONFIG["API"]["TUMY_KEY"]:
base64_image = base64.b64encode(image_buffer).decode('utf-8')
image_content_type = image_base64_response.headers.get('content-type', 'image/jpeg')
return f"![image](data:{image_content_type};base64,{base64_image})"
logger.info("开始上传图床", "Server")
if CONFIG["API"]["PICGO_KEY"]:
files = {'source': ('image.jpg', image_buffer, 'image/jpeg')}
headers = {
"X-API-Key": CONFIG["API"]["PICGO_KEY"]
}
response_url = requests.post(
"https://www.picgo.net/api/1/upload",
files=files,
headers=headers
)
if response_url.status_code != 200:
return "生图失败,请查看PICGO图床密钥是否设置正确"
else:
logger.info("生图成功", "Server")
result = response_url.json()
return f"![image]({result['image']['url']})"
elif CONFIG["API"]["TUMY_KEY"]:
files = {'file': ('image.jpg', image_buffer, 'image/jpeg')}
headers = {
"Accept": "application/json",
'Authorization': f"Bearer {CONFIG['API']['TUMY_KEY']}"
}
response_url = requests.post(
"https://tu.my/api/v1/upload",
files=files,
headers=headers
)
if response_url.status_code != 200:
return "生图失败,请查看TUMY图床密钥是否设置正确"
else:
try:
result = response_url.json()
logger.info("生图成功", "Server")
return f"![image]({result['data']['links']['url']})"
except Exception as error:
logger.error(str(error), "Server")
return "生图失败,请查看TUMY图床密钥是否设置正确"
def handle_non_stream_response(response, model):
try:
logger.info("开始处理非流式响应", "Server")
stream = response.iter_lines()
full_response = ""
CONFIG["IS_THINKING"] = False
CONFIG["IS_IMG_GEN"] = False
CONFIG["IS_IMG_GEN2"] = False
for chunk in stream:
if not chunk:
continue
try:
line_json = json.loads(chunk.decode("utf-8").strip())
if line_json.get("error"):
logger.error(json.dumps(line_json, indent=2), "Server")
yield json.dumps({"error": "RateLimitError"}) + "\n\n"
return
response_data = line_json.get("result", {}).get("response")
if not response_data:
continue
if response_data.get("doImgGen") or response_data.get("imageAttachmentInfo"):
CONFIG["IS_IMG_GEN"] = True
result = MessageProcessor.process_model_response(response_data, model) # 使用类名调用静态方法
if result["token"]:
full_response += result["token"]
if result["imageUrl"]:
CONFIG["IS_IMG_GEN2"] = True
return MessageProcessor.handle_image_response(result["imageUrl"]) # 使用类名调用静态方法
except json.JSONDecodeError:
continue
except Exception as e:
logger.error(f"处理流式响应行时出错: {str(e)}", "Server")
continue
return full_response
except Exception as error:
logger.error(str(error), "Server")
raise
def handle_stream_response(response, model):
def generate():
logger.info("开始处理流式响应", "Server")
stream = response.iter_lines()
CONFIG["IS_THINKING"] = False
CONFIG["IS_IMG_GEN"] = False
CONFIG["IS_IMG_GEN2"] = False
for chunk in stream:
if not chunk:
continue
try:
line_json = json.loads(chunk.decode("utf-8").strip())
if line_json.get("error"):
logger.error(json.dumps(line_json, indent=2), "Server")
yield json.dumps({"error": "RateLimitError"}) + "\n\n"
return
response_data = line_json.get("result", {}).get("response")
if not response_data:
continue
if response_data.get("doImgGen") or response_data.get("imageAttachmentInfo"):
CONFIG["IS_IMG_GEN"] = True
result = MessageProcessor.process_model_response(response_data, model) # 使用类名调用静态方法
if result["token"]:
yield f"data: {json.dumps(MessageProcessor.create_chat_response(result['token'], model, True))}\n\n"
if result["imageUrl"]:
CONFIG["IS_IMG_GEN2"] = True
image_data = MessageProcessor.handle_image_response(result["imageUrl"]) # 使用类名调用静态方法
yield f"data: {json.dumps(MessageProcessor.create_chat_response(image_data, model, True))}\n\n"
except json.JSONDecodeError:
continue
except Exception as e:
logger.error(f"处理流式响应行时出错: {str(e)}", "Server")
continue
yield "data: [DONE]\n\n"
return generate()