# ORCA-ROBOT / server.py
# SalexAI's picture
# Update server.py
# ea832c3 verified
import asyncio
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List
from uuid import uuid4

from fastapi import FastAPI, HTTPException
from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel
# In-memory command store: command id -> command record (the dict built in
# enqueue()). Nothing in this file removes entries, so it grows for the
# lifetime of the process.
commands: Dict[str, Dict[str, Any]] = {}
# Pending command ids in FIFO order: enqueue() appends to the tail and
# claim_next_command() pops from the head.
queue: List[str] = []
# Held by enqueue() and claim_next_command() while they touch `commands` and
# `queue` together.
queue_lock = asyncio.Lock()
def now_iso():
    """Return the current UTC time as an ISO 8601 string with a UTC offset."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
async def enqueue(command: str, args: Optional[Dict[str, Any]] = None):
    """Create a command record, store it, and append its id to the queue.

    Args:
        command: Name of the command to queue (e.g. "dock").
        args: Optional arguments for the command. A shallow copy is stored so
            later mutation of the caller's dict does not alter the record.

    Returns:
        The stored record dict (the same object kept in ``commands``), with
        status "queued" and a freshly generated UUID4 string as its id.
    """
    # Take one timestamp so a new record has created_at == updated_at; two
    # separate clock reads would differ by a few microseconds.
    stamp = now_iso()
    cid = str(uuid4())
    rec = {
        "id": cid,
        "command": command,
        "args": dict(args) if args else {},
        "status": "queued",
        "result": None,
        "error": None,
        "created_at": stamp,
        "updated_at": stamp,
        "claimed_by": None,
    }
    # Register the record and publish its id under the same lock that
    # claim_next_command() holds while popping from the queue.
    async with queue_lock:
        commands[cid] = rec
        queue.append(cid)
    return rec
# MCP server object; the @mcp.tool() functions below register on it and it is
# mounted into the FastAPI app at the bottom of this file.
# NOTE(review): stateless_http=True presumably makes the streamable HTTP
# transport keep no per-client session between requests — confirm against the
# installed mcp SDK version.
mcp = FastMCP(
"Create3 Robot Bridge",
instructions="Queues robot commands for a local Create 3 runner.",
stateless_http=True,
)
@mcp.tool()
async def dock(ctx: Context) -> dict:
    """Queue a "dock" command for the robot runner.

    The command is only queued here; it is not executed by this server.

    Returns:
        ``{"ok": True, "queued": <command record>}`` where the record carries
        the command id and its initial "queued" status.
    """
    # ctx is accepted for the MCP tool signature but is not used.
    return {"ok": True, "queued": await enqueue("dock")}
@mcp.tool()
async def undock(ctx: Context) -> dict:
    """Queue an "undock" command for the robot runner.

    The command is only queued here; it is not executed by this server.

    Returns:
        ``{"ok": True, "queued": <command record>}`` where the record carries
        the command id and its initial "queued" status.
    """
    # ctx is accepted for the MCP tool signature but is not used.
    return {"ok": True, "queued": await enqueue("undock")}
@mcp.tool()
async def move_cm(cm: float, ctx: Context) -> dict:
    """Queue a "move_cm" command for the robot runner.

    The command is only queued here; it is not executed by this server.

    Args:
        cm: Distance forwarded to the runner as ``args["cm"]`` (centimeters,
            going by the command name). Must be a finite number.

    Returns:
        ``{"ok": True, "queued": <command record>}`` where the record carries
        the command id and its initial "queued" status.

    Raises:
        ValueError: If ``cm`` is NaN or infinite.
    """
    import math

    # NaN/inf cannot be represented in strict JSON, so a queued non-finite
    # distance would break the HTTP endpoints that return this record.
    if not math.isfinite(cm):
        raise ValueError("cm must be a finite number")
    # ctx is accepted for the MCP tool signature but is not used.
    return {"ok": True, "queued": await enqueue("move_cm", {"cm": cm})}
@asynccontextmanager
async def _lifespan(_app: FastAPI):
    """Run the MCP streamable HTTP session manager for the app's lifetime.

    The streamable HTTP transport is mounted as a sub-application, and the
    lifespan of a mounted sub-application is not run by the host app, so the
    session manager has to be started here. Without this, requests to the
    mounted streamable HTTP endpoint fail because its task group was never
    initialised.
    """
    # session_manager is only available once mcp.streamable_http_app() has
    # been called; that happens at import time when the app is mounted, which
    # is before this lifespan runs at server startup.
    async with mcp.session_manager.run():
        yield


# HTTP application exposing the runner-facing REST endpoints and the mounted
# MCP transports.
app = FastAPI(lifespan=_lifespan)
# Request body accepted by POST /commands/{command_id}/status.
class StatusUpdate(BaseModel):
    # New status string; stored on the command record as-is.
    status: str
    # Optional result payload; replaces the record's current "result".
    result: Optional[Dict[str, Any]] = None
    # Optional error message; replaces the record's current "error".
    error: Optional[str] = None
    # Optional reporter id; when non-empty it overwrites "claimed_by".
    robot_id: Optional[str] = None
# Index endpoint: reports the paths of the MCP and health endpoints.
@app.get("/")
async def root():
    endpoints = {"mcp": "/mcp", "health": "/health"}
    return {"ok": True, **endpoints}
# Liveness endpoint: returns an ok flag and the current UTC time.
@app.get("/health")
async def health():
    timestamp = now_iso()
    return {"ok": True, "time": timestamp}
# Returns every command record currently held in memory, in insertion order.
@app.get("/commands")
async def list_commands():
    records = [*commands.values()]
    return {"ok": True, "items": records}
# Hands the oldest still-queued command to the calling robot, marking it
# "running" and recording robot_id as the claimant. Responds with
# {"ok": True, "item": None} when nothing is claimable.
@app.post("/commands/next")
async def claim_next_command(robot_id: str):
    claimed = None
    async with queue_lock:
        while queue and claimed is None:
            candidate = commands.get(queue.pop(0))
            # Ids whose record is missing or no longer "queued" are dropped
            # from the queue and the scan moves on to the next id.
            if candidate and candidate["status"] == "queued":
                candidate.update(
                    status="running",
                    claimed_by=robot_id,
                    updated_at=now_iso(),
                )
                claimed = candidate
    return {"ok": True, "item": claimed}
# Applies a runner's status report to a stored command. Unknown ids get a 404.
# status, result and error are always overwritten with the submitted values
# (including None); claimed_by changes only when a non-empty robot_id is sent.
@app.post("/commands/{command_id}/status")
async def update_status(command_id: str, update: StatusUpdate):
    record = commands.get(command_id)
    if not record:
        raise HTTPException(status_code=404, detail="Command not found")

    changes = {
        "status": update.status,
        "result": update.result,
        "error": update.error,
    }
    if update.robot_id:
        changes["claimed_by"] = update.robot_id
    changes["updated_at"] = now_iso()

    record.update(changes)
    return {"ok": True}
# Disable automatic trailing-slash redirects: behind a proxy the redirect can
# be issued with the wrong scheme (http instead of https).
app.router.redirect_slashes = False
# Mount both MCP transports as sub-applications. They are mounted after the
# REST routes above are registered.
# NOTE(review): the app returned by streamable_http_app() presumably serves
# its endpoint at its own default path, so the effective URL may be
# "/mcp/mcp" rather than "/mcp" — confirm against the installed mcp SDK.
app.mount("/mcp", mcp.streamable_http_app()) # for streamable HTTP clients
app.mount("/sse", mcp.sse_app()) # for ChatGPT connector / SSE clients