""" SharePUTER™ v2.1 Worker Node — rank injection for unified execution """ from fastapi import FastAPI from fastapi.responses import HTMLResponse from fastapi.middleware.cors import CORSMiddleware import httpx, asyncio, os, sys, io, json, time, traceback, multiprocessing from contextlib import redirect_stdout, redirect_stderr from typing import Optional app = FastAPI(title="SharePUTER™ v2.1 Worker") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) HEAD_URL = os.environ.get("SACCP_HEAD_URL", "https://bc-ai-saccp-head.hf.space") NODE_TYPE = "CPU" NODE_OWNER = os.environ.get("SACCP_NODE_OWNER", "Keeby1237") AUTO_REG = os.environ.get("SACCP_AUTO_REGISTER", "true").lower() == "true" NODE_ID: Optional[str] = None REGISTERED = False import psutil def detect_specs(): s = {"cpu_count": multiprocessing.cpu_count(), "ram_total_gb": round(psutil.virtual_memory().total/(1024**3), 2), "ram_available_gb": round(psutil.virtual_memory().available/(1024**3), 2), "platform": sys.platform, "python": sys.version.split()[0], "gpu_available": False, "gpu_name": "", "gpu_memory_gb": 0} try: import torch if torch.cuda.is_available(): s.update(gpu_available=True, gpu_name=torch.cuda.get_device_name(0), gpu_memory_gb=round(torch.cuda.get_device_properties(0).total_mem/(1024**3), 2)) except: pass return s def detect_libs(): out = [] for lib in ["numpy","pandas","scipy","scikit-learn","torch","transformers","tensorflow", "keras","pillow","requests","httpx","fastapi","pydantic","matplotlib","seaborn"]: try: __import__(lib.replace("-","_")); out.append(lib) except: pass return out SPECS = detect_specs() LIBS = detect_libs() def safe_execute(code, timeout=600): result = {"status": "completed", "result": None, "stdout": "", "error": None, "execution_time": 0, "resource_usage": {}} out, err = io.StringIO(), io.StringIO() ns = {} start = time.time() mem0 = psutil.Process().memory_info().rss try: with redirect_stdout(out), redirect_stderr(err): exec(compile(code, "", "exec"), ns) result["execution_time"] = round(time.time() - start, 3) result["stdout"] = out.getvalue() result["resource_usage"] = {"time_s": result["execution_time"], "mem_mb": round((psutil.Process().memory_info().rss - mem0)/(1024**2), 2)} for var in ["result", "__saccp_result__", "output", "__saccp_results__"]: if var in ns and ns[var] is not None: v = ns[var] try: json.dumps(v); result["result"] = v except: result["result"] = str(v)[:5000] break if result["result"] is None and result["stdout"]: result["result"] = result["stdout"].strip()[-5000:] except Exception as e: result.update(status="failed", error=f"{type(e).__name__}: {e}\n{traceback.format_exc()[-1500:]}", execution_time=round(time.time()-start, 3), stdout=out.getvalue()) return result @app.get("/", response_class=HTMLResponse) async def home(): ram = round(psutil.virtual_memory().available/(1024**3), 2) return HTMLResponse(f"""Worker

🤖 Worker v2.1

ID: {NODE_ID or '—'} | {NODE_TYPE} | {'ACTIVE' if REGISTERED else 'WAITING'}

CPUs: {SPECS['cpu_count']}
RAM: {ram}/{SPECS['ram_total_gb']}GB
GPU: {SPECS['gpu_name'] or 'None'}
Libs: {', '.join(LIBS)}
""") @app.get("/health") async def health(): return {"status": "online", "node_id": NODE_ID, "registered": REGISTERED, "v": "2.1"} async def worker_loop(): global NODE_ID, REGISTERED while not REGISTERED: await asyncio.sleep(2) if AUTO_REG: try: async with httpx.AsyncClient(timeout=15) as c: r = await c.post(f"{HEAD_URL}/api/register_node", json={ "node_type": NODE_TYPE, "node_url": os.environ.get("SPACE_HOST", ""), "owner": NODE_OWNER, "specs": SPECS, "installed_libs": LIBS}) if r.status_code == 200: NODE_ID = r.json().get("node_id"); REGISTERED = True; print(f"[W] Registered {NODE_ID}") except Exception as e: print(f"[W] Reg fail: {e}") print(f"[W] Active: {NODE_ID} ({NODE_TYPE})") last_hb = 0 while True: try: if time.time() - last_hb > 25: try: async with httpx.AsyncClient(timeout=10) as c: await c.post(f"{HEAD_URL}/api/node_heartbeat", json={"node_id": NODE_ID, "status": "online", "installed_libs": LIBS}) last_hb = time.time() except: pass work = None async with httpx.AsyncClient(timeout=15) as c: r = await c.get(f"{HEAD_URL}/api/get_work", params={ "node_id": NODE_ID, "node_type": NODE_TYPE, "has_gpu": str(SPECS['gpu_available']).lower(), "ram_gb": SPECS['ram_available_gb'], "libs": ",".join(LIBS)}) if r.status_code == 200: work = r.json().get("work") if not work: await asyncio.sleep(1); continue fid = work["fragment_id"] code = work.get("code", "") print(f"[W] ▶ {fid[:24]}...") result = safe_execute(code, work.get("timeout_seconds", 600)) print(f"[W] ✓ {fid[:20]}... → {result['status']} ({result['execution_time']}s)") payload = {"fragment_id": fid, "node_id": NODE_ID, "status": result["status"], "result": result.get("result"), "error": result.get("error") or "", "stdout": result.get("stdout") or "", "resource_usage": result.get("resource_usage", {})} for attempt in range(3): try: async with httpx.AsyncClient(timeout=30) as sc: sr = await sc.post(f"{HEAD_URL}/api/submit_result", json=payload) if sr.status_code == 200: print(f"[W] 📤 Submitted {fid[:20]}..."); break else: print(f"[W] ⚠ Submit {sr.status_code}: {sr.text[:100]}") except Exception as e: print(f"[W] ⚠ Submit attempt {attempt+1}: {e}"); await asyncio.sleep(1) except Exception as e: print(f"[W] Err: {e}"); await asyncio.sleep(3) @app.on_event("startup") async def startup(): asyncio.create_task(worker_loop()) if __name__ == "__main__": import uvicorn; uvicorn.run(app, host="0.0.0.0", port=7861)