Spaces:
Paused
Paused
| # src/core/message_processor.py | |
| import json | |
| import time | |
| import base64 | |
| import uuid | |
| import requests | |
| from curl_cffi import requests as curl_requests | |
| from src.core.logger import logger # 从新的位置导入 logger | |
| from src.core.utils import Utils # 从新的位置导入 Utils | |
| from app import CONFIG, DEFAULT_HEADERS # 导入 CONFIG 和 DEFAULT_HEADERS | |
class MessageProcessor:
    """Convert upstream Grok responses into OpenAI-style chat payloads.

    Every method is static and is invoked through the class name.  Several of
    them read and mutate flags stored in the module-level ``CONFIG`` dict
    (``IS_THINKING``, ``IS_IMG_GEN``, ``IS_IMG_GEN2``), so per-response state
    lives in that shared dict.

    NOTE(review): because that state is global, overlapping requests would
    appear to share it -- confirm how the server dispatches requests.
    """

    @staticmethod
    def create_chat_response(message, model, is_stream=False):
        """Build an OpenAI-compatible chat completion payload.

        Args:
            message: Text placed in the single choice of the payload.
            model: Model name echoed back in the payload.
            is_stream: When true, build a ``chat.completion.chunk`` whose
                choice carries a ``delta``; otherwise build a complete
                ``chat.completion`` with an assistant message.

        Returns:
            A dict ready to be JSON-serialised.
        """
        payload = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "created": int(time.time()),
            "model": model,
        }
        if is_stream:
            payload["object"] = "chat.completion.chunk"
            payload["choices"] = [{"index": 0, "delta": {"content": message}}]
            return payload

        payload["object"] = "chat.completion"
        payload["choices"] = [{
            "index": 0,
            "message": {"role": "assistant", "content": message},
            "finish_reason": "stop",
        }]
        payload["usage"] = None
        return payload

    @staticmethod
    def process_model_response(response, model):
        """Extract the text token or image URL carried by one upstream chunk.

        Args:
            response: The ``result.response`` dict of one upstream JSON line.
            model: Requested model name; selects how the chunk is interpreted.

        Returns:
            ``{"token": str | None, "imageUrl": str | None}``.  Both values
            stay ``None`` when the chunk carries nothing to emit.

        Side effects:
            Flips ``CONFIG["IS_THINKING"]`` when a reasoning section starts
            or ends.
        """
        result = {"token": None, "imageUrl": None}

        # While an image is being generated only the cached image matters;
        # IS_IMG_GEN2 is set by the callers once the image has been handled.
        if CONFIG["IS_IMG_GEN"]:
            cached = response.get("cachedImageGenerationResponse")
            if cached and not CONFIG["IS_IMG_GEN2"]:
                result["imageUrl"] = cached["imageUrl"]
            return result

        # NOTE(review): the reviewed copy concatenated an empty string at the
        # two thinking-state transitions below, which looks like markup that
        # was stripped from the file.  The markers are restored here as
        # <think> / </think> -- confirm against the consumers of this output.
        # ``or ""`` also covers a token that is present but None.
        if model in ("grok-2", "grok-3"):
            result["token"] = response.get("token")
        elif model in ("grok-2-search", "grok-3-search"):
            if response.get("webSearchResults") and CONFIG["ISSHOW_SEARCH_RESULTS"]:
                result["token"] = f"\r\n{Utils.organize_search_results(response['webSearchResults'])}\r\n"
            else:
                result["token"] = response.get("token")
        elif model == "grok-3-deepsearch":
            step_id = response.get("messageStepId")
            tag = response.get("messageTag")
            if step_id and not CONFIG["SHOW_THINKING"]:
                return result
            if step_id and not CONFIG["IS_THINKING"]:
                result["token"] = "<think>" + (response.get("token") or "")
                CONFIG["IS_THINKING"] = True
            elif not step_id and CONFIG["IS_THINKING"] and tag == "final":
                result["token"] = "</think>" + (response.get("token") or "")
                CONFIG["IS_THINKING"] = False
            elif (step_id and CONFIG["IS_THINKING"] and tag == "assistant") or tag == "final":
                result["token"] = response.get("token")
        elif model == "grok-3-reasoning":
            thinking = response.get("isThinking")
            if thinking and not CONFIG["SHOW_THINKING"]:
                return result
            if thinking and not CONFIG["IS_THINKING"]:
                result["token"] = "<think>" + (response.get("token") or "")
                CONFIG["IS_THINKING"] = True
            elif not thinking and CONFIG["IS_THINKING"]:
                result["token"] = "</think>" + (response.get("token") or "")
                CONFIG["IS_THINKING"] = False
            else:
                result["token"] = response.get("token")
        return result

    @staticmethod
    def _fetch_image(image_url):
        """Download a generated image, retrying once on failure.

        Raises:
            Exception: when the last attempt fails, either with the original
                request error or with a status-code error for a non-200 reply.
        """
        max_retries = 2
        for attempt in range(1, max_retries + 1):
            is_last_attempt = attempt == max_retries
            try:
                proxy_options = Utils.get_proxy_options()
                image_response = curl_requests.get(
                    f"https://assets.grok.com/{image_url}",
                    headers={
                        **DEFAULT_HEADERS,
                        "Cookie": CONFIG["SERVER"]['COOKIE'],
                    },
                    impersonate="chrome133a",
                    **proxy_options
                )
            except Exception as error:
                logger.error(str(error), "Server")
                if is_last_attempt:
                    raise
            else:
                if image_response.status_code == 200:
                    return image_response
                # Raised outside the ``try`` so the failure cannot be caught
                # by the handler above and silently dropped.
                failure = f"上游服务请求失败! status: {image_response.status_code}"
                logger.error(failure, "Server")
                if is_last_attempt:
                    raise Exception(failure)
            # Linear back-off: RETRY_TIME is in milliseconds.
            time.sleep(CONFIG["API"]["RETRY_TIME"] / 1000 * attempt)

    @staticmethod
    def handle_image_response(image_url):
        """Fetch a generated image and return it as a markdown image string.

        Without an image-host key the image is inlined as a base64 data URI.
        With ``PICGO_KEY`` (checked first) or ``TUMY_KEY`` configured, the
        image is uploaded and the hosted URL is embedded instead.

        Args:
            image_url: Path of the image relative to ``https://assets.grok.com/``.

        Returns:
            A markdown image string, or a human-readable failure message when
            the upload to the image host fails.

        Raises:
            Exception: when the image cannot be downloaded after all retries.
        """
        image_response = MessageProcessor._fetch_image(image_url)
        image_buffer = image_response.content

        picgo_key = CONFIG["API"]["PICGO_KEY"]
        tumy_key = CONFIG["API"]["TUMY_KEY"]

        if not picgo_key and not tumy_key:
            base64_image = base64.b64encode(image_buffer).decode('utf-8')
            image_content_type = image_response.headers.get('content-type', 'image/jpeg')
            return f"![image](data:{image_content_type};base64,{base64_image})"

        logger.info("开始上传图床", "Server")

        if picgo_key:
            response_url = requests.post(
                "https://www.picgo.net/api/1/upload",
                files={'source': ('image.jpg', image_buffer, 'image/jpeg')},
                headers={"X-API-Key": picgo_key}
            )
            if response_url.status_code != 200:
                return "生图失败,请查看PICGO图床密钥是否设置正确"
            try:
                # Assumes the Chevereto-style reply {"image": {"url": ...}}
                # -- TODO confirm against the PicGo API.
                result = response_url.json()
                hosted_url = result['image']['url']
            except Exception as error:
                logger.error(str(error), "Server")
                return "生图失败,请查看PICGO图床密钥是否设置正确"
            logger.info("生图成功", "Server")
            return f"![image]({hosted_url})"

        response_url = requests.post(
            "https://tu.my/api/v1/upload",
            files={'file': ('image.jpg', image_buffer, 'image/jpeg')},
            headers={
                "Accept": "application/json",
                'Authorization': f"Bearer {tumy_key}"
            }
        )
        if response_url.status_code != 200:
            return "生图失败,请查看TUMY图床密钥是否设置正确"
        try:
            # Assumes the Lsky-style reply {"data": {"links": {"url": ...}}}
            # -- TODO confirm against the tu.my API.
            result = response_url.json()
            hosted_url = result['data']['links']['url']
        except Exception as error:
            logger.error(str(error), "Server")
            return "生图失败,请查看TUMY图床密钥是否设置正确"
        logger.info("生图成功", "Server")
        return f"![image]({hosted_url})"

    @staticmethod
    def handle_non_stream_response(response, model):
        """Consume the whole upstream stream and return the final text.

        Args:
            response: Upstream HTTP response exposing ``iter_lines()``; each
                non-empty line is expected to be a UTF-8 JSON document.
            model: Requested model name, forwarded to
                ``process_model_response``.

        Returns:
            The concatenated tokens; or the markdown image string as soon as
            an image URL is seen; or a JSON error string followed by a blank
            line when the upstream reports an error.

        Raises:
            Exception: anything raised outside the per-line handling is
                logged and re-raised.
        """
        try:
            logger.info("开始处理非流式响应", "Server")
            stream = response.iter_lines()
            tokens = []
            CONFIG["IS_THINKING"] = False
            CONFIG["IS_IMG_GEN"] = False
            CONFIG["IS_IMG_GEN2"] = False

            for chunk in stream:
                if not chunk:
                    continue
                try:
                    line_json = json.loads(chunk.decode("utf-8").strip())
                    if line_json.get("error"):
                        logger.error(json.dumps(line_json, indent=2), "Server")
                        # Must be ``return``: a ``yield`` here would turn the
                        # whole function into a generator and callers would
                        # never receive the string results.
                        return json.dumps({"error": "RateLimitError"}) + "\n\n"

                    response_data = line_json.get("result", {}).get("response")
                    if not response_data:
                        continue

                    if response_data.get("doImgGen") or response_data.get("imageAttachmentInfo"):
                        CONFIG["IS_IMG_GEN"] = True

                    # Call the static method via the class name.
                    result = MessageProcessor.process_model_response(response_data, model)
                    if result["token"]:
                        tokens.append(result["token"])
                    if result["imageUrl"]:
                        CONFIG["IS_IMG_GEN2"] = True
                        return MessageProcessor.handle_image_response(result["imageUrl"])
                except json.JSONDecodeError:
                    # Lines that are not JSON are skipped.
                    continue
                except Exception as e:
                    logger.error(f"处理流式响应行时出错: {str(e)}", "Server")
                    continue

            return "".join(tokens)
        except Exception as error:
            logger.error(str(error), "Server")
            raise

    @staticmethod
    def handle_stream_response(response, model):
        """Return a generator of server-sent-event strings for the stream.

        Args:
            response: Upstream HTTP response exposing ``iter_lines()``; each
                non-empty line is expected to be a UTF-8 JSON document.
            model: Requested model name, forwarded to
                ``process_model_response``.

        Returns:
            A generator yielding ``data: <json>`` events (one per token or
            image) and a final ``data: [DONE]`` event.  If the upstream
            reports an error, a bare JSON error string is yielded and the
            generator stops without the ``[DONE]`` event.
        """
        def generate():
            logger.info("开始处理流式响应", "Server")
            stream = response.iter_lines()
            CONFIG["IS_THINKING"] = False
            CONFIG["IS_IMG_GEN"] = False
            CONFIG["IS_IMG_GEN2"] = False

            for chunk in stream:
                if not chunk:
                    continue
                try:
                    line_json = json.loads(chunk.decode("utf-8").strip())
                    if line_json.get("error"):
                        logger.error(json.dumps(line_json, indent=2), "Server")
                        yield json.dumps({"error": "RateLimitError"}) + "\n\n"
                        return

                    response_data = line_json.get("result", {}).get("response")
                    if not response_data:
                        continue

                    if response_data.get("doImgGen") or response_data.get("imageAttachmentInfo"):
                        CONFIG["IS_IMG_GEN"] = True

                    # Call the static methods via the class name.
                    result = MessageProcessor.process_model_response(response_data, model)
                    if result["token"]:
                        yield f"data: {json.dumps(MessageProcessor.create_chat_response(result['token'], model, True))}\n\n"
                    if result["imageUrl"]:
                        CONFIG["IS_IMG_GEN2"] = True
                        image_data = MessageProcessor.handle_image_response(result["imageUrl"])
                        yield f"data: {json.dumps(MessageProcessor.create_chat_response(image_data, model, True))}\n\n"
                except json.JSONDecodeError:
                    # Lines that are not JSON are skipped.
                    continue
                except Exception as e:
                    logger.error(f"处理流式响应行时出错: {str(e)}", "Server")
                    continue

            yield "data: [DONE]\n\n"

        return generate()