YAML Metadata Warning: empty or missing yaml metadata in repo card

Check out the documentation for more information.

from future import annotations

import copy import math import pickle import threading from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np import cv2 import torch

============================================================

ComfyUI Node (pose_data + PKL)

============================================================

_GLOBAL_LOCK = threading.Lock()

class KPSSmoothPoseDataAndRender: """ Сглаживание + рендер позы. Вход: POSEDATA (как объект/dict; обычно приходит из TSLoadPoseDataPickle). Выход: IMAGE (torch [T,H,W,3] float 0..1), POSEDATA (в том же формате, но сглаженный). """

@classmethod
def INPUT_TYPES(cls):
    return {
        "required": {
            "pose_data": ("POSEDATA",),  # <-- ВАЖНО: именно POSEDATA
            "filter_extra_people": ("BOOLEAN", {"default": True}),
            # общий набор параметров сглаживания (вместо body + face_hands)
            "smooth_alpha": ("FLOAT", {"default": 0.7, "min": 0.01, "max": 0.99, "step": 0.01}),
            "gap_frames": ("INT", {"default": 12, "min": 0, "max": 100, "step": 1}),
            "min_run_frames": ("INT", {"default": 2, "min": 1, "max": 60, "step": 1}),
            # пороги отрисовки (в инпут добавляем body/hands, face НЕ добавляем)
            "conf_thresh_body": ("FLOAT", {"default": 0.20, "min": 0.0, "max": 1.0, "step": 0.01}),
            "conf_thresh_hands": ("FLOAT", {"default": 0.50, "min": 0.0, "max": 1.0, "step": 0.01}),
        }
    }

RETURN_TYPES = ("IMAGE", "POSEDATA")  # <-- ВАЖНО: именно POSEDATA
RETURN_NAMES = ("IMAGE", "pose_data")
FUNCTION = "run"
CATEGORY = "posedata"

def run(self, pose_data, **kwargs):
    filter_extra_people = bool(kwargs.get("filter_extra_people", True))

    # общий набор
    smooth_alpha = float(kwargs.get("smooth_alpha", 0.7))
    gap_frames = int(kwargs.get("gap_frames", 12))
    min_run_frames = int(kwargs.get("min_run_frames", 2))

    # пороги рендера
    conf_thresh_body = float(kwargs.get("conf_thresh_body", 0.20))
    conf_thresh_hands = float(kwargs.get("conf_thresh_hands", 0.50))
    conf_thresh_face = 0.20  # <- НЕ добавляем в INPUT, но фиксируем как ты просил

    force_body_18 = bool(kwargs.get("force_body_18", False))

    pose_data = _coerce_pose_data_to_obj(pose_data)

    # pose_data -> frames_json_like
    frames_json_like, meta_ref = _pose_data_to_kps_frames(pose_data, force_body_18=force_body_18)

    with _GLOBAL_LOCK:
        old = _snapshot_tunable_globals()
        try:
            # BODY
            globals()["ALPHA_BODY"] = smooth_alpha
            globals()["SUPER_SMOOTH_ALPHA"] = smooth_alpha
            globals()["MAX_GAP_FRAMES"] = gap_frames
            globals()["MIN_RUN_FRAMES"] = min_run_frames

            # FACE+HANDS (dense) тоже от общего набора
            globals()["DENSE_SUPER_SMOOTH_ALPHA"] = smooth_alpha
            globals()["DENSE_MAX_GAP_FRAMES"] = gap_frames
            globals()["DENSE_MIN_RUN_FRAMES"] = min_run_frames

            globals()["FILTER_EXTRA_PEOPLE"] = filter_extra_people

            smoothed_frames = smooth_KPS_json_obj(
                frames_json_like,
                keep_face_untouched=False,
                keep_hands_untouched=False,
                filter_extra_people=filter_extra_people,
            )
        finally:
            _restore_tunable_globals(old)

    # frames_json_like -> pose_data (обратно в pose_metas)
    out_pose_data = _kps_frames_to_pose_data(pose_data, smoothed_frames, meta_ref, force_body_18=force_body_18)

    # render
    w, h = _extract_canvas_wh(smoothed_frames, default_w=720, default_h=1280)
    frames_np = []
    for fr in smoothed_frames:
        if isinstance(fr, dict) and fr.get("people"):
            img = _draw_pose_frame_full(
                w,
                h,
                fr["people"][0],
                conf_thresh_body=conf_thresh_body,
                conf_thresh_hands=conf_thresh_hands,
                conf_thresh_face=conf_thresh_face,
            )
        else:
            img = np.zeros((h, w, 3), dtype=np.uint8)
        frames_np.append(img)

    frames_t = torch.from_numpy(np.stack(frames_np, axis=0)).float() / 255.0
    return (frames_t, out_pose_data)

============================================================

PKL / pose_data IO

============================================================

class _PoseDummyObj: def init(self, *a, **k): pass

def __setstate__(self, state):
    # поддержка dict и (dict, slotstate)
    if isinstance(state, dict):
        self.__dict__.update(state)
    elif isinstance(state, (list, tuple)) and len(state) == 2 and isinstance(state[0], dict):
        self.__dict__.update(state[0])
        if isinstance(state[1], dict):
            self.__dict__.update(state[1])
        else:
            self.__dict__["_slotstate"] = state[1]
    else:
        self.__dict__["_state"] = state

class _SafeUnpickler(pickle.Unpickler): """ Безопасно грузим PKL из ComfyUI окружения: - ремап numpy._core -> numpy.core - неизвестные классы (WanAnimatePreprocess.*) превращаем в простые объекты с dict """

def find_class(self, module, name):
    # ремап внутренних путей numpy (частая проблема между версиями)
    if module.startswith("numpy._core"):
        module = module.replace("numpy._core", "numpy.core", 1)
    if module.startswith("numpy._globals"):
        module = module.replace("numpy._globals", "numpy", 1)

    # конкретные классы метаданных (если встречаются)
    if name in {"AAPoseMeta"}:
        return _PoseDummyObj

    try:
        return super().find_class(module, name)
    except Exception:
        return _PoseDummyObj

def _load_pose_data_pkl(path: str) -> Any: with open(path, "rb") as f: return _SafeUnpickler(f).load()

def _coerce_pose_data_to_obj(pd: Any) -> Any: """ Accepts: - dict pose_data - object with attributes like .pose_metas (AAPoseMeta-like) - str path to .pkl - dict wrapper with 'pose_data' """ if isinstance(pd, str): obj = _load_pose_data_pkl(pd) return obj

if isinstance(pd, dict) and "pose_data" in pd:
    return pd["pose_data"]

return pd

============================================================

pose_data <-> JSON-like KPS frames

============================================================

def _as_attr(x: Any, key: str, default=None): if isinstance(x, dict): return x.get(key, default) return getattr(x, key, default)

def _set_attr(x: Any, key: str, value: Any): if isinstance(x, dict): x[key] = value else: setattr(x, key, value)

def _xy_p_to_flat(xy: Optional[np.ndarray], p: Optional[np.ndarray]) -> Optional[List[float]]: if xy is None: return None arr = np.asarray(xy) if arr.ndim != 2 or arr.shape[1] < 2: return None N = arr.shape[0] if p is None: pp = np.ones((N,), dtype=np.float32) else: pp = np.asarray(p).reshape(-1) if pp.shape[0] != N: # если вдруг не совпали — подстрахуемся pp = np.ones((N,), dtype=np.float32)

out: List[float] = []
for i in range(N):
    out.extend([float(arr[i, 0]), float(arr[i, 1]), float(pp[i])])
return out

def _flat_to_xy_p(flat: Optional[List[float]]) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: if not isinstance(flat, list) or len(flat) % 3 != 0: return None, None N = len(flat) // 3 xy = np.zeros((N, 2), dtype=np.float32) p = np.zeros((N,), dtype=np.float32) for i in range(N): xy[i, 0] = float(flat[3 * i + 0]) xy[i, 1] = float(flat[3 * i + 1]) p[i] = float(flat[3 * i + 2]) return xy, p

def _pose_data_to_kps_frames(pose_data: Any, *, force_body_18: bool) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: """ Делает "как JSON" список кадров: frame = {"people":[{pose_keypoints_2d, face_keypoints_2d, hand_left_keypoints_2d, hand_right_keypoints_2d}], "canvas_width": W, "canvas_height": H} meta_ref: ссылки на pose_metas + тип/доступ, чтобы правильно записать обратно. """ pose_metas = _as_attr(pose_data, "pose_metas", None) if pose_metas is None: # иногда называют иначе pose_metas = _as_attr(pose_data, "frames", None)

if pose_metas is None or not isinstance(pose_metas, list):
    raise ValueError("pose_data does not contain 'pose_metas' list.")

frames: List[Dict[str, Any]] = []
for meta in pose_metas:
    h = _as_attr(meta, "height", 1280)
    w = _as_attr(meta, "width", 720)

    kps_body = _as_attr(meta, "kps_body", None)
    kps_body_p = _as_attr(meta, "kps_body_p", None)

    kps_face = _as_attr(meta, "kps_face", None)
    kps_face_p = _as_attr(meta, "kps_face_p", None)

    kps_lhand = _as_attr(meta, "kps_lhand", None)
    kps_lhand_p = _as_attr(meta, "kps_lhand_p", None)

    kps_rhand = _as_attr(meta, "kps_rhand", None)
    kps_rhand_p = _as_attr(meta, "kps_rhand_p", None)

    # to flat
    pose_flat = _xy_p_to_flat(kps_body, kps_body_p)
    face_flat = _xy_p_to_flat(kps_face, kps_face_p)
    lh_flat = _xy_p_to_flat(kps_lhand, kps_lhand_p)
    rh_flat = _xy_p_to_flat(kps_rhand, kps_rhand_p)

    if force_body_18 and isinstance(pose_flat, list) and len(pose_flat) >= 18 * 3:
        pose_flat = pose_flat[: 18 * 3]

    person = {
        "pose_keypoints_2d": pose_flat if pose_flat is not None else [],
        "face_keypoints_2d": face_flat if face_flat is not None else [],
        "hand_left_keypoints_2d": lh_flat,
        "hand_right_keypoints_2d": rh_flat,
    }

    frame = {"people": [person], "canvas_height": int(h), "canvas_width": int(w)}
    frames.append(frame)

meta_ref = {
    "pose_metas": pose_metas,
    "len": len(pose_metas),
}
return frames, meta_ref

def kps_frames_to_pose_data( pose_data_in: Any, frames_kps: List[Dict[str, Any]], meta_ref: Dict[str, Any], , force_body_18: bool, ) -> Any: """ Записывает обратно сглаженные keypoints в pose_metas[].kps* / kps_*_p. Остальные поля pose_data сохраняем. """ out_pd = copy.deepcopy(pose_data_in) pose_metas_out = _as_attr(out_pd, "pose_metas", None) if pose_metas_out is None: # fallback: вдруг другой ключ pose_metas_out = meta_ref.get("pose_metas")

if pose_metas_out is None or not isinstance(pose_metas_out, list):
    raise ValueError("Failed to locate pose_metas in output pose_data.")

T = min(len(pose_metas_out), len(frames_kps))
for t in range(T):
    meta = pose_metas_out[t]
    fr = frames_kps[t]
    people = fr.get("people", []) if isinstance(fr, dict) else []
    p0 = people[0] if people else None
    if not isinstance(p0, dict):
        continue

    pose_flat = p0.get("pose_keypoints_2d")
    face_flat = p0.get("face_keypoints_2d")
    lh_flat = p0.get("hand_left_keypoints_2d")
    rh_flat = p0.get("hand_right_keypoints_2d")

    if force_body_18 and isinstance(pose_flat, list) and len(pose_flat) >= 18 * 3:
        pose_flat = pose_flat[: 18 * 3]

    body_xy, body_p = _flat_to_xy_p(pose_flat if isinstance(pose_flat, list) else None)
    face_xy, face_p = _flat_to_xy_p(face_flat if isinstance(face_flat, list) else None)
    lh_xy, lh_p = _flat_to_xy_p(lh_flat if isinstance(lh_flat, list) else None)
    rh_xy, rh_p = _flat_to_xy_p(rh_flat if isinstance(rh_flat, list) else None)

    if body_xy is not None and body_p is not None:
        _set_attr(meta, "kps_body", body_xy.astype(np.float32, copy=False))
        _set_attr(meta, "kps_body_p", body_p.astype(np.float32, copy=False))

    if face_xy is not None and face_p is not None:
        _set_attr(meta, "kps_face", face_xy.astype(np.float32, copy=False))
        _set_attr(meta, "kps_face_p", face_p.astype(np.float32, copy=False))

    if lh_xy is not None and lh_p is not None:
        _set_attr(meta, "kps_lhand", lh_xy.astype(np.float32, copy=False))
        _set_attr(meta, "kps_lhand_p", lh_p.astype(np.float32, copy=False))

    if rh_xy is not None and rh_p is not None:
        _set_attr(meta, "kps_rhand", rh_xy.astype(np.float32, copy=False))
        _set_attr(meta, "kps_rhand_p", rh_p.astype(np.float32, copy=False))

    # обновим width/height если нужно
    if isinstance(fr, dict):
        if "canvas_width" in fr:
            _set_attr(meta, "width", int(fr["canvas_width"]))
        if "canvas_height" in fr:
            _set_attr(meta, "height", int(fr["canvas_height"]))

# обязательно положим pose_metas обратно
_set_attr(out_pd, "pose_metas", pose_metas_out)
return out_pd

def _extract_canvas_wh(data: Any, default_w: int, default_h: int) -> Tuple[int, int]: w, h = int(default_w), int(default_h) if isinstance(data, list): for fr in data: if isinstance(fr, dict) and "canvas_width" in fr and "canvas_height" in fr: try: w = int(fr["canvas_width"]) h = int(fr["canvas_height"]) break except Exception: pass return w, h

============================================================

=== START: smooth_KPS_json.py logic (ported as-is)

============================================================

--- Root+Scale carry (when torso disappears on close-up) ---

ROOTSCALE_CARRY_ENABLED = True CARRY_MAX_FRAMES = 48 CARRY_MIN_ANCHORS = 2 CARRY_ANCHOR_JOINTS = [0, 1, 2, 5, 3, 6, 4, 7] CARRY_CONF_GATE = 0.20

--- Main person selection / multi-person filtering ---

FILTER_EXTRA_PEOPLE = True MAIN_PERSON_MODE = "longest_track" TRACK_MATCH_MIN_PX = 80.0 TRACK_MATCH_FACTOR = 3.0 TRACK_MAX_FRAME_GAP = 32

--- Spatial outlier suppression ---

SPATIAL_OUTLIER_FIX = True BONE_MAX_FACTOR = 2.3 TORSO_RADIUS_FACTOR = 4.0

EMA smoothing for BODY only (online)

ALPHA_BODY = 0.70 MAX_STEP_BODY = 60.0 VEL_ALPHA = 0.45 EPS = 0.3 CONF_GATE_BODY = 0.20 CONF_FLOOR_BODY = 0.00

TRACK_DIST_PENALTY = 1.5 FACE_WEIGHT_IN_SCORE = 0.15 HAND_WEIGHT_IN_SCORE = 0.35

ALLOW_DISAPPEAR_JOINTS = {3, 4, 6, 7}

GAP_FILL_ENABLED = True MAX_GAP_FRAMES = 12 MIN_RUN_FRAMES = 2

TORSO_SYNC_ENABLED = True TORSO_JOINTS = {1, 2, 5, 8, 11} TORSO_LOOKAHEAD_FRAMES = 32

SUPER_SMOOTH_ENABLED = True SUPER_SMOOTH_ALPHA = 0.7 SUPER_SMOOTH_MIN_CONF = 0.20

MEDIAN3_ENABLED = True

FACE_SMOOTH_ENABLED = True HANDS_SMOOTH_ENABLED = False

CONF_GATE_FACE = 0.20 CONF_GATE_HAND = 0.50

HAND_MIN_POINTS_PRESENT = 7 MIN_HAND_RUN_FRAMES = 6

DENSE_GAP_FILL_ENABLED = False DENSE_MAX_GAP_FRAMES = 8 DENSE_MIN_RUN_FRAMES = 2

DENSE_MEDIAN3_ENABLED = False DENSE_SUPER_SMOOTH_ENABLED = False DENSE_SUPER_SMOOTH_ALPHA = 0.7

def _snapshot_tunable_globals() -> Dict[str, Any]: keys = [ "FILTER_EXTRA_PEOPLE", "SUPER_SMOOTH_ALPHA", "MAX_GAP_FRAMES", "MIN_RUN_FRAMES", "DENSE_SUPER_SMOOTH_ALPHA", "DENSE_MAX_GAP_FRAMES", "DENSE_MIN_RUN_FRAMES", ] return {k: globals().get(k) for k in keys}

def _restore_tunable_globals(old: Dict[str, Any]) -> None: for k, v in old.items(): globals()[k] = v

def _is_valid_xyc(x: float, y: float, c: float) -> bool: if c is None: return False if c <= 0: return False if x == 0 and y == 0: return False if math.isnan(x) or math.isnan(y) or math.isnan(c): return False return True

def _reshape_keypoints_2d(arr: List[float]) -> List[Tuple[float, float, float]]: if arr is None: return [] if len(arr) % 3 != 0: raise ValueError(f"keypoints length not multiple of 3: {len(arr)}") out = [] for i in range(0, len(arr), 3): out.append((float(arr[i]), float(arr[i + 1]), float(arr[i + 2]))) return out

def _flatten_keypoints_2d(kps: List[Tuple[float, float, float]]) -> List[float]: out: List[float] = [] for x, y, c in kps: out.extend([float(x), float(y), float(c)]) return out

def _sum_conf(arr: Optional[List[float]], sample_step: int = 1) -> float: if not arr: return 0.0 s = 0.0 for i in range(2, len(arr), 3 * sample_step): try: c = float(arr[i]) except Exception: c = 0.0 if c > 0: s += c return s

def _body_center_from_pose(pose_arr: Optional[List[float]]) -> Optional[Tuple[float, float]]: if not pose_arr: return None kps = _reshape_keypoints_2d(pose_arr) idxs = [2, 5, 8, 11, 1] pts = [] for idx in idxs: if idx < len(kps): x, y, c = kps[idx] if _is_valid_xyc(x, y, c): pts.append((x, y)) if not pts: for x, y, c in kps: if _is_valid_xyc(x, y, c): pts.append((x, y)) if not pts: return None cx = sum(p[0] for p in pts) / len(pts) cy = sum(p[1] for p in pts) / len(pts) return (cx, cy)

def _dist(a: Tuple[float, float], b: Tuple[float, float]) -> float: return math.hypot(a[0] - b[0], a[1] - b[1])

def _choose_single_person( people: List[Dict[str, Any]], prev_center: Optional[Tuple[float, float]] ) -> Optional[Dict[str, Any]]: if not people: return None best = None best_score = -1e18

for p in people:
    pose = p.get("pose_keypoints_2d")
    face = p.get("face_keypoints_2d")
    lh = p.get("hand_left_keypoints_2d")
    rh = p.get("hand_right_keypoints_2d")

    score = _sum_conf(pose)
    score += FACE_WEIGHT_IN_SCORE * _sum_conf(face, sample_step=4)
    score += HAND_WEIGHT_IN_SCORE * (_sum_conf(lh, sample_step=2) + _sum_conf(rh, sample_step=2))

    center = _body_center_from_pose(pose)
    if prev_center is not None and center is not None:
        score -= TRACK_DIST_PENALTY * _dist(prev_center, center)

    if score > best_score:
        best_score = score
        best = p

return best

@dataclass class _Track: frames: Dict[int, Dict[str, Any]] centers: Dict[int, Tuple[float, float]] last_t: int last_center: Tuple[float, float]

def _estimate_torso_scale(pose: List[Tuple[float, float, float]]) -> Optional[float]: def dist(i, k) -> Optional[float]: if i >= len(pose) or k >= len(pose): return None xi, yi, ci = pose[i] xk, yk, ck = pose[k] if not _is_valid_xyc(xi, yi, ci) or not _is_valid_xyc(xk, yk, ck): return None return math.hypot(xi - xk, yi - yk)

cand = [dist(2, 5), dist(8, 11), dist(1, 8), dist(1, 11)]
cand = [c for c in cand if c is not None and c > 1e-3]
if not cand:
    return None
return float(sum(cand) / len(cand))

def _track_match_threshold_from_pose(pose_arr: Optional[List[float]]) -> float: if isinstance(pose_arr, list): pose = _reshape_keypoints_2d(pose_arr) s = _estimate_torso_scale(pose) if s is not None: return max(float(TRACK_MATCH_MIN_PX), float(TRACK_MATCH_FACTOR) * float(s)) return float(max(TRACK_MATCH_MIN_PX, 120.0))

def _build_tracks_over_video(frames_data: List[Any]) -> List[_Track]: tracks: List[_Track] = []

for t, frame in enumerate(frames_data):
    if not isinstance(frame, dict):
        continue
    people = frame.get("people", [])
    if not isinstance(people, list) or not people:
        continue

    cand: List[Tuple[int, Dict[str, Any], Tuple[float, float]]] = []
    for i, p in enumerate(people):
        if not isinstance(p, dict):
            continue
        pose = p.get("pose_keypoints_2d")
        c = _body_center_from_pose(pose)
        if c is None:
            continue
        cand.append((i, p, c))

    if not cand:
        continue

    used = set()
    track_order = sorted(range(len(tracks)), key=lambda k: tracks[k].last_t, reverse=True)

    for k in track_order:
        tr = tracks[k]
        age = t - tr.last_t
        if age > int(TRACK_MAX_FRAME_GAP):
            continue

        best_idx = None
        best_d = 1e18

        for i, p, cc in cand:
            if i in used:
                continue

            thr = _track_match_threshold_from_pose(p.get("pose_keypoints_2d"))
            d = _dist(tr.last_center, cc)
            if d <= thr and d < best_d:
                best_d = d
                best_idx = i

        if best_idx is not None:
            i, p, cc = next(x for x in cand if x[0] == best_idx)
            used.add(i)
            tr.frames[t] = p
            tr.centers[t] = cc
            tr.last_t = t
            tr.last_center = cc

    for i, p, cc in cand:
        if i in used:
            continue
        tracks.append(_Track(frames={t: p}, centers={t: cc}, last_t=t, last_center=cc))

return tracks

def _track_presence_score(tr: _Track) -> Tuple[int, float, float]: frames_count = len(tr.frames) face_sum = 0.0 body_sum = 0.0 for p in tr.frames.values(): face_sum += _sum_conf(p.get("face_keypoints_2d"), sample_step=4) body_sum += _sum_conf(p.get("pose_keypoints_2d"), sample_step=1) return (frames_count, face_sum, body_sum)

def _pick_main_track(tracks: List[_Track]) -> Optional[_Track]: if not tracks: return None best = None best_key = (-1, -1e18, -1e18) for tr in tracks: key = _track_presence_score(tr) if key > best_key: best_key = key best = tr return best

@dataclass class BodyState: last_xy: List[Optional[Tuple[float, float]]] last_v: List[Tuple[float, float]]

def __init__(self, joints: int):
    self.last_xy = [None] * joints
    self.last_v = [(0.0, 0.0)] * joints

def _smooth_body_pose(pose_arr: Optional[List[float]], state: BodyState) -> Optional[List[float]]: if pose_arr is None: return None

kps = _reshape_keypoints_2d(pose_arr)
J = len(kps)
if len(state.last_xy) != J:
    state.last_xy = [None] * J
    state.last_v = [(0.0, 0.0)] * J

out: List[Tuple[float, float, float]] = []

for j in range(J):
    x, y, c = kps[j]
    last = state.last_xy[j]
    vx_last, vy_last = state.last_v[j]

    valid_in = _is_valid_xyc(x, y, c) and (c >= CONF_GATE_BODY)

    if valid_in:
        if last is None:
            nx, ny = x, y
            state.last_xy[j] = (nx, ny)
            state.last_v[j] = (0.0, 0.0)
            out.append((nx, ny, float(c)))
            continue

        dx_raw = x - last[0]
        dy_raw = y - last[1]
        if abs(dx_raw) < EPS:
            dx_raw = 0.0
        if abs(dy_raw) < EPS:
            dy_raw = 0.0

        vx = VEL_ALPHA * dx_raw + (1.0 - VEL_ALPHA) * vx_last
        vy = VEL_ALPHA * dy_raw + (1.0 - VEL_ALPHA) * vy_last

        px = last[0] + vx
        py = last[1] + vy

        nx = ALPHA_BODY * x + (1.0 - ALPHA_BODY) * px
        ny = ALPHA_BODY * y + (1.0 - ALPHA_BODY) * py

        ddx = nx - last[0]
        ddy = ny - last[1]
        d = math.hypot(ddx, ddy)
        if d > MAX_STEP_BODY and d > 1e-6:
            scale = MAX_STEP_BODY / d
            nx = last[0] + ddx * scale
            ny = last[1] + ddy * scale
            vx = nx - last[0]
            vy = ny - last[1]

        state.last_xy[j] = (nx, ny)
        state.last_v[j] = (vx, vy)

        out.append((nx, ny, float(c)))
    else:
        out.append((float(x), float(y), float(c)))

return _flatten_keypoints_2d(out)

COCO18_EDGES = [ (1, 2), (2, 3), (3, 4), (1, 5), (5, 6), (6, 7), (1, 8), (8, 9), (9, 10), (1, 11), (11, 12), (12, 13), (8, 11), (1, 0), (0, 14), (14, 16), (0, 15), (15, 17), ]

HAND21_EDGES = [ (0, 1), (1, 2), (2, 3), (3, 4), (0, 5), (5, 6), (6, 7), (7, 8), (0, 9), (9, 10), (10, 11), (11, 12), (0, 13), (13, 14), (14, 15), (15, 16), (0, 17), (17, 18), (18, 19), (19, 20), ]

_NEIGHBORS = None

def _build_neighbors(): global _NEIGHBORS if _NEIGHBORS is not None: return neigh = {} for a, b in COCO18_EDGES: neigh.setdefault(a, set()).add(b) neigh.setdefault(b, set()).add(a) _NEIGHBORS = neigh

def _suppress_spatial_outliers_in_pose_arr( pose_arr: Optional[List[float]], *, conf_gate: float ) -> Optional[List[float]]: if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0: return pose_arr

pose = _reshape_keypoints_2d(pose_arr)
J = len(pose)

center = _body_center_from_pose(pose_arr)
scale = _estimate_torso_scale(pose)
if center is None or scale is None:
    return pose_arr

cx, cy = center
max_r = TORSO_RADIUS_FACTOR * scale
max_bone = BONE_MAX_FACTOR * scale

out = [list(p) for p in pose]

def visible(j: int) -> bool:
    if j >= J:
        return False
    x, y, c = out[j]
    return (c >= conf_gate) and not (x == 0 and y == 0)

for j in range(J):
    x, y, c = out[j]
    if c >= conf_gate and not (x == 0 and y == 0):
        if math.hypot(x - cx, y - cy) > max_r:
            out[j] = [0.0, 0.0, 0.0]

for a, b in COCO18_EDGES:
    if a >= J or b >= J:
        continue
    if not visible(a) or not visible(b):
        continue
    ax, ay, ac = out[a]
    bx, by, bc = out[b]
    d = math.hypot(ax - bx, ay - by)
    if d > max_bone:
        if ac <= bc:
            out[a] = [0.0, 0.0, 0.0]
        else:
            out[b] = [0.0, 0.0, 0.0]

flat: List[float] = []
for x, y, c in out:
    flat.extend([float(x), float(y), float(c)])
return flat

def _suppress_isolated_joints_in_pose_arr( pose_arr: Optional[List[float]], *, conf_gate: float, keep: set[int] = None ) -> Optional[List[float]]: if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0: return pose_arr

_build_neighbors()
pose = _reshape_keypoints_2d(pose_arr)
J = len(pose)
out = [list(p) for p in pose]

if keep is None:
    keep = set()

def vis(j: int) -> bool:
    if j >= J:
        return False
    x, y, c = out[j]
    return (c >= conf_gate) and not (x == 0 and y == 0)

for j in range(J):
    if j in keep:
        continue
    if not vis(j):
        continue
    neighs = _NEIGHBORS.get(j, set())
    if not any((n < J and vis(n)) for n in neighs):
        out[j] = [0.0, 0.0, 0.0]

flat = []
for x, y, c in out:
    flat.extend([float(x), float(y), float(c)])
return flat

def _denoise_and_fill_gaps_pose_seq( pose_arr_seq: List[Optional[List[float]]], *, conf_gate: float, min_run: int, max_gap: int, ) -> List[Optional[List[float]]]: if not pose_arr_seq: return pose_arr_seq

J = None
for arr in pose_arr_seq:
    if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0:
        J = len(arr) // 3
        break
if J is None:
    return pose_arr_seq

T = len(pose_arr_seq)
out_seq: List[Optional[List[float]]] = []
for arr in pose_arr_seq:
    if isinstance(arr, list) and len(arr) == J * 3:
        out_seq.append(list(arr))
    else:
        out_seq.append(arr)

def is_vis(arr: List[float], j: int) -> bool:
    x = float(arr[3 * j + 0])
    y = float(arr[3 * j + 1])
    c = float(arr[3 * j + 2])
    return (c >= conf_gate) and not (x == 0 and y == 0)

# 1) remove short flashes
for j in range(J):
    start = None
    for t in range(T + 1):
        cur = False
        if t < T and isinstance(out_seq[t], list):
            cur = is_vis(out_seq[t], j)
        if cur and start is None:
            start = t
        if (not cur) and start is not None:
            run_len = t - start
            if run_len < min_run:
                for k in range(start, t):
                    if not isinstance(out_seq[k], list):
                        continue
                    out_seq[k][3 * j + 0] = 0.0
                    out_seq[k][3 * j + 1] = 0.0
                    out_seq[k][3 * j + 2] = 0.0
            start = None

# 2) gap fill only if returns
for j in range(J):
    last_vis_t = None
    t = 0
    while t < T:
        arr = out_seq[t]
        if not isinstance(arr, list):
            t += 1
            continue

        cur_vis = is_vis(arr, j)
        if cur_vis:
            last_vis_t = t
            t += 1
            continue

        if last_vis_t is None:
            t += 1
            continue

        gap_start = t
        t2 = t
        while t2 < T:
            arr2 = out_seq[t2]
            if isinstance(arr2, list) and is_vis(arr2, j):
                break
            t2 += 1

        if t2 >= T:
            break

        gap_len = t2 - gap_start
        if gap_len <= 0:
            t = t2
            continue

        if gap_len <= max_gap:
            a = out_seq[last_vis_t]
            b = out_seq[t2]
            if isinstance(a, list) and isinstance(b, list):
                ax, ay, ac = float(a[3 * j + 0]), float(a[3 * j + 1]), float(a[3 * j + 2])
                bx, by, bc = float(b[3 * j + 0]), float(b[3 * j + 1]), float(b[3 * j + 2])
                if not (ax == 0 and ay == 0) and not (bx == 0 and by == 0):
                    conf_fill = min(ac, bc)
                    for k in range(gap_len):
                        tt = gap_start + k
                        if not isinstance(out_seq[tt], list):
                            continue
                        r = (k + 1) / (gap_len + 1)
                        x = ax + (bx - ax) * r
                        y = ay + (by - ay) * r
                        out_seq[tt][3 * j + 0] = float(x)
                        out_seq[tt][3 * j + 1] = float(y)
                        out_seq[tt][3 * j + 2] = float(conf_fill)

        t = t2

return out_seq

def _zero_lag_ema_pose_seq( pose_seq: List[Optional[List[float]]], *, alpha: float, conf_gate: float ) -> List[Optional[List[float]]]: if not pose_seq: return pose_seq

J = None
for arr in pose_seq:
    if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0:
        J = len(arr) // 3
        break
if J is None:
    return pose_seq

T = len(pose_seq)

def is_vis(arr: List[float], j: int) -> bool:
    x = float(arr[3 * j + 0])
    y = float(arr[3 * j + 1])
    c = float(arr[3 * j + 2])
    return (c >= conf_gate) and not (x == 0 and y == 0)

fwd = [None] * T
last = [None] * J
for t in range(T):
    arr = pose_seq[t]
    if not isinstance(arr, list) or len(arr) != J * 3:
        fwd[t] = arr
        continue
    out = list(arr)
    for j in range(J):
        if is_vis(arr, j):
            x = float(arr[3 * j + 0])
            y = float(arr[3 * j + 1])
            if last[j] is None:
                sx, sy = x, y
            else:
                sx = alpha * x + (1 - alpha) * last[j][0]
                sy = alpha * y + (1 - alpha) * last[j][1]
            last[j] = (sx, sy)
            out[3 * j + 0] = float(sx)
            out[3 * j + 1] = float(sy)
    fwd[t] = out

bwd = [None] * T
last = [None] * J
for t in range(T - 1, -1, -1):
    arr = fwd[t]
    if not isinstance(arr, list) or len(arr) != J * 3:
        bwd[t] = arr
        continue
    out = list(arr)
    for j in range(J):
        if is_vis(arr, j):
            x = float(arr[3 * j + 0])
            y = float(arr[3 * j + 1])
            if last[j] is None:
                sx, sy = x, y
            else:
                sx = alpha * x + (1 - alpha) * last[j][0]
                sy = alpha * y + (1 - alpha) * last[j][1]
            last[j] = (sx, sy)
            out[3 * j + 0] = float(sx)
            out[3 * j + 1] = float(sy)
    bwd[t] = out

return bwd

def _apply_root_scale( pose_arr: Optional[List[float]], *, src_root: Tuple[float, float], src_scale: float, dst_root: Tuple[float, float], dst_scale: float, ) -> Optional[List[float]]: if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0: return pose_arr if src_scale <= 1e-6 or dst_scale <= 1e-6: return pose_arr

kps = _reshape_keypoints_2d(pose_arr)
out = []
s = dst_scale / src_scale

for x, y, c in kps:
    if c <= 0 or (x == 0 and y == 0):
        out.append((x, y, c))
        continue
    nx = dst_root[0] + (x - src_root[0]) * s
    ny = dst_root[1] + (y - src_root[1]) * s
    out.append((nx, ny, c))

return _flatten_keypoints_2d(out)

def _carry_pose_when_torso_missing( pose_seq: List[Optional[List[float]]], *, conf_gate: float, max_carry: int, anchor_joints: List[int], min_anchors: int, ) -> List[Optional[List[float]]]: if not pose_seq: return pose_seq

J = None
for arr in pose_seq:
    if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0:
        J = len(arr) // 3
        break
if J is None:
    return pose_seq

out = [a if a is None else list(a) for a in pose_seq]

FILL_JOINTS = {1, 8, 9, 10, 11, 12, 13}
FILL_JOINTS -= set(ALLOW_DISAPPEAR_JOINTS)

def is_vis_flat(arr: List[float], j: int) -> bool:
    x = float(arr[3 * j + 0])
    y = float(arr[3 * j + 1])
    c = float(arr[3 * j + 2])
    return (c >= conf_gate) and not (x == 0 and y == 0)

def count_visible(arr: List[float], joints: List[int]) -> int:
    c = 0
    for j in joints:
        if j < J and is_vis_flat(arr, j):
            c += 1
    return c

def root_scale_from_anchors(arr: List[float]) -> Optional[Tuple[Tuple[float, float], float]]:
    pts = []
    for j in anchor_joints:
        if j >= J:
            continue
        if is_vis_flat(arr, j):
            x = float(arr[3 * j + 0])
            y = float(arr[3 * j + 1])
            pts.append((x, y))
    if len(pts) < min_anchors:
        return None

    rx = sum(p[0] for p in pts) / len(pts)
    ry = sum(p[1] for p in pts) / len(pts)

    xs = [p[0] for p in pts]
    ys = [p[1] for p in pts]
    scale = max(max(xs) - min(xs), max(ys) - min(ys))
    if scale <= 1e-3:
        return None

    return (rx, ry), float(scale)

last_good: Optional[List[float]] = None
last_good_rs: Optional[Tuple[Tuple[float, float], float]] = None
carry_left = 0

for t in range(len(out)):
    arr = out[t]
    if not isinstance(arr, list) or len(arr) != J * 3:
        continue

    anchors_ok = count_visible(arr, anchor_joints) >= min_anchors
    fill_vis = sum(1 for j in FILL_JOINTS if j < J and is_vis_flat(arr, j))
    rs = root_scale_from_anchors(arr)

    if anchors_ok and rs is not None and fill_vis >= 2:
        last_good = list(arr)
        last_good_rs = rs
        carry_left = max_carry
        continue

    if anchors_ok and rs is not None and last_good is not None and last_good_rs is not None and carry_left > 0:
        dst_root, dst_scale = rs
        src_root, src_scale = last_good_rs

        carried_full = _apply_root_scale(
            last_good,
            src_root=src_root,
            src_scale=src_scale,
            dst_root=dst_root,
            dst_scale=dst_scale,
        )
        if isinstance(carried_full, list) and len(carried_full) == J * 3:
            for j in FILL_JOINTS:
                if j >= J:
                    continue
                if is_vis_flat(arr, j):
                    continue

                cx = float(carried_full[3 * j + 0])
                cy = float(carried_full[3 * j + 1])
                cc = float(carried_full[3 * j + 2])

                if (cx == 0 and cy == 0) or cc <= 0:
                    continue

                arr[3 * j + 0] = cx
                arr[3 * j + 1] = cy
                arr[3 * j + 2] = max(min(cc, 0.60), conf_gate)

            out[t] = arr
            carry_left -= 1
            continue

    carry_left = max(carry_left - 1, 0)

return out

def _force_full_torso_pair( pose_seq: List[Optional[List[float]]], *, conf_gate: float, anchor_joints: List[int], min_anchors: int, max_lookback: int = 240, fill_legs_with_hip: bool = True, always_fill_if_one_hip: bool = True, ) -> List[Optional[List[float]]]: if not pose_seq: return pose_seq

J = None
for arr in pose_seq:
    if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0:
        J = len(arr) // 3
        break
if J is None:
    return pose_seq

out = [a if a is None else list(a) for a in pose_seq]

R_HIP, R_KNEE, R_ANK = 8, 9, 10
L_HIP, L_KNEE, L_ANK = 11, 12, 13

def is_vis(arr: List[float], j: int) -> bool:
    if j >= J:
        return False
    x = float(arr[3 * j + 0])
    y = float(arr[3 * j + 1])
    c = float(arr[3 * j + 2])
    return (c >= conf_gate) and not (x == 0 and y == 0)

def count_visible(arr: List[float], joints: List[int]) -> int:
    c = 0
    for j in joints:
        if is_vis(arr, j):
            c += 1
    return c

def root_scale_from_anchors(arr: List[float]) -> Optional[Tuple[Tuple[float, float], float]]:
    pts = []
    for j in anchor_joints:
        if j >= J:
            continue
        if is_vis(arr, j):
            pts.append((float(arr[3 * j + 0]), float(arr[3 * j + 1])))
    if len(pts) < min_anchors:
        return None

    rx = sum(p[0] for p in pts) / len(pts)
    ry = sum(p[1] for p in pts) / len(pts)

    xs = [p[0] for p in pts]
    ys = [p[1] for p in pts]
    scale = max(max(xs) - min(xs), max(ys) - min(ys))
    if scale <= 1e-3:
        return None
    return (rx, ry), float(scale)

last_full_idx = None
last_full = None
last_full_rs = None

for t in range(len(out)):
    arr = out[t]
    if not isinstance(arr, list) or len(arr) != J * 3:
        continue

    rs = root_scale_from_anchors(arr)

    r_ok = is_vis(arr, R_HIP)
    l_ok = is_vis(arr, L_HIP)

    anchors_ok = count_visible(arr, anchor_joints) >= min_anchors

    if anchors_ok and rs is not None and r_ok and l_ok:
        last_full_idx = t
        last_full = list(arr)
        last_full_rs = rs
        continue

    if last_full is None or last_full_rs is None or last_full_idx is None:
        continue
    if (t - last_full_idx) > max_lookback:
        continue
    if not (r_ok or l_ok):
        continue
    if r_ok and l_ok:
        continue
    if not always_fill_if_one_hip:
        continue
    if rs is None:
        continue

    dst_root, dst_scale = rs
    src_root, src_scale = last_full_rs

    carried = _apply_root_scale(
        last_full,
        src_root=src_root,
        src_scale=src_scale,
        dst_root=dst_root,
        dst_scale=dst_scale,
    )
    if not (isinstance(carried, list) and len(carried) == J * 3):
        continue

    def copy_joint(j: int):
        if j >= J:
            return
        if is_vis(arr, j):
            return
        cx = float(carried[3 * j + 0])
        cy = float(carried[3 * j + 1])
        cc = float(carried[3 * j + 2])
        if (cx == 0 and cy == 0) or cc <= 0:
            return
        arr[3 * j + 0] = cx
        arr[3 * j + 1] = cy
        arr[3 * j + 2] = max(min(cc, 0.60), conf_gate)

    if not r_ok:
        copy_joint(R_HIP)
        if fill_legs_with_hip:
            copy_joint(R_KNEE)
            copy_joint(R_ANK)

    if not l_ok:
        copy_joint(L_HIP)
        if fill_legs_with_hip:
            copy_joint(L_KNEE)
            copy_joint(L_ANK)

    out[t] = arr

return out

def _median3_pose_seq(pose_seq: List[Optional[List[float]]], *, conf_gate: float) -> List[Optional[List[float]]]: if not pose_seq: return pose_seq

J = None
for arr in pose_seq:
    if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0:
        J = len(arr) // 3
        break
if J is None:
    return pose_seq

T = len(pose_seq)

def is_vis(arr: List[float], j: int) -> bool:
    x = float(arr[3 * j + 0])
    y = float(arr[3 * j + 1])
    c = float(arr[3 * j + 2])
    return (c >= conf_gate) and not (x == 0 and y == 0)

out_seq: List[Optional[List[float]]] = []
for t in range(T):
    arr = pose_seq[t]
    if not isinstance(arr, list) or len(arr) != J * 3:
        out_seq.append(arr)
        continue

    out = list(arr)
    t0 = max(0, t - 1)
    t1 = t
    t2 = min(T - 1, t + 1)

    a0 = pose_seq[t0]
    a1 = pose_seq[t1]
    a2 = pose_seq[t2]

    for j in range(J):
        if not is_vis(arr, j):
            continue

        xs, ys = [], []
        for aa in (a0, a1, a2):
            if isinstance(aa, list) and len(aa) == J * 3 and is_vis(aa, j):
                xs.append(float(aa[3 * j + 0]))
                ys.append(float(aa[3 * j + 1]))

        if len(xs) >= 2:
            xs.sort()
            ys.sort()
            out[3 * j + 0] = float(xs[len(xs) // 2])
            out[3 * j + 1] = float(ys[len(ys) // 2])

    out_seq.append(out)

return out_seq

def _sync_group_appearances( pose_arr_seq: List[Optional[List[float]]], *, group: set[int], conf_gate: float, lookahead: int, ) -> List[Optional[List[float]]]: if not pose_arr_seq: return pose_arr_seq

J = None
for arr in pose_arr_seq:
    if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0:
        J = len(arr) // 3
        break
if J is None:
    return pose_arr_seq

T = len(pose_arr_seq)
out_seq: List[Optional[List[float]]] = []
for arr in pose_arr_seq:
    if isinstance(arr, list) and len(arr) == J * 3:
        out_seq.append(list(arr))
    else:
        out_seq.append(arr)

def is_vis(arr: List[float], j: int) -> bool:
    x = float(arr[3 * j + 0])
    y = float(arr[3 * j + 1])
    c = float(arr[3 * j + 2])
    return (c >= conf_gate) and not (x == 0 and y == 0)

for t in range(T):
    arr = out_seq[t]
    if not isinstance(arr, list):
        continue

    vis = {j for j in group if j < J and is_vis(arr, j)}
    if not vis:
        continue

    missing = {j for j in group if j < J and j not in vis}
    if not missing:
        continue

    appear_t: dict[int, int] = {}
    for j in list(missing):
        t2 = t + 1
        while t2 < T and t2 <= t + lookahead:
            arr2 = out_seq[t2]
            if isinstance(arr2, list) and is_vis(arr2, j):
                appear_t[j] = t2
                break
            t2 += 1

    if not appear_t:
        continue

    for j, t2 in appear_t.items():
        last_t = None
        for tb in range(t - 1, -1, -1):
            arrb = out_seq[tb]
            if isinstance(arrb, list) and is_vis(arrb, j):
                last_t = tb
                break

        if last_t is None:
            b = out_seq[t2]
            if not isinstance(b, list):
                continue
            bx, by, bc = float(b[3 * j + 0]), float(b[3 * j + 1]), float(b[3 * j + 2])
            for k in range(t, t2):
                a = out_seq[k]
                if not isinstance(a, list):
                    continue
                a[3 * j + 0] = bx
                a[3 * j + 1] = by
                a[3 * j + 2] = bc
            continue

        a0 = out_seq[last_t]
        b0 = out_seq[t2]
        if not (isinstance(a0, list) and isinstance(b0, list)):
            continue

        ax, ay, ac = float(a0[3 * j + 0]), float(a0[3 * j + 1]), float(a0[3 * j + 2])
        bx, by, bc = float(b0[3 * j + 0]), float(b0[3 * j + 1]), float(b0[3 * j + 2])

        if (ax == 0 and ay == 0) or (bx == 0 and by == 0):
            continue

        conf_fill = min(ac, bc)
        total = t2 - last_t
        if total <= 0:
            continue

        for tt in range(t, t2):
            a = out_seq[tt]
            if not isinstance(a, list):
                continue
            r = (tt - last_t) / total
            x = ax + (bx - ax) * r
            y = ay + (by - ay) * r
            a[3 * j + 0] = float(x)
            a[3 * j + 1] = float(y)
            a[3 * j + 2] = float(conf_fill)

return out_seq

def _count_valid_points(arr: Optional[List[float]], *, conf_gate: float) -> int: if not isinstance(arr, list) or len(arr) % 3 != 0: return 0 cnt = 0 for i in range(0, len(arr), 3): x, y, c = float(arr[i]), float(arr[i + 1]), float(arr[i + 2]) if c >= conf_gate and not (x == 0 and y == 0): cnt += 1 return cnt

def _zero_out_kps(arr: Optional[List[float]]) -> Optional[List[float]]: if not isinstance(arr, list) or len(arr) % 3 != 0: return arr out = list(arr) for i in range(0, len(out), 3): out[i + 0] = 0.0 out[i + 1] = 0.0 out[i + 2] = 0.0 return out

def _pin_body_wrist_to_hand( p_out: Dict[str, Any], *, side: str, conf_gate_body: float = 0.2, conf_gate_hand: float = 0.2, blend: float = 1.0, ) -> None: if side == "right": bw = 4 hk = "hand_right_keypoints_2d" else: bw = 7 hk = "hand_left_keypoints_2d"

pose = p_out.get("pose_keypoints_2d")
hand = p_out.get(hk)

if not (isinstance(pose, list) and isinstance(hand, list)):
    return
if len(pose) < (bw * 3 + 3):
    return
if len(hand) < 3:
    return

hx, hy, hc = float(hand[0]), float(hand[1]), float(hand[2])
if hc < conf_gate_hand or (hx == 0.0 and hy == 0.0):
    return

bx, by, bc = float(pose[bw * 3 + 0]), float(pose[bw * 3 + 1]), float(pose[bw * 3 + 2])

if bc < conf_gate_body or (bx == 0.0 and by == 0.0):
    pose[bw * 3 + 0] = hx
    pose[bw * 3 + 1] = hy
    pose[bw * 3 + 2] = float(max(bc, min(hc, 0.9)))
else:
    nx = bx * (1.0 - blend) + hx * blend
    ny = by * (1.0 - blend) + hy * blend
    pose[bw * 3 + 0] = nx
    pose[bw * 3 + 1] = ny
    pose[bw * 3 + 2] = float(min(bc, hc))

p_out["pose_keypoints_2d"] = pose

def _fix_elbow_using_wrist(p_out: Dict[str, Any], *, side: str, conf_gate: float = 0.2) -> None: pose = p_out.get("pose_keypoints_2d") if not isinstance(pose, list) or len(pose) % 3 != 0: return

if side == "right":
    sh, el, wr = 2, 3, 4
else:
    sh, el, wr = 5, 6, 7

def get(j):
    return float(pose[3 * j + 0]), float(pose[3 * j + 1]), float(pose[3 * j + 2])

def vis(x, y, c):
    return c >= conf_gate and not (x == 0.0 and y == 0.0)

sx, sy, sc = get(sh)
ex, ey, ec = get(el)
wx, wy, wc = get(wr)

if not (vis(sx, sy, sc) and vis(wx, wy, wc)):
    return

if vis(ex, ey, ec):
    Lse = math.hypot(ex - sx, ey - sy)
    Lew = math.hypot(wx - ex, wy - ey)
else:
    dsw = math.hypot(wx - sx, wy - sy)
    if dsw < 1e-3:
        return
    Lse = 0.55 * dsw
    Lew = 0.45 * dsw

dx = wx - sx
dy = wy - sy
d = math.hypot(dx, dy)
if d < 1e-6:
    return

d2 = max(min(d, (Lse + Lew) - 1e-3), abs(Lse - Lew) + 1e-3)

a = (Lse * Lse - Lew * Lew + d2 * d2) / (2.0 * d2)
h2 = max(Lse * Lse - a * a, 0.0)
h = math.sqrt(h2)

ux = dx / d
uy = dy / d
px = sx + a * ux
py = sy + a * uy

rx = -uy
ry = ux

e1x, e1y = px + h * rx, py + h * ry
e2x, e2y = px - h * rx, py - h * ry

if vis(ex, ey, ec):
    if math.hypot(e1x - ex, e1y - ey) <= math.hypot(e2x - ex, e2y - ey):
        nx, ny = e1x, e1y
    else:
        nx, ny = e2x, e2y
else:
    nx, ny = e1x, e1y

pose[3 * el + 0] = float(nx)
pose[3 * el + 1] = float(ny)
pose[3 * el + 2] = float(max(min(ec, 0.8), conf_gate))

p_out["pose_keypoints_2d"] = pose

def _remove_short_presence_runs_kps_seq( seq: List[Optional[List[float]]], *, conf_gate: float, min_points_present: int, min_run: int, ) -> List[Optional[List[float]]]: if not seq: return seq

present = [(_count_valid_points(a, conf_gate=conf_gate) >= min_points_present) for a in seq]
out = [None if a is None else list(a) for a in seq]

start = None
for t in range(len(seq) + 1):
    cur = present[t] if t < len(seq) else False
    if cur and start is None:
        start = t
    if (not cur) and start is not None:
        run_len = t - start
        if run_len < min_run:
            for k in range(start, t):
                out[k] = _zero_out_kps(out[k])
        start = None

return out

def _zero_sparse_frames_kps_seq( seq: List[Optional[List[float]]], *, conf_gate: float, min_points_present: int ) -> List[Optional[List[float]]]: if not seq: return seq

out: List[Optional[List[float]]] = []
for a in seq:
    if not isinstance(a, list):
        out.append(a)
        continue
    if _count_valid_points(a, conf_gate=conf_gate) < min_points_present:
        out.append(_zero_out_kps(a))
    else:
        out.append(a)
return out

def _suppress_spatial_outliers_in_hand_arr( hand_arr: Optional[List[float]], *, conf_gate: float, max_bone_factor: float = 3.0 ) -> Optional[List[float]]: if not isinstance(hand_arr, list) or len(hand_arr) % 3 != 0: return hand_arr pts = _reshape_keypoints_2d(hand_arr) J = len(pts) if J < 21: return hand_arr

out = [list(p) for p in pts]

def vis(j: int) -> bool:
    x, y, c = out[j]
    return c >= conf_gate and not (x == 0 and y == 0)

vv = [(x, y) for (x, y, c) in out if c >= conf_gate and not (x == 0 and y == 0)]
if len(vv) < 6:
    return hand_arr
xs = [p[0] for p in vv]
ys = [p[1] for p in vv]
scale = max(max(xs) - min(xs), max(ys) - min(ys))
if scale <= 1e-3:
    return hand_arr
max_bone = max_bone_factor * scale

for a, b in HAND21_EDGES:
    if a >= J or b >= J:
        continue
    if not vis(a) or not vis(b):
        continue
    ax, ay, ac = out[a]
    bx, by, bc = out[b]
    d = math.hypot(ax - bx, ay - by)
    if d > max_bone:
        if ac <= bc:
            out[a] = [0.0, 0.0, 0.0]
        else:
            out[b] = [0.0, 0.0, 0.0]

return _flatten_keypoints_2d([(x, y, c) for x, y, c in out])

def _body_head_root_scale_from_pose( pose_arr: Optional[List[float]], *, conf_gate: float ) -> Optional[Tuple[Tuple[float, float], float]]: if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0: return None kps = _reshape_keypoints_2d(pose_arr)

def vis(j: int) -> Optional[Tuple[float, float]]:
    if j >= len(kps):
        return None
    x, y, c = kps[j]
    if c >= conf_gate and not (x == 0 and y == 0):
        return (float(x), float(y))
    return None

pts = []
for j in [0, 1, 14, 15, 16, 17]:
    p = vis(j)
    if p is not None:
        pts.append(p)

if not pts:
    return None

rx = sum(p[0] for p in pts) / len(pts)
ry = sum(p[1] for p in pts) / len(pts)
root = (rx, ry)

def dist(a: int, b: int) -> Optional[float]:
    pa, pb = vis(a), vis(b)
    if pa is None or pb is None:
        return None
    d = math.hypot(pa[0] - pb[0], pa[1] - pb[1])
    return d if d > 1e-3 else None

cands = [dist(14, 15), dist(16, 17), dist(2, 5)]
cands = [c for c in cands if c is not None]
if not cands:
    return None

scale = float(sum(cands) / len(cands))
return root, scale

def _body_wrist_root_scale_from_pose( pose_arr: Optional[List[float]], *, side: str, conf_gate: float ) -> Optional[Tuple[Tuple[float, float], float]]: if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0: return None kps = _reshape_keypoints_2d(pose_arr)

if side == "right":
    w, e = 4, 3
else:
    w, e = 7, 6

def vis(j: int) -> Optional[Tuple[float, float]]:
    if j >= len(kps):
        return None
    x, y, c = kps[j]
    if c >= conf_gate and not (x == 0 and y == 0):
        return (float(x), float(y))
    return None

pw = vis(w)
if pw is None:
    return None
root = pw

pe = vis(e)
scale = None
if pe is not None:
    d = math.hypot(pw[0] - pe[0], pw[1] - pe[1])
    if d > 1e-3:
        scale = d

if scale is None:
    p2 = vis(2)
    p5 = vis(5)
    if p2 is not None and p5 is not None:
        d = math.hypot(p2[0] - p5[0], p2[1] - p5[1])
        if d > 1e-3:
            scale = d

if scale is None:
    return None

return root, float(scale)

def _smooth_dense_seq_anchored_to_body( dense_seq: List[Optional[List[float]]], body_pose_seq: List[Optional[List[float]]], *, kind: str, conf_gate_dense: float, conf_gate_body: float, median3: bool, zero_lag_alpha: float, ) -> List[Optional[List[float]]]: if not dense_seq: return dense_seq

Jd = None
for a in dense_seq:
    if isinstance(a, list) and len(a) % 3 == 0 and len(a) > 0:
        Jd = len(a) // 3
        break
if Jd is None:
    return dense_seq

T = len(dense_seq)
out = [None if a is None else list(a) for a in dense_seq]

norm_seq: List[Optional[List[float]]] = [None] * T

for t in range(T):
    arr = out[t]
    body = body_pose_seq[t] if t < len(body_pose_seq) else None
    if not isinstance(arr, list) or len(arr) != Jd * 3 or not isinstance(body, list):
        norm_seq[t] = arr
        continue

    if kind == "face":
        rs = _body_head_root_scale_from_pose(body, conf_gate=conf_gate_body)
    elif kind == "hand_left":
        rs = _body_wrist_root_scale_from_pose(body, side="left", conf_gate=conf_gate_body)
    else:
        rs = _body_wrist_root_scale_from_pose(body, side="right", conf_gate=conf_gate_body)

    if rs is None:
        norm_seq[t] = arr
        continue

    (rx, ry), s = rs
    if s <= 1e-6:
        norm_seq[t] = arr
        continue

    nn = list(arr)
    for j in range(Jd):
        x = float(arr[3 * j + 0])
        y = float(arr[3 * j + 1])
        c = float(arr[3 * j + 2])
        if c >= conf_gate_dense and not (x == 0 and y == 0):
            nn[3 * j + 0] = (x - rx) / s
            nn[3 * j + 1] = (y - ry) / s
    norm_seq[t] = nn

if median3:
    norm_seq = _median3_pose_seq(norm_seq, conf_gate=conf_gate_dense)

norm_seq = _zero_lag_ema_pose_seq(norm_seq, alpha=zero_lag_alpha, conf_gate=conf_gate_dense)

for t in range(T):
    arrn = norm_seq[t]
    body = body_pose_seq[t] if t < len(body_pose_seq) else None
    if not isinstance(arrn, list) or len(arrn) != Jd * 3 or not isinstance(body, list):
        continue

    if kind == "face":
        rs = _body_head_root_scale_from_pose(body, conf_gate=conf_gate_body)
    elif kind == "hand_left":
        rs = _body_wrist_root_scale_from_pose(body, side="left", conf_gate=conf_gate_body)
    else:
        rs = _body_wrist_root_scale_from_pose(body, side="right", conf_gate=conf_gate_body)

    if rs is None:
        continue

    (rx, ry), s = rs
    if s <= 1e-6:
        continue

    orig = out[t]
    for j in range(Jd):
        x = float(arrn[3 * j + 0])
        y = float(arrn[3 * j + 1])
        c = float(arrn[3 * j + 2])

        ox = float(orig[3 * j + 0])
        oy = float(orig[3 * j + 1])
        oc = float(orig[3 * j + 2])

        if oc >= conf_gate_dense and not (ox == 0 and oy == 0) and c >= conf_gate_dense:
            orig[3 * j + 0] = rx + x * s
            orig[3 * j + 1] = ry + y * s

    out[t] = orig

return out

def smooth_KPS_json_obj( data: Any, *, keep_face_untouched: bool = True, keep_hands_untouched: bool = True, filter_extra_people: Optional[bool] = None, ) -> Any: if not isinstance(data, list): raise ValueError("Expected top-level JSON to be a list of frames.")

if filter_extra_people is None:
    filter_extra_people = bool(FILTER_EXTRA_PEOPLE)

chosen_people: List[Optional[Dict[str, Any]]] = [None] * len(data)

if MAIN_PERSON_MODE == "longest_track":
    tracks = _build_tracks_over_video(data)
    main_tr = _pick_main_track(tracks)

    if main_tr is not None:
        for t in range(len(data)):
            if t in main_tr.frames:
                chosen_people[t] = main_tr.frames[t]
    else:
        prev_center: Optional[Tuple[float, float]] = None
        for i, frame in enumerate(data):
            if not isinstance(frame, dict):
                continue
            people = frame.get("people", [])
            if not isinstance(people, list) or len(people) == 0:
                continue
            chosen = _choose_single_person(people, prev_center)
            chosen_people[i] = chosen
            if chosen is not None:
                c = _body_center_from_pose(chosen.get("pose_keypoints_2d"))
                if c is not None:
                    prev_center = c
else:
    prev_center: Optional[Tuple[float, float]] = None
    for i, frame in enumerate(data):
        if not isinstance(frame, dict):
            continue
        people = frame.get("people", [])
        if not isinstance(people, list) or len(people) == 0:
            continue
        chosen = _choose_single_person(people, prev_center)
        chosen_people[i] = chosen
        if chosen is not None:
            c = _body_center_from_pose(chosen.get("pose_keypoints_2d"))
            if c is not None:
                prev_center = c

pose_seq: List[Optional[List[float]]] = []
for p in chosen_people:
    pose_seq.append(p.get("pose_keypoints_2d") if isinstance(p, dict) else None)

if SPATIAL_OUTLIER_FIX:
    pose_seq = [
        _suppress_spatial_outliers_in_pose_arr(arr, conf_gate=CONF_GATE_BODY) if arr is not None else None
        for arr in pose_seq
    ]

if GAP_FILL_ENABLED:
    pose_seq = _denoise_and_fill_gaps_pose_seq(
        pose_seq,
        conf_gate=CONF_GATE_BODY,
        min_run=MIN_RUN_FRAMES,
        max_gap=MAX_GAP_FRAMES,
    )

if TORSO_SYNC_ENABLED:
    pose_seq = _sync_group_appearances(
        pose_seq,
        group=TORSO_JOINTS,
        conf_gate=CONF_GATE_BODY,
        lookahead=TORSO_LOOKAHEAD_FRAMES,
    )

pose_seq = [
    (
        _suppress_isolated_joints_in_pose_arr(arr, conf_gate=CONF_GATE_BODY, keep=TORSO_JOINTS)
        if arr is not None
        else None
    )
    for arr in pose_seq
]

if MEDIAN3_ENABLED:
    pose_seq = _median3_pose_seq(pose_seq, conf_gate=CONF_GATE_BODY)

if SUPER_SMOOTH_ENABLED:
    pose_seq = _zero_lag_ema_pose_seq(pose_seq, alpha=SUPER_SMOOTH_ALPHA, conf_gate=SUPER_SMOOTH_MIN_CONF)

if ROOTSCALE_CARRY_ENABLED:
    pose_seq = _carry_pose_when_torso_missing(
        pose_seq,
        conf_gate=CARRY_CONF_GATE,
        max_carry=CARRY_MAX_FRAMES,
        anchor_joints=CARRY_ANCHOR_JOINTS,
        min_anchors=CARRY_MIN_ANCHORS,
    )

pose_seq = _force_full_torso_pair(
    pose_seq,
    conf_gate=CARRY_CONF_GATE,
    anchor_joints=CARRY_ANCHOR_JOINTS,
    min_anchors=CARRY_MIN_ANCHORS,
    max_lookback=240,
    fill_legs_with_hip=True,
    always_fill_if_one_hip=True,
)

face_seq: List[Optional[List[float]]] = []
lh_seq: List[Optional[List[float]]] = []
rh_seq: List[Optional[List[float]]] = []

for p in chosen_people:
    if isinstance(p, dict):
        face_seq.append(p.get("face_keypoints_2d"))
        lh_seq.append(p.get("hand_left_keypoints_2d"))
        rh_seq.append(p.get("hand_right_keypoints_2d"))
    else:
        face_seq.append(None)
        lh_seq.append(None)
        rh_seq.append(None)

if HANDS_SMOOTH_ENABLED and (not keep_hands_untouched):
    lh_seq = [
        _suppress_spatial_outliers_in_hand_arr(a, conf_gate=CONF_GATE_HAND) if a is not None else None
        for a in lh_seq
    ]
    rh_seq = [
        _suppress_spatial_outliers_in_hand_arr(a, conf_gate=CONF_GATE_HAND) if a is not None else None
        for a in rh_seq
    ]

    lh_seq = _remove_short_presence_runs_kps_seq(
        lh_seq, conf_gate=CONF_GATE_HAND, min_points_present=HAND_MIN_POINTS_PRESENT, min_run=MIN_HAND_RUN_FRAMES
    )
    rh_seq = _remove_short_presence_runs_kps_seq(
        rh_seq, conf_gate=CONF_GATE_HAND, min_points_present=HAND_MIN_POINTS_PRESENT, min_run=MIN_HAND_RUN_FRAMES
    )

    lh_seq = _zero_sparse_frames_kps_seq(
        lh_seq, conf_gate=CONF_GATE_HAND, min_points_present=HAND_MIN_POINTS_PRESENT
    )
    rh_seq = _zero_sparse_frames_kps_seq(
        rh_seq, conf_gate=CONF_GATE_HAND, min_points_present=HAND_MIN_POINTS_PRESENT
    )

    if DENSE_GAP_FILL_ENABLED:
        lh_seq = _denoise_and_fill_gaps_pose_seq(
            lh_seq, conf_gate=CONF_GATE_HAND, min_run=DENSE_MIN_RUN_FRAMES, max_gap=DENSE_MAX_GAP_FRAMES
        )
        rh_seq = _denoise_and_fill_gaps_pose_seq(
            rh_seq, conf_gate=CONF_GATE_HAND, min_run=DENSE_MIN_RUN_FRAMES, max_gap=DENSE_MAX_GAP_FRAMES
        )

if FACE_SMOOTH_ENABLED and (not keep_face_untouched):
    if DENSE_GAP_FILL_ENABLED:
        face_seq = _denoise_and_fill_gaps_pose_seq(
            face_seq, conf_gate=CONF_GATE_FACE, min_run=DENSE_MIN_RUN_FRAMES, max_gap=DENSE_MAX_GAP_FRAMES
        )

if FACE_SMOOTH_ENABLED and (not keep_face_untouched):
    face_seq = _smooth_dense_seq_anchored_to_body(
        face_seq,
        pose_seq,
        kind="face",
        conf_gate_dense=CONF_GATE_FACE,
        conf_gate_body=CONF_GATE_BODY,
        median3=DENSE_MEDIAN3_ENABLED,
        zero_lag_alpha=DENSE_SUPER_SMOOTH_ALPHA,
    )

if HANDS_SMOOTH_ENABLED and (not keep_hands_untouched):
    lh_seq = _smooth_dense_seq_anchored_to_body(
        lh_seq,
        pose_seq,
        kind="hand_left",
        conf_gate_dense=CONF_GATE_HAND,
        conf_gate_body=CONF_GATE_BODY,
        median3=DENSE_MEDIAN3_ENABLED,
        zero_lag_alpha=DENSE_SUPER_SMOOTH_ALPHA,
    )
    rh_seq = _smooth_dense_seq_anchored_to_body(
        rh_seq,
        pose_seq,
        kind="hand_right",
        conf_gate_dense=CONF_GATE_HAND,
        conf_gate_body=CONF_GATE_BODY,
        median3=DENSE_MEDIAN3_ENABLED,
        zero_lag_alpha=DENSE_SUPER_SMOOTH_ALPHA,
    )

out_frames = []
body_state: Optional[BodyState] = None

for i, frame in enumerate(data):
    if not isinstance(frame, dict):
        out_frames.append(frame)
        continue

    frame_out = copy.deepcopy(frame)
    chosen = chosen_people[i]

    if chosen is None:
        if filter_extra_people:
            frame_out["people"] = []
        out_frames.append(frame_out)
        continue

    p_out = copy.deepcopy(chosen)
    p_out["pose_keypoints_2d"] = pose_seq[i]

    pose_arr = p_out.get("pose_keypoints_2d")
    joints = (len(pose_arr) // 3) if isinstance(pose_arr, list) else 0
    if body_state is None:
        body_state = BodyState(joints if joints > 0 else 18)

    p_out["pose_keypoints_2d"] = _smooth_body_pose(p_out.get("pose_keypoints_2d"), body_state)

    if FACE_SMOOTH_ENABLED and (not keep_face_untouched):
        p_out["face_keypoints_2d"] = face_seq[i]
    else:
        p_out["face_keypoints_2d"] = chosen.get("face_keypoints_2d", p_out.get("face_keypoints_2d"))

    if HANDS_SMOOTH_ENABLED and (not keep_hands_untouched):
        p_out["hand_left_keypoints_2d"] = lh_seq[i]
        p_out["hand_right_keypoints_2d"] = rh_seq[i]
    else:
        p_out["hand_left_keypoints_2d"] = chosen.get("hand_left_keypoints_2d", p_out.get("hand_left_keypoints_2d"))
        p_out["hand_right_keypoints_2d"] = chosen.get(
            "hand_right_keypoints_2d", p_out.get("hand_right_keypoints_2d")
        )

    _pin_body_wrist_to_hand(
        p_out, side="left", conf_gate_body=CONF_GATE_BODY, conf_gate_hand=CONF_GATE_HAND, blend=1.0
    )
    _pin_body_wrist_to_hand(
        p_out, side="right", conf_gate_body=CONF_GATE_BODY, conf_gate_hand=CONF_GATE_HAND, blend=1.0
    )

    _fix_elbow_using_wrist(p_out, side="left", conf_gate=CONF_GATE_BODY)
    _fix_elbow_using_wrist(p_out, side="right", conf_gate=CONF_GATE_BODY)

    if filter_extra_people:
        frame_out["people"] = [p_out]
    else:
        orig_people = frame.get("people", [])
        if not isinstance(orig_people, list):
            frame_out["people"] = [p_out]
        else:
            replaced = False
            new_people = []
            for op in orig_people:
                if (not replaced) and (op is chosen):
                    new_people.append(p_out)
                    replaced = True
                else:
                    new_people.append(copy.deepcopy(op))
            if not replaced:
                new_people = [p_out] + [copy.deepcopy(op) for op in orig_people]
            frame_out["people"] = new_people

    out_frames.append(frame_out)

return out_frames

============================================================

=== END: smooth_KPS_json.py logic

============================================================

============================================================

=== START: render_pose_video.py logic (ported to frame render)

============================================================

OP_COLORS: List[Tuple[int, int, int]] = [ (255, 0, 0), (255, 85, 0), (255, 170, 0), (255, 255, 0), (170, 255, 0), (85, 255, 0), (0, 255, 0), (0, 255, 85), (0, 255, 170), (0, 255, 255), (0, 170, 255), (0, 85, 255), (0, 0, 255), (85, 0, 255), (170, 0, 255), (255, 0, 255), (255, 0, 170), (255, 0, 85), ]

BODY_EDGES: List[Tuple[int, int]] = [ (1, 2), (1, 5), (2, 3), (3, 4), (5, 6), (6, 7), (1, 8), (8, 9), (9, 10), (1, 11), (11, 12), (12, 13), (1, 0), (0, 14), (14, 16), (0, 15), (15, 17), ]

BODY_EDGE_COLORS = OP_COLORS[: len(BODY_EDGES)] BODY_JOINT_COLORS = OP_COLORS

HAND_EDGES: List[Tuple[int, int]] = [ (0, 1), (1, 2), (2, 3), (3, 4), (0, 5), (5, 6), (6, 7), (7, 8), (0, 9), (9, 10), (10, 11), (11, 12), (0, 13), (13, 14), (14, 15), (15, 16), (0, 17), (17, 18), (18, 19), (19, 20), ]

def _valid_pt(x: float, y: float, c: float, conf_thresh: float) -> bool: return (c is not None) and (c >= conf_thresh) and not (x == 0 and y == 0)

def _hsv_to_bgr(h: float, s: float, v: float) -> Tuple[int, int, int]: H = int(np.clip(h, 0.0, 1.0) * 179.0) S = int(np.clip(s, 0.0, 1.0) * 255.0) V = int(np.clip(v, 0.0, 1.0) * 255.0) hsv = np.uint8([[[H, S, V]]]) bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)[0, 0] return int(bgr[0]), int(bgr[1]), int(bgr[2])

def _looks_normalized(points: List[Tuple[float, float, float]], conf_thresh: float) -> bool: valid = [(x, y, c) for (x, y, c) in points if _valid_pt(x, y, c, conf_thresh)] if not valid: return False in01 = sum(1 for (x, y, _) in valid if 0.0 <= x <= 1.0 and 0.0 <= y <= 1.0) return (in01 / float(len(valid))) >= 0.7

def _draw_body( canvas: np.ndarray, pose: List[Tuple[float, float, float]], conf_thresh: float, xinsr_stick_scaling: bool = False ) -> None: CH, CW = canvas.shape[:2] stickwidth = 2

valid = [(x, y, c) for (x, y, c) in pose if _valid_pt(x, y, c, conf_thresh)]
norm = False
if valid:
    in01 = sum(1 for (x, y, _) in valid if 0.0 <= x <= 1.0 and 0.0 <= y <= 1.0)
    norm = (in01 / float(len(valid))) >= 0.7

def to_px(x: float, y: float) -> Tuple[float, float]:
    if norm:
        return x * CW, y * CH
    return x, y

max_side = max(CW, CH)
if xinsr_stick_scaling:
    stick_scale = 1 if max_side < 500 else min(2 + (max_side // 1000), 7)
else:
    stick_scale = 1

for idx, (a, b) in enumerate(BODY_EDGES):
    if a >= len(pose) or b >= len(pose):
        continue

    ax, ay, ac = pose[a]
    bx, by, bc = pose[b]
    if not (_valid_pt(ax, ay, ac, conf_thresh) and _valid_pt(bx, by, bc, conf_thresh)):
        continue

    ax, ay = to_px(ax, ay)
    bx, by = to_px(bx, by)

    base = BODY_EDGE_COLORS[idx] if idx < len(BODY_EDGE_COLORS) else (255, 255, 255)

    X = np.array([ay, by], dtype=np.float32)
    Y = np.array([ax, bx], dtype=np.float32)

    mX = float(np.mean(X))
    mY = float(np.mean(Y))
    length = float(np.hypot(X[0] - X[1], Y[0] - Y[1]))
    if length < 1.0:
        continue

    angle = math.degrees(math.atan2(X[0] - X[1], Y[0] - Y[1]))

    polygon = cv2.ellipse2Poly(
        (int(mY), int(mX)),
        (int(length / 2), int(stickwidth * stick_scale)),
        int(angle),
        0,
        360,
        1,
    )

    cv2.fillConvexPoly(
        canvas,
        polygon,
        (int(base[0] * 0.6), int(base[1] * 0.6), int(base[2] * 0.6)),
    )

for j, (x, y, c) in enumerate(pose):
    if not _valid_pt(x, y, c, conf_thresh):
        continue
    x, y = to_px(x, y)
    col = BODY_JOINT_COLORS[j] if j < len(BODY_JOINT_COLORS) else (255, 255, 255)
    cv2.circle(canvas, (int(x), int(y)), 2, col, thickness=-1)

def _draw_hand(canvas: np.ndarray, hand: List[Tuple[float, float, float]], conf_thresh: float) -> None: if not hand or len(hand) < 21: return

CH, CW = canvas.shape[:2]
norm = _looks_normalized(hand, conf_thresh)

def to_px(x: float, y: float) -> Tuple[float, float]:
    return (x * CW, y * CH) if norm else (x, y)

n_edges = len(HAND_EDGES)
for i, (a, b) in enumerate(HAND_EDGES):
    x1, y1, c1 = hand[a]
    x2, y2, c2 = hand[b]
    if _valid_pt(x1, y1, c1, conf_thresh) and _valid_pt(x2, y2, c2, conf_thresh):
        x1, y1 = to_px(x1, y1)
        x2, y2 = to_px(x2, y2)
        bgr = _hsv_to_bgr(i / float(n_edges), 1.0, 1.0)
        cv2.line(canvas, (int(x1), int(y1)), (int(x2), int(y2)), bgr, 1, cv2.LINE_AA)

for x, y, c in hand:
    if _valid_pt(x, y, c, conf_thresh):
        x, y = to_px(x, y)
        cv2.circle(canvas, (int(x), int(y)), 1, (0, 0, 255), -1, cv2.LINE_AA)

def _draw_face(canvas: np.ndarray, face: List[Tuple[float, float, float]], conf_thresh: float) -> None: if not face: return

CH, CW = canvas.shape[:2]
norm = _looks_normalized(face, conf_thresh)

def to_px(x: float, y: float) -> Tuple[float, float]:
    return (x * CW, y * CH) if norm else (x, y)

for x, y, c in face:
    if _valid_pt(x, y, c, conf_thresh):
        x, y = to_px(x, y)
        cv2.circle(canvas, (int(x), int(y)), 0, (255, 255, 255), -1, cv2.LINE_AA)

def _draw_pose_frame_full( w: int, h: int, person: Dict[str, Any], conf_thresh_body: float = 0.10, conf_thresh_hands: float = 0.10, conf_thresh_face: float = 0.10, ) -> np.ndarray: img = np.zeros((h, w, 3), dtype=np.uint8)

pose = _reshape_keypoints_2d(person.get("pose_keypoints_2d") or [])
face = _reshape_keypoints_2d(person.get("face_keypoints_2d") or [])
hand_l = _reshape_keypoints_2d(person.get("hand_left_keypoints_2d") or [])
hand_r = _reshape_keypoints_2d(person.get("hand_right_keypoints_2d") or [])

if pose:
    _draw_body(img, pose, conf_thresh_body)
if hand_l:
    _draw_hand(img, hand_l, conf_thresh_hands)
if hand_r:
    _draw_hand(img, hand_r, conf_thresh_hands)
if face:
    _draw_face(img, face, conf_thresh_face)

return img

============================================================

=== END: render_pose_video.py logic

============================================================

============================================================

ComfyUI mappings

============================================================

NODE_CLASS_MAPPINGS = { "TSPoseDataSmoother": KPSSmoothPoseDataAndRender, }

NODE_DISPLAY_NAME_MAPPINGS = { "TSPoseDataSmoother": "KPS: Smooth + Render (pose_data/PKL)", }

Downloads last month

-

Downloads are not tracked for this model. How to track
Inference Providers NEW
This model isn't deployed by any Inference Provider. 🙋 Ask for provider support