| """ |
| SharePUTER™ v2.1 Worker Node — rank injection for unified execution |
| """ |
|
|
| from fastapi import FastAPI |
| from fastapi.responses import HTMLResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| import httpx, asyncio, os, sys, io, json, time, traceback, multiprocessing |
| from contextlib import redirect_stdout, redirect_stderr |
| from typing import Optional |
|
|
| app = FastAPI(title="SharePUTER™ v2.1 Worker") |
| app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) |
|
|
| HEAD_URL = os.environ.get("SACCP_HEAD_URL", "https://bc-ai-saccp-head.hf.space") |
| NODE_TYPE = os.environ.get("SACCP_NODE_TYPE", "CPU").upper() |
| NODE_OWNER = os.environ.get("SACCP_NODE_OWNER", "Keeby1237") |
| AUTO_REG = os.environ.get("SACCP_AUTO_REGISTER", "true").lower() == "true" |
| NODE_ID: Optional[str] = None |
| REGISTERED = False |
|
|
| import psutil |
|
|
| def detect_specs(): |
| s = {"cpu_count": multiprocessing.cpu_count(), |
| "ram_total_gb": round(psutil.virtual_memory().total/(1024**3), 2), |
| "ram_available_gb": round(psutil.virtual_memory().available/(1024**3), 2), |
| "platform": sys.platform, "python": sys.version.split()[0], |
| "gpu_available": False, "gpu_name": "", "gpu_memory_gb": 0} |
| try: |
| import torch |
| if torch.cuda.is_available(): |
| s.update(gpu_available=True, gpu_name=torch.cuda.get_device_name(0), |
| gpu_memory_gb=round(torch.cuda.get_device_properties(0).total_mem/(1024**3), 2)) |
| except: pass |
| return s |
|
|
| def detect_libs(): |
| out = [] |
| for lib in ["numpy","pandas","scipy","scikit-learn","torch","transformers","tensorflow", |
| "keras","pillow","requests","httpx","fastapi","pydantic","matplotlib","seaborn"]: |
| try: __import__(lib.replace("-","_")); out.append(lib) |
| except: pass |
| return out |
|
|
| SPECS = detect_specs() |
| LIBS = detect_libs() |
|
|
| def safe_execute(code, timeout=600): |
| result = {"status": "completed", "result": None, "stdout": "", "error": None, |
| "execution_time": 0, "resource_usage": {}} |
| out, err = io.StringIO(), io.StringIO() |
| ns = {} |
| start = time.time() |
| mem0 = psutil.Process().memory_info().rss |
| try: |
| with redirect_stdout(out), redirect_stderr(err): |
| exec(compile(code, "<saccp>", "exec"), ns) |
| result["execution_time"] = round(time.time() - start, 3) |
| result["stdout"] = out.getvalue() |
| result["resource_usage"] = {"time_s": result["execution_time"], |
| "mem_mb": round((psutil.Process().memory_info().rss - mem0)/(1024**2), 2)} |
| for var in ["result", "__saccp_result__", "output", "__saccp_results__"]: |
| if var in ns and ns[var] is not None: |
| v = ns[var] |
| try: json.dumps(v); result["result"] = v |
| except: result["result"] = str(v)[:5000] |
| break |
| if result["result"] is None and result["stdout"]: |
| result["result"] = result["stdout"].strip()[-5000:] |
| except Exception as e: |
| result.update(status="failed", error=f"{type(e).__name__}: {e}\n{traceback.format_exc()[-1500:]}", |
| execution_time=round(time.time()-start, 3), stdout=out.getvalue()) |
| return result |
|
|
| @app.get("/", response_class=HTMLResponse) |
| async def home(): |
| ram = round(psutil.virtual_memory().available/(1024**3), 2) |
| return HTMLResponse(f"""<!DOCTYPE html><html><head><title>Worker</title> |
| <style>body{{font-family:system-ui;background:#0a0a0f;color:#ddd;padding:40px;max-width:600px;margin:0 auto}} |
| h1{{color:#00ff88}}.s{{background:#111;padding:12px;margin:8px 0;border-radius:8px}}</style></head> |
| <body><h1>🤖 Worker v2.1</h1><p>ID: <b>{NODE_ID or '—'}</b> | {NODE_TYPE} | |
| <b style="color:#00ff88">{'ACTIVE' if REGISTERED else 'WAITING'}</b></p> |
| <div class="s">CPUs: {SPECS['cpu_count']}</div><div class="s">RAM: {ram}/{SPECS['ram_total_gb']}GB</div> |
| <div class="s">GPU: {SPECS['gpu_name'] or 'None'}</div><div class="s">Libs: {', '.join(LIBS)}</div> |
| </body></html>""") |
|
|
| @app.get("/health") |
| async def health(): |
| return {"status": "online", "node_id": NODE_ID, "registered": REGISTERED, "v": "2.1"} |
|
|
| async def worker_loop(): |
| global NODE_ID, REGISTERED |
| while not REGISTERED: |
| await asyncio.sleep(2) |
| if AUTO_REG: |
| try: |
| async with httpx.AsyncClient(timeout=15) as c: |
| r = await c.post(f"{HEAD_URL}/api/register_node", json={ |
| "node_type": NODE_TYPE, "node_url": os.environ.get("SPACE_HOST", ""), |
| "owner": NODE_OWNER, "specs": SPECS, "installed_libs": LIBS}) |
| if r.status_code == 200: NODE_ID = r.json().get("node_id"); REGISTERED = True; print(f"[W] Registered {NODE_ID}") |
| except Exception as e: print(f"[W] Reg fail: {e}") |
|
|
| print(f"[W] Active: {NODE_ID} ({NODE_TYPE})") |
| last_hb = 0 |
| while True: |
| try: |
| if time.time() - last_hb > 25: |
| try: |
| async with httpx.AsyncClient(timeout=10) as c: |
| await c.post(f"{HEAD_URL}/api/node_heartbeat", json={"node_id": NODE_ID, "status": "online", "installed_libs": LIBS}) |
| last_hb = time.time() |
| except: pass |
|
|
| work = None |
| async with httpx.AsyncClient(timeout=15) as c: |
| r = await c.get(f"{HEAD_URL}/api/get_work", params={ |
| "node_id": NODE_ID, "node_type": NODE_TYPE, |
| "has_gpu": str(SPECS['gpu_available']).lower(), |
| "ram_gb": SPECS['ram_available_gb'], "libs": ",".join(LIBS)}) |
| if r.status_code == 200: work = r.json().get("work") |
|
|
| if not work: await asyncio.sleep(1); continue |
|
|
| fid = work["fragment_id"] |
| code = work.get("code", "") |
| print(f"[W] ▶ {fid[:24]}...") |
|
|
| result = safe_execute(code, work.get("timeout_seconds", 600)) |
| print(f"[W] ✓ {fid[:20]}... → {result['status']} ({result['execution_time']}s)") |
|
|
| payload = {"fragment_id": fid, "node_id": NODE_ID, "status": result["status"], |
| "result": result.get("result"), "error": result.get("error") or "", |
| "stdout": result.get("stdout") or "", "resource_usage": result.get("resource_usage", {})} |
|
|
| for attempt in range(3): |
| try: |
| async with httpx.AsyncClient(timeout=30) as sc: |
| sr = await sc.post(f"{HEAD_URL}/api/submit_result", json=payload) |
| if sr.status_code == 200: print(f"[W] 📤 Submitted {fid[:20]}..."); break |
| else: print(f"[W] ⚠ Submit {sr.status_code}: {sr.text[:100]}") |
| except Exception as e: print(f"[W] ⚠ Submit attempt {attempt+1}: {e}"); await asyncio.sleep(1) |
|
|
| except Exception as e: print(f"[W] Err: {e}"); await asyncio.sleep(3) |
|
|
| @app.on_event("startup") |
| async def startup(): asyncio.create_task(worker_loop()) |
|
|
| if __name__ == "__main__": |
| import uvicorn; uvicorn.run(app, host="0.0.0.0", port=7861) |