"""HTTP bridge that queues Create 3 robot commands.

MCP tools (``dock``, ``undock``, ``move_cm``) append commands to an in-memory
queue.  A runner claims them through ``POST /commands/next`` and reports the
outcome through ``POST /commands/{command_id}/status``.  All state lives in
module-level containers, so it is lost when the process exits.
"""

import asyncio
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any, AsyncIterator, Dict, List, Optional
from uuid import uuid4

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from mcp.server.fastmcp import FastMCP, Context

# Every command record ever created, keyed by command id.
commands: Dict[str, Dict[str, Any]] = {}
# Ids of commands waiting to be claimed, oldest first.
queue: List[str] = []
# Guards the enqueue and claim paths that touch ``commands``/``queue``.
queue_lock = asyncio.Lock()


def now_iso() -> str:
    """Return the current UTC time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


async def enqueue(
    command: str, args: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Create a command record, append it to the queue and return it.

    Args:
        command: Name of the command to queue.
        args: Optional arguments for the command; stored as ``{}`` when
            omitted or empty.

    Returns:
        The newly created record (the same dict object stored in
        ``commands``), with ``status`` set to ``"queued"``.
    """
    cid = str(uuid4())
    # One timestamp for both fields so a fresh record has
    # created_at == updated_at.
    stamp = now_iso()
    rec = {
        "id": cid,
        "command": command,
        "args": args or {},
        "status": "queued",
        "result": None,
        "error": None,
        "created_at": stamp,
        "updated_at": stamp,
        "claimed_by": None,
    }
    async with queue_lock:
        commands[cid] = rec
        queue.append(cid)
    return rec


mcp = FastMCP(
    "Create3 Robot Bridge",
    instructions="Queues robot commands for a local Create 3 runner.",
    stateless_http=True,
)


@mcp.tool()
async def dock(ctx: Context) -> dict:
    """Queue a ``dock`` command."""
    return {"ok": True, "queued": await enqueue("dock", {})}


@mcp.tool()
async def undock(ctx: Context) -> dict:
    """Queue an ``undock`` command."""
    return {"ok": True, "queued": await enqueue("undock", {})}


@mcp.tool()
async def move_cm(cm: float, ctx: Context) -> dict:
    """Queue a ``move_cm`` command carrying the given ``cm`` value."""
    return {"ok": True, "queued": await enqueue("move_cm", {"cm": cm})}


@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
    """Run the MCP streamable-HTTP session manager for the app's lifetime.

    The app returned by ``mcp.streamable_http_app()`` starts its session
    manager in its own lifespan, but lifespans of mounted sub-applications
    are not run by the parent app.  Without this hook the session manager
    is never started and streamable-HTTP requests fail.
    """
    # ``session_manager`` is created by ``mcp.streamable_http_app()``, which
    # is called at import time below, i.e. before startup runs this hook.
    async with mcp.session_manager.run():
        yield


app = FastAPI(lifespan=lifespan)


class StatusUpdate(BaseModel):
    """Request body for ``POST /commands/{command_id}/status``."""

    # New status string; stored as-is, no validation is applied here.
    status: str
    # Optional result payload; overwrites the stored result (even with None).
    result: Optional[Dict[str, Any]] = None
    # Optional error message; overwrites the stored error (even with None).
    error: Optional[str] = None
    # When set, replaces the record's ``claimed_by`` value.
    robot_id: Optional[str] = None


@app.get("/")
async def root() -> dict:
    """Return a small index of the service's endpoints."""
    return {"ok": True, "mcp": "/mcp", "health": "/health"}


@app.get("/health")
async def health() -> dict:
    """Return a liveness response with the current UTC time."""
    return {"ok": True, "time": now_iso()}


@app.get("/commands")
async def list_commands() -> dict:
    """Return every known command record, regardless of status."""
    return {"ok": True, "items": list(commands.values())}


@app.post("/commands/next")
async def claim_next_command(robot_id: str) -> dict:
    """Claim the oldest queued command for ``robot_id``.

    Returns:
        ``{"ok": True, "item": <record>}`` with the record marked
        ``"running"``, or ``{"ok": True, "item": None}`` when nothing is
        waiting.
    """
    async with queue_lock:
        while queue:
            cid = queue.pop(0)
            cmd = commands.get(cid)
            # Skip ids whose record is gone or whose status was changed
            # (via the status endpoint) while it sat in the queue.
            if not cmd or cmd["status"] != "queued":
                continue
            cmd["status"] = "running"
            cmd["claimed_by"] = robot_id
            cmd["updated_at"] = now_iso()
            return {"ok": True, "item": cmd}
    return {"ok": True, "item": None}


@app.post("/commands/{command_id}/status")
async def update_status(command_id: str, update: StatusUpdate) -> dict:
    """Overwrite the status, result and error of an existing command.

    Raises:
        HTTPException: 404 when ``command_id`` is unknown.
    """
    cmd = commands.get(command_id)
    if not cmd:
        raise HTTPException(status_code=404, detail="Command not found")
    cmd["status"] = update.status
    cmd["result"] = update.result
    cmd["error"] = update.error
    if update.robot_id:
        cmd["claimed_by"] = update.robot_id
    cmd["updated_at"] = now_iso()
    return {"ok": True}


# Disable slash redirects to avoid bad proxy redirects (http vs https)
app.router.redirect_slashes = False

# Mount both MCP transports.
# NOTE(review): each FastMCP sub-app serves on its own configured path
# (settings.streamable_http_path / settings.sse_path), so the effective URLs
# may be /mcp/mcp and /sse/sse rather than /mcp and /sse -- confirm against
# the clients before changing the mount points.
app.mount("/mcp", mcp.streamable_http_app())  # for streamable HTTP clients
app.mount("/sse", mcp.sse_app())  # for ChatGPT connector / SSE clients