Spaces:
Sleeping
Sleeping
| """ | |
| SharePUTER™ v2.1 Worker Node — rank injection for unified execution | |
| """ | |
| from fastapi import FastAPI | |
| from fastapi.responses import HTMLResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import httpx, asyncio, os, sys, io, json, time, traceback, multiprocessing | |
| from contextlib import redirect_stdout, redirect_stderr | |
| from typing import Optional | |
| app = FastAPI(title="SharePUTER™ v2.1 Worker") | |
| app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) | |
| HEAD_URL = os.environ.get("SACCP_HEAD_URL", "https://bc-ai-saccp-head.hf.space") | |
| NODE_TYPE = "CPU" | |
| NODE_OWNER = os.environ.get("SACCP_NODE_OWNER", "Keeby1237") | |
| AUTO_REG = os.environ.get("SACCP_AUTO_REGISTER", "true").lower() == "true" | |
| NODE_ID: Optional[str] = None | |
| REGISTERED = False | |
| import psutil | |
| def detect_specs(): | |
| s = {"cpu_count": multiprocessing.cpu_count(), | |
| "ram_total_gb": round(psutil.virtual_memory().total/(1024**3), 2), | |
| "ram_available_gb": round(psutil.virtual_memory().available/(1024**3), 2), | |
| "platform": sys.platform, "python": sys.version.split()[0], | |
| "gpu_available": False, "gpu_name": "", "gpu_memory_gb": 0} | |
| try: | |
| import torch | |
| if torch.cuda.is_available(): | |
| s.update(gpu_available=True, gpu_name=torch.cuda.get_device_name(0), | |
| gpu_memory_gb=round(torch.cuda.get_device_properties(0).total_mem/(1024**3), 2)) | |
| except: pass | |
| return s | |
| def detect_libs(): | |
| out = [] | |
| for lib in ["numpy","pandas","scipy","scikit-learn","torch","transformers","tensorflow", | |
| "keras","pillow","requests","httpx","fastapi","pydantic","matplotlib","seaborn"]: | |
| try: __import__(lib.replace("-","_")); out.append(lib) | |
| except: pass | |
| return out | |
| SPECS = detect_specs() | |
| LIBS = detect_libs() | |
| def safe_execute(code, timeout=600): | |
| result = {"status": "completed", "result": None, "stdout": "", "error": None, | |
| "execution_time": 0, "resource_usage": {}} | |
| out, err = io.StringIO(), io.StringIO() | |
| ns = {} | |
| start = time.time() | |
| mem0 = psutil.Process().memory_info().rss | |
| try: | |
| with redirect_stdout(out), redirect_stderr(err): | |
| exec(compile(code, "<saccp>", "exec"), ns) | |
| result["execution_time"] = round(time.time() - start, 3) | |
| result["stdout"] = out.getvalue() | |
| result["resource_usage"] = {"time_s": result["execution_time"], | |
| "mem_mb": round((psutil.Process().memory_info().rss - mem0)/(1024**2), 2)} | |
| for var in ["result", "__saccp_result__", "output", "__saccp_results__"]: | |
| if var in ns and ns[var] is not None: | |
| v = ns[var] | |
| try: json.dumps(v); result["result"] = v | |
| except: result["result"] = str(v)[:5000] | |
| break | |
| if result["result"] is None and result["stdout"]: | |
| result["result"] = result["stdout"].strip()[-5000:] | |
| except Exception as e: | |
| result.update(status="failed", error=f"{type(e).__name__}: {e}\n{traceback.format_exc()[-1500:]}", | |
| execution_time=round(time.time()-start, 3), stdout=out.getvalue()) | |
| return result | |
| async def home(): | |
| ram = round(psutil.virtual_memory().available/(1024**3), 2) | |
| return HTMLResponse(f"""<!DOCTYPE html><html><head><title>Worker</title> | |
| <style>body{{font-family:system-ui;background:#0a0a0f;color:#ddd;padding:40px;max-width:600px;margin:0 auto}} | |
| h1{{color:#00ff88}}.s{{background:#111;padding:12px;margin:8px 0;border-radius:8px}}</style></head> | |
| <body><h1>🤖 Worker v2.1</h1><p>ID: <b>{NODE_ID or '—'}</b> | {NODE_TYPE} | | |
| <b style="color:#00ff88">{'ACTIVE' if REGISTERED else 'WAITING'}</b></p> | |
| <div class="s">CPUs: {SPECS['cpu_count']}</div><div class="s">RAM: {ram}/{SPECS['ram_total_gb']}GB</div> | |
| <div class="s">GPU: {SPECS['gpu_name'] or 'None'}</div><div class="s">Libs: {', '.join(LIBS)}</div> | |
| </body></html>""") | |
| async def health(): | |
| return {"status": "online", "node_id": NODE_ID, "registered": REGISTERED, "v": "2.1"} | |
| async def worker_loop(): | |
| global NODE_ID, REGISTERED | |
| while not REGISTERED: | |
| await asyncio.sleep(2) | |
| if AUTO_REG: | |
| try: | |
| async with httpx.AsyncClient(timeout=15) as c: | |
| r = await c.post(f"{HEAD_URL}/api/register_node", json={ | |
| "node_type": NODE_TYPE, "node_url": os.environ.get("SPACE_HOST", ""), | |
| "owner": NODE_OWNER, "specs": SPECS, "installed_libs": LIBS}) | |
| if r.status_code == 200: NODE_ID = r.json().get("node_id"); REGISTERED = True; print(f"[W] Registered {NODE_ID}") | |
| except Exception as e: print(f"[W] Reg fail: {e}") | |
| print(f"[W] Active: {NODE_ID} ({NODE_TYPE})") | |
| last_hb = 0 | |
| while True: | |
| try: | |
| if time.time() - last_hb > 25: | |
| try: | |
| async with httpx.AsyncClient(timeout=10) as c: | |
| await c.post(f"{HEAD_URL}/api/node_heartbeat", json={"node_id": NODE_ID, "status": "online", "installed_libs": LIBS}) | |
| last_hb = time.time() | |
| except: pass | |
| work = None | |
| async with httpx.AsyncClient(timeout=15) as c: | |
| r = await c.get(f"{HEAD_URL}/api/get_work", params={ | |
| "node_id": NODE_ID, "node_type": NODE_TYPE, | |
| "has_gpu": str(SPECS['gpu_available']).lower(), | |
| "ram_gb": SPECS['ram_available_gb'], "libs": ",".join(LIBS)}) | |
| if r.status_code == 200: work = r.json().get("work") | |
| if not work: await asyncio.sleep(1); continue | |
| fid = work["fragment_id"] | |
| code = work.get("code", "") | |
| print(f"[W] ▶ {fid[:24]}...") | |
| result = safe_execute(code, work.get("timeout_seconds", 600)) | |
| print(f"[W] ✓ {fid[:20]}... → {result['status']} ({result['execution_time']}s)") | |
| payload = {"fragment_id": fid, "node_id": NODE_ID, "status": result["status"], | |
| "result": result.get("result"), "error": result.get("error") or "", | |
| "stdout": result.get("stdout") or "", "resource_usage": result.get("resource_usage", {})} | |
| for attempt in range(3): | |
| try: | |
| async with httpx.AsyncClient(timeout=30) as sc: | |
| sr = await sc.post(f"{HEAD_URL}/api/submit_result", json=payload) | |
| if sr.status_code == 200: print(f"[W] 📤 Submitted {fid[:20]}..."); break | |
| else: print(f"[W] ⚠ Submit {sr.status_code}: {sr.text[:100]}") | |
| except Exception as e: print(f"[W] ⚠ Submit attempt {attempt+1}: {e}"); await asyncio.sleep(1) | |
| except Exception as e: print(f"[W] Err: {e}"); await asyncio.sleep(3) | |
| async def startup(): asyncio.create_task(worker_loop()) | |
| if __name__ == "__main__": | |
| import uvicorn; uvicorn.run(app, host="0.0.0.0", port=7861) |