# node-test / app.py
# Hugging Face page-header residue from a copy/paste ("Bc-AI's picture /
# Update app.py / f7228a7 verified") — kept as comments so the module parses.
"""
SharePUTER™ v2.1 Worker Node — rank injection for unified execution
"""
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
import httpx, asyncio, os, sys, io, json, time, traceback, multiprocessing
from contextlib import redirect_stdout, redirect_stderr
from typing import Optional
app = FastAPI(title="SharePUTER™ v2.1 Worker")
# CORS is wide open: the head node / browser dashboards may live on any origin.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Head-node coordination settings, each overridable via environment variable.
HEAD_URL = os.environ.get("SACCP_HEAD_URL", "https://bc-ai-saccp-head.hf.space")  # coordinator base URL
NODE_TYPE = os.environ.get("SACCP_NODE_TYPE", "CPU").upper()  # advertised node class (normalized to upper case)
NODE_OWNER = os.environ.get("SACCP_NODE_OWNER", "Keeby1237")
AUTO_REG = os.environ.get("SACCP_AUTO_REGISTER", "true").lower() == "true"  # self-register with the head on boot
# Mutable worker state: set by worker_loop() after a successful registration.
NODE_ID: Optional[str] = None
REGISTERED = False
import psutil  # NOTE(review): mid-file import; used by spec probing and /, safe_execute below
def detect_specs():
    """Probe the host machine once and return a dict of hardware/runtime specs.

    Returns a dict with keys: cpu_count, ram_total_gb, ram_available_gb,
    platform, python, gpu_available, gpu_name, gpu_memory_gb.  The GPU
    fields keep their defaults when torch is absent or reports no CUDA
    device.
    """
    vm = psutil.virtual_memory()
    s = {
        "cpu_count": multiprocessing.cpu_count(),
        "ram_total_gb": round(vm.total / (1024 ** 3), 2),
        "ram_available_gb": round(vm.available / (1024 ** 3), 2),
        "platform": sys.platform,
        "python": sys.version.split()[0],
        "gpu_available": False,
        "gpu_name": "",
        "gpu_memory_gb": 0,
    }
    try:
        import torch
        if torch.cuda.is_available():
            props = torch.cuda.get_device_properties(0)
            s.update(
                gpu_available=True,
                gpu_name=torch.cuda.get_device_name(0),
                # BUG FIX: the attribute is `total_memory`, not `total_mem`.
                # The old name raised AttributeError (swallowed by a bare
                # except), so GPU specs were never actually reported.
                gpu_memory_gb=round(props.total_memory / (1024 ** 3), 2),
            )
    except Exception:
        # torch missing or CUDA probing failed — report CPU-only specs.
        pass
    return s
# Map PyPI distribution names to their importable module names where the two
# differ.  A plain "-" -> "_" substitution misses these, so scikit-learn and
# pillow could never be detected by the original probe.
_IMPORT_NAMES = {"scikit-learn": "sklearn", "pillow": "PIL"}

def detect_libs():
    """Return the subset of a known library list that imports in this env.

    Returned entries are the distribution (PyPI) names, as advertised to
    the head node, not the module names used for the import probe.
    """
    candidates = ["numpy", "pandas", "scipy", "scikit-learn", "torch",
                  "transformers", "tensorflow", "keras", "pillow", "requests",
                  "httpx", "fastapi", "pydantic", "matplotlib", "seaborn"]
    out = []
    for lib in candidates:
        module = _IMPORT_NAMES.get(lib, lib.replace("-", "_"))
        try:
            __import__(module)
        except Exception:
            # Missing or broken install — skip it (broad on purpose: a failing
            # library import can raise more than ImportError).
            continue
        out.append(lib)
    return out
# Probe hardware and importable libraries once at import time; these are
# reused in every registration / heartbeat / work-request payload.
SPECS = detect_specs()
LIBS = detect_libs()
def safe_execute(code, timeout=600):
    """Compile and exec `code`, capturing output and a JSON-able result.

    Returns a dict with keys: status ("completed"/"failed"), result, stdout,
    stderr, error, execution_time (seconds), resource_usage ({time_s, mem_mb}).
    The result value is taken from the first conventional output variable the
    executed code sets, falling back to the tail of captured stdout.

    NOTE(review): `timeout` is accepted for API compatibility but is NOT
    enforced — long-running code blocks the caller until it finishes.
    Enforcing it would need a subprocess or signal; confirm head-node
    expectations before adding that.
    SECURITY: exec() runs arbitrary code received from the head node; this
    worker fully trusts HEAD_URL.
    """
    result = {"status": "completed", "result": None, "stdout": "", "error": None,
              "execution_time": 0, "resource_usage": {}}
    out, err = io.StringIO(), io.StringIO()
    ns = {}
    start = time.time()
    mem0 = psutil.Process().memory_info().rss
    try:
        with redirect_stdout(out), redirect_stderr(err):
            exec(compile(code, "<saccp>", "exec"), ns)
        result["execution_time"] = round(time.time() - start, 3)
        result["stdout"] = out.getvalue()
        result["stderr"] = err.getvalue()  # previously captured but discarded
        result["resource_usage"] = {
            "time_s": result["execution_time"],
            "mem_mb": round((psutil.Process().memory_info().rss - mem0) / (1024 ** 2), 2),
        }
        # Prefer explicitly-set output variables, in priority order.
        for var in ["result", "__saccp_result__", "output", "__saccp_results__"]:
            if var in ns and ns[var] is not None:
                v = ns[var]
                try:
                    json.dumps(v)          # keep the object only if JSON-serializable
                    result["result"] = v
                except (TypeError, ValueError):
                    result["result"] = str(v)[:5000]  # capped string fallback
                break
        if result["result"] is None and result["stdout"]:
            result["result"] = result["stdout"].strip()[-5000:]
    except BaseException as e:
        # BaseException on purpose (the original used a bare except): remote
        # code may raise SystemExit/KeyboardInterrupt, and we must report a
        # failure rather than let it kill the worker loop.
        result.update(
            status="failed",
            error=f"{type(e).__name__}: {e}\n{traceback.format_exc()[-1500:]}",
            execution_time=round(time.time() - start, 3),
            stdout=out.getvalue(),
        )
        result["stderr"] = err.getvalue()
    return result
@app.get("/", response_class=HTMLResponse)
async def home():
    """Render a minimal HTML status dashboard for this worker node.

    Shows node id/type/registration state plus the specs captured at
    startup; only available RAM is re-sampled per request.
    """
    ram = round(psutil.virtual_memory().available/(1024**3), 2)
    # Single f-string template: doubled braces {{ }} are literal CSS braces,
    # single braces interpolate module-level state.
    return HTMLResponse(f"""<!DOCTYPE html><html><head><title>Worker</title>
<style>body{{font-family:system-ui;background:#0a0a0f;color:#ddd;padding:40px;max-width:600px;margin:0 auto}}
h1{{color:#00ff88}}.s{{background:#111;padding:12px;margin:8px 0;border-radius:8px}}</style></head>
<body><h1>🤖 Worker v2.1</h1><p>ID: <b>{NODE_ID or '—'}</b> | {NODE_TYPE} |
<b style="color:#00ff88">{'ACTIVE' if REGISTERED else 'WAITING'}</b></p>
<div class="s">CPUs: {SPECS['cpu_count']}</div><div class="s">RAM: {ram}/{SPECS['ram_total_gb']}GB</div>
<div class="s">GPU: {SPECS['gpu_name'] or 'None'}</div><div class="s">Libs: {', '.join(LIBS)}</div>
</body></html>""")
@app.get("/health")
async def health():
    """Liveness probe: report registration state and worker version."""
    payload = {
        "status": "online",
        "node_id": NODE_ID,
        "registered": REGISTERED,
        "v": "2.1",
    }
    return payload
async def worker_loop():
    """Background task: register this node with the head, then poll for work forever."""
    global NODE_ID, REGISTERED
    # --- Phase 1: registration --------------------------------------------
    # NOTE(review): if SACCP_AUTO_REGISTER is false, nothing in this file ever
    # sets REGISTERED, so this loop spins forever — confirm that is intended.
    while not REGISTERED:
        await asyncio.sleep(2)
        if AUTO_REG:
            try:
                async with httpx.AsyncClient(timeout=15) as c:
                    # Advertise our type/URL/owner plus the specs and libraries
                    # probed at startup.
                    r = await c.post(f"{HEAD_URL}/api/register_node", json={
                        "node_type": NODE_TYPE, "node_url": os.environ.get("SPACE_HOST", ""),
                        "owner": NODE_OWNER, "specs": SPECS, "installed_libs": LIBS})
                    if r.status_code == 200: NODE_ID = r.json().get("node_id"); REGISTERED = True; print(f"[W] Registered {NODE_ID}")
            except Exception as e: print(f"[W] Reg fail: {e}")
    print(f"[W] Active: {NODE_ID} ({NODE_TYPE})")
    # --- Phase 2: heartbeat + work polling ---------------------------------
    last_hb = 0
    while True:
        try:
            # Heartbeat at most every 25 s; best-effort, failures are ignored.
            if time.time() - last_hb > 25:
                try:
                    async with httpx.AsyncClient(timeout=10) as c:
                        await c.post(f"{HEAD_URL}/api/node_heartbeat", json={"node_id": NODE_ID, "status": "online", "installed_libs": LIBS})
                    last_hb = time.time()
                except: pass
            # Ask the head for a work fragment matching our capabilities.
            work = None
            async with httpx.AsyncClient(timeout=15) as c:
                r = await c.get(f"{HEAD_URL}/api/get_work", params={
                    "node_id": NODE_ID, "node_type": NODE_TYPE,
                    "has_gpu": str(SPECS['gpu_available']).lower(),
                    "ram_gb": SPECS['ram_available_gb'], "libs": ",".join(LIBS)})
                if r.status_code == 200: work = r.json().get("work")
            if not work: await asyncio.sleep(1); continue
            fid = work["fragment_id"]
            code = work.get("code", "")
            print(f"[W] ▶ {fid[:24]}...")
            # NOTE(review): safe_execute runs synchronously on the event loop,
            # so a long fragment stalls heartbeats and HTTP handlers until it
            # finishes — confirm acceptable for expected job lengths.
            result = safe_execute(code, work.get("timeout_seconds", 600))
            print(f"[W] ✓ {fid[:20]}... → {result['status']} ({result['execution_time']}s)")
            payload = {"fragment_id": fid, "node_id": NODE_ID, "status": result["status"],
                       "result": result.get("result"), "error": result.get("error") or "",
                       "stdout": result.get("stdout") or "", "resource_usage": result.get("resource_usage", {})}
            # Submit the result with up to 3 attempts; gives up silently after
            # the third (the fragment's result is then lost).
            for attempt in range(3):
                try:
                    async with httpx.AsyncClient(timeout=30) as sc:
                        sr = await sc.post(f"{HEAD_URL}/api/submit_result", json=payload)
                        if sr.status_code == 200: print(f"[W] 📤 Submitted {fid[:20]}..."); break
                        else: print(f"[W] ⚠ Submit {sr.status_code}: {sr.text[:100]}")
                except Exception as e: print(f"[W] ⚠ Submit attempt {attempt+1}: {e}"); await asyncio.sleep(1)
        except Exception as e: print(f"[W] Err: {e}"); await asyncio.sleep(3)
@app.on_event("startup")
async def startup():
    """Launch the background registration/polling loop once the app boots."""
    asyncio.create_task(worker_loop())
if __name__ == "__main__":
    # Direct-run entry point (port 7861); presumably the hosting platform
    # launches the app itself otherwise — TODO confirm.
    import uvicorn; uvicorn.run(app, host="0.0.0.0", port=7861)