| import gradio as gr |
| import openai |
| import requests |
| import json |
| from typing import Dict, Any, List, Tuple |
| from datetime import datetime |
| import os |
|
|
| class MCPClient: |
| """MCP Client for communicating with the MCP server""" |
| |
| def __init__(self, server_url: str): |
| self.server_url = server_url.rstrip('/') |
| |
| def call_tool_sync(self, tool_name: str, arguments: Dict[str, Any] = None) -> Dict[str, Any]: |
| """Synchronous tool call using requests instead of aiohttp""" |
| if arguments is None: |
| arguments = {} |
| |
| mcp_request = { |
| "jsonrpc": "2.0", |
| "id": 1, |
| "method": "tools/call", |
| "params": { |
| "name": tool_name, |
| "arguments": arguments |
| } |
| } |
| |
| try: |
| response = requests.post( |
| f"{self.server_url}/mcp", |
| json=mcp_request, |
| headers={ |
| "Content-Type": "application/json", |
| "ngrok-skip-browser-warning": "true" |
| }, |
| timeout=30 |
| ) |
| |
| if response.status_code == 200: |
| result = response.json() |
| if "result" in result and "content" in result["result"]: |
| content = result["result"]["content"][0]["text"] |
| return json.loads(content) |
| return result |
| else: |
| return { |
| "success": False, |
| "error": f"HTTP {response.status_code}: {response.text}" |
| } |
| except Exception as e: |
| return { |
| "success": False, |
| "error": f"Connection error: {str(e)}" |
| } |
| |
| def list_tools_sync(self) -> List[Dict[str, Any]]: |
| """Synchronous tool listing using requests""" |
| mcp_request = { |
| "jsonrpc": "2.0", |
| "id": 1, |
| "method": "tools/list" |
| } |
| |
| try: |
| response = requests.post( |
| f"{self.server_url}/mcp", |
| json=mcp_request, |
| headers={ |
| "Content-Type": "application/json", |
| "ngrok-skip-browser-warning": "true" |
| }, |
| timeout=30 |
| ) |
| |
| if response.status_code == 200: |
| result = response.json() |
| return result.get("result", {}).get("tools", []) |
| return [] |
| except Exception as e: |
| print(f"Error listing tools: {str(e)}") |
| return [] |
|
|
| class AIAssistant: |
| """AI Assistant with MCP integration""" |
| |
| def __init__(self, openai_api_key: str, mcp_client: MCPClient): |
| try: |
| self.openai_client = openai.OpenAI( |
| api_key=openai_api_key, |
| timeout=30.0 |
| ) |
| except Exception as e: |
| |
| openai.api_key = openai_api_key |
| self.openai_client = openai |
| self.mcp_client = mcp_client |
| self.available_tools = [] |
| |
| def initialize(self): |
| """Initialize the assistant by fetching available tools""" |
| self.available_tools = self.mcp_client.list_tools_sync() |
| |
| def get_system_prompt(self) -> str: |
| """Generate system prompt with available tools""" |
| tools_description = "\n".join([ |
| f"- {tool['name']}: {tool['description']}" |
| for tool in self.available_tools |
| ]) |
| |
| return f"""You are an AI assistant with access to SAP business systems and news data through specialized tools. |
| |
| Available tools: |
| {tools_description} |
| |
| When a user asks for information that can be retrieved using these tools, you should: |
| 1. Identify which tool(s) would be helpful |
| 2. Call the appropriate tool(s) with the right parameters |
| 3. Wait for the results before providing your final response |
| |
| For SAP-related queries (purchase orders, requisitions), use the SAP tools. |
| For news-related queries, use the news tools. |
| |
| To call a tool, use this exact format: |
| CALL_TOOL: tool_name |
| or |
| CALL_TOOL: tool_name(parameter1=value1, parameter2=value2) |
| |
| Examples: |
| - For "show me purchase orders": CALL_TOOL: get_purchase_orders |
| - For "get 20 purchase orders": CALL_TOOL: get_purchase_orders(top=20) |
| - For "latest tech news": CALL_TOOL: get_news_headlines(category=technology) |
| - For "get news from BBC": CALL_TOOL: get_news_by_source(source_id=bbc-news) |
| - For "get news from CNN": CALL_TOOL: get_news_by_source(source_id=cnn) |
| - For "get news from Reuters": CALL_TOOL: get_news_by_source(source_id=reuters) |
| |
| IMPORTANT: For news by source queries, always include the source_id parameter: |
| - BBC: source_id=bbc-news |
| - CNN: source_id=cnn |
| - Reuters: source_id=reuters |
| - Associated Press: source_id=associated-press |
| - The Guardian: source_id=the-guardian |
| - Washington Post: source_id=the-washington-post |
| |
| After calling a tool, I will provide you with the results to interpret for the user. |
| """ |
| |
| def extract_tool_calls(self, response: str) -> List[Dict[str, Any]]: |
| """Extract tool calls from AI response""" |
| tool_calls = [] |
| lines = response.split('\n') |
| |
| for line in lines: |
| line = line.strip() |
| if line.startswith('CALL_TOOL:'): |
| try: |
| |
| tool_part = line[10:].strip() |
| |
| |
| if '(' in tool_part and ')' in tool_part: |
| tool_name = tool_part.split('(')[0].strip() |
| params_str = tool_part.split('(')[1].split(')')[0] |
| |
| params = {} |
| if params_str.strip(): |
| for param in params_str.split(','): |
| if '=' in param: |
| key, value = param.split('=', 1) |
| key = key.strip() |
| value = value.strip().strip('"\'') |
| try: |
| if value.isdigit(): |
| value = int(value) |
| elif value.lower() in ['true', 'false']: |
| value = value.lower() == 'true' |
| except: |
| pass |
| params[key] = value |
| |
| tool_calls.append({ |
| 'name': tool_name, |
| 'arguments': params |
| }) |
| else: |
| |
| tool_name = tool_part.strip() |
| tool_calls.append({ |
| 'name': tool_name, |
| 'arguments': {} |
| }) |
| |
| except Exception as e: |
| print(f"Error parsing tool call '{line}': {e}") |
| continue |
| |
| return tool_calls |
| |
| def truncate_tool_result(self, result: Dict[str, Any], max_chars: int = 2000) -> Dict[str, Any]: |
| """Truncate tool results to prevent context overflow""" |
| if not isinstance(result, dict): |
| return result |
| |
| result_copy = result.copy() |
| result_str = json.dumps(result_copy, indent=2) |
| |
| if len(result_str) > max_chars: |
| |
| for key, value in result_copy.items(): |
| if isinstance(value, list) and len(value) > 3: |
| result_copy[key] = value[:3] + [f"... ({len(value) - 3} more items truncated)"] |
| elif isinstance(value, str) and len(value) > 500: |
| result_copy[key] = value[:500] + "... (truncated)" |
| |
| |
| result_str = json.dumps(result_copy, indent=2) |
| if len(result_str) > max_chars: |
| result_copy = { |
| "success": result.get("success", False), |
| "truncated": True, |
| "message": f"Result truncated due to size. Original had {len(result_str)} characters.", |
| "sample_data": str(result)[:1000] + "..." if len(str(result)) > 1000 else str(result) |
| } |
| |
| return result_copy |
|
|
| def process_message(self, user_message: str) -> Tuple[str, str]: |
| """Process user message and handle tool calls""" |
| tool_info = "" |
| |
| try: |
| messages = [ |
| {"role": "system", "content": self.get_system_prompt()}, |
| {"role": "user", "content": user_message} |
| ] |
| |
| |
| if hasattr(self.openai_client, 'chat'): |
| response = self.openai_client.chat.completions.create( |
| model="gpt-3.5-turbo", |
| messages=messages, |
| temperature=0.7, |
| max_tokens=800 |
| ) |
| ai_response = response.choices[0].message.content |
| else: |
| |
| response = self.openai_client.ChatCompletion.create( |
| model="gpt-3.5-turbo", |
| messages=messages, |
| temperature=0.7, |
| max_tokens=800 |
| ) |
| ai_response = response.choices[0].message.content |
| tool_calls = self.extract_tool_calls(ai_response) |
| |
| |
| print(f"AI Response: {ai_response}") |
| print(f"Extracted tool calls: {tool_calls}") |
| |
| if tool_calls: |
| tool_results = [] |
| |
| for tool_call in tool_calls: |
| tool_info += f"π§ Calling: {tool_call['name']}\n" |
| |
| |
| result = self.mcp_client.call_tool_sync( |
| tool_call['name'], |
| tool_call['arguments'] |
| ) |
| |
| |
| truncated_result = self.truncate_tool_result(result) |
| |
| tool_results.append({ |
| 'tool': tool_call['name'], |
| 'result': truncated_result |
| }) |
| |
| if result.get('success'): |
| tool_info += f"β
{tool_call['name']} completed\n" |
| else: |
| tool_info += f"β {tool_call['name']} failed: {result.get('error', 'Unknown error')}\n" |
| |
| |
| tool_results_text = "\n\n".join([ |
| f"Tool: {tr['tool']}\nResult: {json.dumps(tr['result'], indent=2)[:1500]}{'...(truncated)' if len(json.dumps(tr['result'], indent=2)) > 1500 else ''}" |
| for tr in tool_results |
| ]) |
| |
| final_messages = messages + [ |
| {"role": "assistant", "content": ai_response}, |
| {"role": "user", "content": f"Here are the tool results:\n\n{tool_results_text}\n\nPlease interpret these results and provide a helpful response to the user."} |
| ] |
| |
| |
| if hasattr(self.openai_client, 'chat'): |
| final_response = self.openai_client.chat.completions.create( |
| model="gpt-3.5-turbo", |
| messages=final_messages, |
| temperature=0.7, |
| max_tokens=800 |
| ) |
| return final_response.choices[0].message.content, tool_info |
| else: |
| final_response = self.openai_client.ChatCompletion.create( |
| model="gpt-3.5-turbo", |
| messages=final_messages, |
| temperature=0.7, |
| max_tokens=800 |
| ) |
| return final_response.choices[0].message.content, tool_info |
| else: |
| return ai_response, "" |
| |
| except Exception as e: |
| return f"β Error processing your request: {str(e)}", "" |
|
|
| |
| assistant = None |
| mcp_client = None |
|
|
| def test_connection(mcp_url): |
| """Test MCP server connection""" |
| if not mcp_url or mcp_url == "https://your-ngrok-url.ngrok.io": |
| return "β Please enter a valid MCP server URL" |
| |
| try: |
| |
| response = requests.get(f"{mcp_url.rstrip('/')}/health", timeout=10) |
| if response.status_code == 200: |
| data = response.json() |
| |
| |
| mcp_request = { |
| "jsonrpc": "2.0", |
| "id": 1, |
| "method": "tools/list" |
| } |
| |
| mcp_response = requests.post( |
| f"{mcp_url.rstrip('/')}/mcp", |
| json=mcp_request, |
| headers={ |
| "Content-Type": "application/json", |
| "ngrok-skip-browser-warning": "true" |
| }, |
| timeout=10 |
| ) |
| |
| if mcp_response.status_code == 200: |
| mcp_data = mcp_response.json() |
| tools = mcp_data.get("result", {}).get("tools", []) |
| tool_names = [tool.get("name", "Unknown") for tool in tools] |
| |
| return f"β
Connected successfully!\nHealth Status: {data.get('status', 'Unknown')}\nMCP Tools: {len(tools)}\nAvailable: {', '.join(tool_names)}" |
| else: |
| return f"β
Health OK, but MCP endpoint failed: HTTP {mcp_response.status_code}" |
| else: |
| return f"β Connection failed: HTTP {response.status_code}" |
| except Exception as e: |
| return f"β Connection error: {str(e)}" |
|
|
| def initialize_assistant(openai_key, mcp_url): |
| """Initialize the AI assistant""" |
| global assistant, mcp_client |
| |
| if not openai_key: |
| return "β Please enter your OpenAI API key" |
| |
| if not mcp_url or mcp_url == "https://your-ngrok-url.ngrok.io": |
| return "β Please enter a valid MCP server URL" |
| |
| try: |
| mcp_client = MCPClient(mcp_url) |
| assistant = AIAssistant(openai_key, mcp_client) |
| assistant.initialize() |
| return f"β
AI Assistant initialized with {len(assistant.available_tools)} tools available" |
| except Exception as e: |
| return f"β Failed to initialize: {str(e)}" |
|
|
| def chat_interface(message, history, openai_key, mcp_url): |
| """Main chat interface""" |
| global assistant |
| |
| if not assistant: |
| init_result = initialize_assistant(openai_key, mcp_url) |
| if "β" in init_result: |
| history.append([message, init_result]) |
| return history, "" |
| |
| try: |
| print(f"Calling process_message with: {message}") |
| |
| |
| |
| if len(history) > 10: |
| history = history[-10:] |
| |
| |
| result = assistant.process_message(message) |
| print(f"process_message returned: {type(result)} - {result}") |
| |
| |
| if isinstance(result, tuple) and len(result) == 2: |
| response, tool_info = result |
| print(f"Unpacked: response={response}, tool_info={tool_info}") |
| else: |
| response = str(result) |
| tool_info = "" |
| print(f"Single result: {response}") |
| |
| |
| if tool_info: |
| full_response = f"**Tool Execution:**\n{tool_info}\n\n**Response:**\n{response}" |
| else: |
| full_response = response |
| |
| history.append([message, full_response]) |
| return history, "" |
| except Exception as e: |
| import traceback |
| error_response = f"β Error: {str(e)}\n\nTraceback:\n{traceback.format_exc()}" |
| print(f"Error in chat_interface: {error_response}") |
| history.append([message, error_response]) |
| return history, "" |
|
|
| |
| with gr.Blocks(title="AI Assistant with SAP & News Integration", theme=gr.themes.Soft()) as demo: |
| gr.Markdown("# π€ AI Assistant with SAP & News Integration") |
| gr.Markdown("Chat with an AI that can access SAP business data and news through natural language queries.") |
| |
| with gr.Row(): |
| with gr.Column(scale=2): |
| chatbot = gr.Chatbot( |
| height=500, |
| show_label=False, |
| container=True, |
| bubble_full_width=False |
| ) |
| |
| msg = gr.Textbox( |
| placeholder="Ask me about SAP data, news, or anything else...", |
| show_label=False, |
| container=False |
| ) |
| |
| with gr.Row(): |
| submit_btn = gr.Button("Send", variant="primary") |
| clear_btn = gr.Button("Clear", variant="secondary") |
| |
| with gr.Column(scale=1): |
| gr.Markdown("### βοΈ Configuration") |
| |
| openai_key = gr.Textbox( |
| label="OpenAI API Key", |
| type="password", |
| placeholder="sk-..." |
| ) |
| |
| mcp_url = gr.Textbox( |
| label="MCP Server URL", |
| value="https://your-ngrok-url.ngrok.io", |
| placeholder="https://abc123.ngrok.io" |
| ) |
| |
| test_btn = gr.Button("Test Connection", variant="secondary") |
| connection_status = gr.Textbox(label="Connection Status", interactive=False) |
| |
| gr.Markdown("### π Example Queries") |
| gr.Markdown(""" |
| - "Show me recent purchase orders" |
| - "Get purchase requisitions" |
| - "What's the latest tech news?" |
| - "Get news from BBC" |
| - "Show me business news from the US" |
| """) |
| |
| |
| def respond(message, history, openai_key, mcp_url): |
| return chat_interface(message, history, openai_key, mcp_url) |
| |
| submit_btn.click( |
| respond, |
| [msg, chatbot, openai_key, mcp_url], |
| [chatbot, msg] |
| ) |
| |
| msg.submit( |
| respond, |
| [msg, chatbot, openai_key, mcp_url], |
| [chatbot, msg] |
| ) |
| |
| clear_btn.click(lambda: ([], ""), outputs=[chatbot, msg]) |
| |
| test_btn.click( |
| test_connection, |
| [mcp_url], |
| [connection_status] |
| ) |
|
|
| if __name__ == "__main__": |
| demo.launch() |