# src/core/message_processor.py import json import time import base64 import uuid import requests from curl_cffi import requests as curl_requests from src.core.logger import logger # 从新的位置导入 logger from src.core.utils import Utils # 从新的位置导入 Utils from app import CONFIG, DEFAULT_HEADERS # 导入 CONFIG 和 DEFAULT_HEADERS class MessageProcessor: @staticmethod def create_chat_response(message, model, is_stream=False): base_response = { "id": f"chatcmpl-{uuid.uuid4()}", "created": int(time.time()), "model": model } if is_stream: return { **base_response, "object": "chat.completion.chunk", "choices": [{ "index": 0, "delta": { "content": message } }] } return { **base_response, "object": "chat.completion", "choices": [{ "index": 0, "message": { "role": "assistant", "content": message }, "finish_reason": "stop" }], "usage": None } def process_model_response(response, model): result = {"token": None, "imageUrl": None} if CONFIG["IS_IMG_GEN"]: if response.get("cachedImageGenerationResponse") and not CONFIG["IS_IMG_GEN2"]: result["imageUrl"] = response["cachedImageGenerationResponse"]["imageUrl"] return result if model == 'grok-2': result["token"] = response.get("token") elif model in ['grok-2-search', 'grok-3-search']: if response.get("webSearchResults") and CONFIG["ISSHOW_SEARCH_RESULTS"]: result["token"] = f"\r\n{Utils.organize_search_results(response['webSearchResults'])}\r\n" else: result["token"] = response.get("token") elif model == 'grok-3': result["token"] = response.get("token") elif model == 'grok-3-deepsearch': if response.get("messageStepId") and not CONFIG["SHOW_THINKING"]: return result if response.get("messageStepId") and not CONFIG["IS_THINKING"]: result["token"] = "" + response.get("token", "") CONFIG["IS_THINKING"] = True elif not response.get("messageStepId") and CONFIG["IS_THINKING"] and response.get("messageTag") == "final": result["token"] = "" + response.get("token", "") CONFIG["IS_THINKING"] = False elif (response.get("messageStepId") and CONFIG["IS_THINKING"] and response.get("messageTag") == "assistant") or response.get("messageTag") == "final": result["token"] = response.get("token") elif model == 'grok-3-reasoning': if response.get("isThinking") and not CONFIG["SHOW_THINKING"]: return result if response.get("isThinking") and not CONFIG["IS_THINKING"]: result["token"] = "" + response.get("token", "") CONFIG["IS_THINKING"] = True elif not response.get("isThinking") and CONFIG["IS_THINKING"]: result["token"] = "" + response.get("token", "") CONFIG["IS_THINKING"] = False else: result["token"] = response.get("token") return result @staticmethod # 将方法改为静态方法 def handle_image_response(image_url): max_retries = 2 retry_count = 0 image_base64_response = None while retry_count < max_retries: try: proxy_options = Utils.get_proxy_options() image_base64_response = curl_requests.get( f"https://assets.grok.com/{image_url}", headers={ **DEFAULT_HEADERS, "Cookie":CONFIG["SERVER"]['COOKIE'] }, impersonate="chrome133a", **proxy_options ) if image_base64_response.status_code == 200: break retry_count += 1 if retry_count == max_retries: raise Exception(f"上游服务请求失败! status: {image_base64_response.status_code}") time.sleep(CONFIG["API"]["RETRY_TIME"] / 1000 * retry_count) except Exception as error: logger.error(str(error), "Server") retry_count += 1 if retry_count == max_retries: raise time.sleep(CONFIG["API"]["RETRY_TIME"] / 1000 * retry_count) image_buffer = image_base64_response.content if not CONFIG["API"]["PICGO_KEY"] and not CONFIG["API"]["TUMY_KEY"]: base64_image = base64.b64encode(image_buffer).decode('utf-8') image_content_type = image_base64_response.headers.get('content-type', 'image/jpeg') return f"![image](data:{image_content_type};base64,{base64_image})" logger.info("开始上传图床", "Server") if CONFIG["API"]["PICGO_KEY"]: files = {'source': ('image.jpg', image_buffer, 'image/jpeg')} headers = { "X-API-Key": CONFIG["API"]["PICGO_KEY"] } response_url = requests.post( "https://www.picgo.net/api/1/upload", files=files, headers=headers ) if response_url.status_code != 200: return "生图失败,请查看PICGO图床密钥是否设置正确" else: logger.info("生图成功", "Server") result = response_url.json() return f"![image]({result['image']['url']})" elif CONFIG["API"]["TUMY_KEY"]: files = {'file': ('image.jpg', image_buffer, 'image/jpeg')} headers = { "Accept": "application/json", 'Authorization': f"Bearer {CONFIG['API']['TUMY_KEY']}" } response_url = requests.post( "https://tu.my/api/v1/upload", files=files, headers=headers ) if response_url.status_code != 200: return "生图失败,请查看TUMY图床密钥是否设置正确" else: try: result = response_url.json() logger.info("生图成功", "Server") return f"![image]({result['data']['links']['url']})" except Exception as error: logger.error(str(error), "Server") return "生图失败,请查看TUMY图床密钥是否设置正确" def handle_non_stream_response(response, model): try: logger.info("开始处理非流式响应", "Server") stream = response.iter_lines() full_response = "" CONFIG["IS_THINKING"] = False CONFIG["IS_IMG_GEN"] = False CONFIG["IS_IMG_GEN2"] = False for chunk in stream: if not chunk: continue try: line_json = json.loads(chunk.decode("utf-8").strip()) if line_json.get("error"): logger.error(json.dumps(line_json, indent=2), "Server") yield json.dumps({"error": "RateLimitError"}) + "\n\n" return response_data = line_json.get("result", {}).get("response") if not response_data: continue if response_data.get("doImgGen") or response_data.get("imageAttachmentInfo"): CONFIG["IS_IMG_GEN"] = True result = MessageProcessor.process_model_response(response_data, model) # 使用类名调用静态方法 if result["token"]: full_response += result["token"] if result["imageUrl"]: CONFIG["IS_IMG_GEN2"] = True return MessageProcessor.handle_image_response(result["imageUrl"]) # 使用类名调用静态方法 except json.JSONDecodeError: continue except Exception as e: logger.error(f"处理流式响应行时出错: {str(e)}", "Server") continue return full_response except Exception as error: logger.error(str(error), "Server") raise def handle_stream_response(response, model): def generate(): logger.info("开始处理流式响应", "Server") stream = response.iter_lines() CONFIG["IS_THINKING"] = False CONFIG["IS_IMG_GEN"] = False CONFIG["IS_IMG_GEN2"] = False for chunk in stream: if not chunk: continue try: line_json = json.loads(chunk.decode("utf-8").strip()) if line_json.get("error"): logger.error(json.dumps(line_json, indent=2), "Server") yield json.dumps({"error": "RateLimitError"}) + "\n\n" return response_data = line_json.get("result", {}).get("response") if not response_data: continue if response_data.get("doImgGen") or response_data.get("imageAttachmentInfo"): CONFIG["IS_IMG_GEN"] = True result = MessageProcessor.process_model_response(response_data, model) # 使用类名调用静态方法 if result["token"]: yield f"data: {json.dumps(MessageProcessor.create_chat_response(result['token'], model, True))}\n\n" if result["imageUrl"]: CONFIG["IS_IMG_GEN2"] = True image_data = MessageProcessor.handle_image_response(result["imageUrl"]) # 使用类名调用静态方法 yield f"data: {json.dumps(MessageProcessor.create_chat_response(image_data, model, True))}\n\n" except json.JSONDecodeError: continue except Exception as e: logger.error(f"处理流式响应行时出错: {str(e)}", "Server") continue yield "data: [DONE]\n\n" return generate()