PartPacker-CPU / app.py
cpuai's picture
Update app.py
cfef1cb verified
import os
import contextlib
import functools
from datetime import datetime
import cv2
import gradio as gr
import kiui
import numpy as np
import rembg
import torch
import torch.nn as nn
import torch.nn.functional as F
import trimesh
from huggingface_hub import hf_hub_download
try:
import spaces
except ImportError:
class spaces:
class GPU:
def __init__(self, duration=60):
self.duration = duration
def __call__(self, func):
return func
from flow.model import Model
from flow.configs.schema import ModelConfig
from flow.utils import get_random_color, recenter_foreground
from vae.utils import postprocess_mesh
# =========================================================
# CPU / dtype 基础设置
# =========================================================
DEVICE = torch.device("cpu")
DTYPE = torch.float32
# 线程数可按 HF CPU Space 机器情况调整
CPU_THREADS = int(os.environ.get("CPU_THREADS", "2"))
torch.set_num_threads(CPU_THREADS)
torch.set_num_interop_threads(max(1, min(2, CPU_THREADS)))
# 显式设默认浮点 dtype 为 float32
torch.set_default_dtype(torch.float32)
# 对 CPU 推理更稳妥
try:
torch.set_grad_enabled(False)
except Exception:
pass
TRIMESH_GLB_EXPORT = np.array(
[[0, 1, 0], [0, 0, 1], [1, 0, 0]],
dtype=np.float32
)
MAX_SEED = np.iinfo(np.int32).max
bg_remover = rembg.new_session()
# =========================================================
# 工具函数:递归转换任意对象中的浮点 Tensor 为 float32
# =========================================================
def to_cpu_fp32(obj):
"""
递归把对象中的浮点 Tensor 转成 CPU + float32。
支持 Tensor / dict / list / tuple。
"""
if torch.is_tensor(obj):
if obj.is_floating_point():
return obj.to(device=DEVICE, dtype=torch.float32, non_blocking=False)
return obj.to(device=DEVICE, non_blocking=False)
if isinstance(obj, dict):
return {k: to_cpu_fp32(v) for k, v in obj.items()}
if isinstance(obj, list):
return [to_cpu_fp32(v) for v in obj]
if isinstance(obj, tuple):
return tuple(to_cpu_fp32(v) for v in obj)
return obj
# =========================================================
# 工具函数:强制整个模块转 float32
# =========================================================
def force_module_fp32(module: torch.nn.Module):
"""
递归把模块参数和 buffer 都转到 CPU + float32。
"""
module.to(device=DEVICE)
module.float()
for child in module.children():
force_module_fp32(child)
# 处理 buffer
for name, buf in module.named_buffers(recurse=False):
if torch.is_tensor(buf) and buf.is_floating_point():
setattr(module, name, buf.to(device=DEVICE, dtype=torch.float32))
return module
# =========================================================
# 工具函数:禁用 CPU autocast
# =========================================================
@contextlib.contextmanager
def disable_cpu_autocast():
"""
显式关闭 CPU autocast,防止内部偷偷切到 bfloat16。
"""
try:
with torch.autocast(device_type="cpu", enabled=False):
yield
except Exception:
# 某些环境/版本可能不支持该写法,直接退化为普通上下文
yield
# =========================================================
# 兜底补丁 1:全局修补 F.linear
# =========================================================
def patch_functional_linear():
"""
给 torch.nn.functional.linear 打补丁:
如果 input 和 weight dtype 不一致,自动把 input 转成 weight.dtype。
这是最后一道保险。
"""
if getattr(F.linear, "_fp32_safe_patched", False):
return
original_linear = F.linear
@functools.wraps(original_linear)
def linear_fp32_safe(input, weight, bias=None):
if (
torch.is_tensor(input)
and torch.is_tensor(weight)
and input.device.type == "cpu"
and input.is_floating_point()
and weight.is_floating_point()
and input.dtype != weight.dtype
):
input = input.to(dtype=weight.dtype)
if (
bias is not None
and torch.is_tensor(bias)
and bias.is_floating_point()
and torch.is_tensor(weight)
and weight.is_floating_point()
and bias.dtype != weight.dtype
):
bias = bias.to(dtype=weight.dtype)
return original_linear(input, weight, bias)
linear_fp32_safe._fp32_safe_patched = True
F.linear = linear_fp32_safe
# =========================================================
# 兜底补丁 2:给常见模块加 forward pre-hook
# =========================================================
def register_dtype_guard_hooks(root_module: nn.Module):
"""
给常见算子模块注册前置 hook,在 forward 入口把输入对齐到参数 dtype。
"""
hooks = []
guarded_types = (
nn.Linear,
nn.Conv1d,
nn.Conv2d,
nn.Conv3d,
nn.LayerNorm,
nn.GroupNorm,
nn.BatchNorm1d,
nn.BatchNorm2d,
nn.BatchNorm3d,
nn.MultiheadAttention,
)
def cast_obj_to_dtype(obj, dtype, device):
if torch.is_tensor(obj):
if obj.is_floating_point():
return obj.to(device=device, dtype=dtype)
return obj.to(device=device)
if isinstance(obj, dict):
return {k: cast_obj_to_dtype(v, dtype, device) for k, v in obj.items()}
if isinstance(obj, list):
return [cast_obj_to_dtype(v, dtype, device) for v in obj]
if isinstance(obj, tuple):
return tuple(cast_obj_to_dtype(v, dtype, device) for v in obj)
return obj
def pre_hook(module, inputs):
ref_tensor = None
# 先从参数里找参考 dtype
for p in module.parameters(recurse=False):
if torch.is_tensor(p) and p.is_floating_point():
ref_tensor = p
break
# 参数没有,再从 buffer 里找
if ref_tensor is None:
for b in module.buffers(recurse=False):
if torch.is_tensor(b) and b.is_floating_point():
ref_tensor = b
break
if ref_tensor is None:
return inputs
return cast_obj_to_dtype(inputs, ref_tensor.dtype, ref_tensor.device)
for submodule in root_module.modules():
if isinstance(submodule, guarded_types):
hooks.append(submodule.register_forward_pre_hook(pre_hook))
return hooks
# =========================================================
# 兜底补丁 3:包装 forward,统一禁用 autocast + 输入转 fp32
# =========================================================
def wrap_forward_fp32(module: nn.Module):
"""
包装模块的 forward:
1. 进入 forward 前先把输入递归转为 float32
2. forward 期间禁用 CPU autocast
"""
if getattr(module, "_forward_fp32_wrapped", False):
return
original_forward = module.forward
@functools.wraps(original_forward)
def forward_fp32_safe(*args, **kwargs):
args = to_cpu_fp32(args)
kwargs = to_cpu_fp32(kwargs)
with disable_cpu_autocast():
out = original_forward(*args, **kwargs)
return to_cpu_fp32(out)
module.forward = forward_fp32_safe
module._forward_fp32_wrapped = True
# =========================================================
# 下载模型
# =========================================================
flow_ckpt_path = hf_hub_download(
repo_id="nvidia/PartPacker",
filename="flow.pt"
)
vae_ckpt_path = hf_hub_download(
repo_id="nvidia/PartPacker",
filename="vae.pt"
)
# =========================================================
# 模型配置
# =========================================================
model_config = ModelConfig(
vae_conf="vae.configs.part_woenc",
vae_ckpt_path=vae_ckpt_path,
qknorm=True,
qknorm_type="RMSNorm",
use_pos_embed=False,
dino_model="dinov2_vitg14",
hidden_dim=1536,
flow_shift=3.0,
logitnorm_mean=1.0,
logitnorm_std=1.0,
latent_size=4096,
use_parts=True,
)
# =========================================================
# 初始化模型(CPU + float32)
# =========================================================
print("正在加载模型到 CPU ...")
patch_functional_linear()
model = Model(model_config)
model.eval()
model.to(DEVICE)
# 显式按 CPU 加载权重
# 某些环境下 weights_only=True 不兼容时,可退回普通 torch.load
try:
ckpt_dict = torch.load(flow_ckpt_path, map_location=DEVICE, weights_only=True)
except TypeError:
ckpt_dict = torch.load(flow_ckpt_path, map_location=DEVICE)
model.load_state_dict(ckpt_dict, strict=True)
# 强制全模型转 float32
force_module_fp32(model)
model.eval()
# 包装 forward,彻底关闭 CPU autocast
wrap_forward_fp32(model)
if hasattr(model, "dit"):
wrap_forward_fp32(model.dit)
if hasattr(model, "vae"):
wrap_forward_fp32(model.vae)
# 给模型注册 dtype 保护 hook
_DTYPE_GUARD_HOOKS = []
_DTYPE_GUARD_HOOKS.extend(register_dtype_guard_hooks(model))
if hasattr(model, "vae"):
_DTYPE_GUARD_HOOKS.extend(register_dtype_guard_hooks(model.vae))
print("模型加载完成。")
try:
print("主模型 dtype:", next(model.parameters()).dtype)
except StopIteration:
print("主模型没有可见参数。")
def get_random_seed(randomize_seed, seed):
if randomize_seed:
seed = np.random.randint(0, MAX_SEED)
return int(seed)
def process_image(image_path):
"""
处理输入图片:
1. 读图
2. 没有 alpha 就自动去背景
3. 主体居中
4. 缩放到模型输入尺寸
"""
if image_path is None:
raise gr.Error("请先上传图片。")
image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
if image is None:
raise gr.Error("图片读取失败,请上传有效图片。")
if image.ndim == 2:
image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGBA)
if image.shape[-1] == 4:
image = cv2.cvtColor(image, cv2.COLOR_BGRA2RGBA)
else:
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image = rembg.remove(image, session=bg_remover)
mask = image[..., -1] > 0
image = recenter_foreground(image, mask, border_ratio=0.1)
image = cv2.resize(image, (518, 518), interpolation=cv2.INTER_AREA)
return image
def process_3d(
input_image,
num_steps=10,
cfg_scale=7.0,
grid_res=128,
seed=42,
simplify_mesh=True,
target_num_faces=20000
):
"""
CPU 版 3D 生成
"""
if input_image is None:
raise gr.Error("请先上传并处理图片。")
try:
kiui.seed_everything(int(seed))
os.makedirs("output", exist_ok=True)
output_glb_path = f"output/partpacker_{datetime.now().strftime('%Y%m%d_%H%M%S')}.glb"
# -------------------------------------------------
# 1) RGBA -> RGB 白底合成 -> float32
# -------------------------------------------------
image = input_image.astype(np.float32) / 255.0
image = image[..., :3] * image[..., 3:4] + (1.0 - image[..., 3:4])
image_tensor = (
torch.from_numpy(image)
.permute(2, 0, 1)
.contiguous()
.unsqueeze(0)
.to(device=DEVICE, dtype=torch.float32)
)
data = {
"cond_images": image_tensor
}
data = to_cpu_fp32(data)
# -------------------------------------------------
# 2) 推理前再次强制模型为 float32
# -------------------------------------------------
force_module_fp32(model)
model.eval()
if hasattr(model, "vae"):
force_module_fp32(model.vae)
model.vae.eval()
# -------------------------------------------------
# 3) 主模型推理:显式禁用 CPU autocast
# -------------------------------------------------
with torch.inference_mode():
with disable_cpu_autocast():
results = model(
data,
num_steps=int(num_steps),
cfg_scale=float(cfg_scale)
)
results = to_cpu_fp32(results)
latent = results.get("latent", None)
if not isinstance(latent, torch.Tensor):
raise gr.Error("模型输出 latent 异常。")
latent = latent.to(device=DEVICE, dtype=torch.float32).contiguous()
# -------------------------------------------------
# 4) VAE 解码:再次显式禁用 CPU autocast
# -------------------------------------------------
data_part0 = {
"latent": latent[:, : model.config.latent_size, :].contiguous()
}
data_part1 = {
"latent": latent[:, model.config.latent_size:, :].contiguous()
}
data_part0 = to_cpu_fp32(data_part0)
data_part1 = to_cpu_fp32(data_part1)
with torch.inference_mode():
with disable_cpu_autocast():
results_part0 = model.vae(data_part0, resolution=int(grid_res))
results_part1 = model.vae(data_part1, resolution=int(grid_res))
results_part0 = to_cpu_fp32(results_part0)
results_part1 = to_cpu_fp32(results_part1)
if not simplify_mesh:
target_num_faces = -1
parts = []
# -------------------------------------------------
# 5) part 0 mesh
# -------------------------------------------------
vertices, faces = results_part0["meshes"][0]
vertices = np.asarray(vertices, dtype=np.float32)
faces = np.asarray(faces, dtype=np.int64)
mesh_part0 = trimesh.Trimesh(vertices, faces, process=False)
mesh_part0.vertices = mesh_part0.vertices @ TRIMESH_GLB_EXPORT.T
mesh_part0 = postprocess_mesh(mesh_part0, int(target_num_faces))
parts.extend(mesh_part0.split(only_watertight=False))
# -------------------------------------------------
# 6) part 1 mesh
# -------------------------------------------------
vertices, faces = results_part1["meshes"][0]
vertices = np.asarray(vertices, dtype=np.float32)
faces = np.asarray(faces, dtype=np.int64)
mesh_part1 = trimesh.Trimesh(vertices, faces, process=False)
mesh_part1.vertices = mesh_part1.vertices @ TRIMESH_GLB_EXPORT.T
mesh_part1 = postprocess_mesh(mesh_part1, int(target_num_faces))
parts.extend(mesh_part1.split(only_watertight=False))
if len(parts) == 0:
raise gr.Error("没有生成有效网格,请换一张更清晰、背景更简单的图片。")
for j, part in enumerate(parts):
part.visual.vertex_colors = get_random_color(j, use_float=True)
scene = trimesh.Scene(parts)
scene.export(output_glb_path)
return output_glb_path
except gr.Error:
raise
except Exception as e:
raise gr.Error(
"CPU 生成失败:"
+ str(e)
+ "\n\n建议:\n"
"1. Inference Steps 先设为 10\n"
"2. Grid Resolution 先设为 128\n"
"3. 勾选 Simplify Mesh\n"
"4. Target Face Count 设为 20000\n"
"5. 使用主体清晰、背景简单的 PNG 图片"
)
_TITLE = "🎨 Image to 3D Model - CPU Version"
_DESCRIPTION = """
### CPU 版说明
这是适配 Hugging Face CPU Space 的版本。
### 建议参数
- Inference Steps:10
- CFG Scale:7.0
- Grid Resolution:128
- Simplify Mesh:开启
- Target Face Count:20000
### 注意
该模型原本更适合 GPU,CPU 下会比较慢。
"""
block = gr.Blocks(title=_TITLE).queue(max_size=2)
with block:
gr.Markdown("# " + _TITLE)
gr.Markdown(_DESCRIPTION)
with gr.Row():
with gr.Column():
input_image = gr.Image(
label="上传图片",
type="filepath"
)
seg_image = gr.Image(
label="处理后图片",
type="numpy",
interactive=False,
image_mode="RGBA"
)
with gr.Accordion("高级设置", open=False):
num_steps = gr.Slider(
label="Inference Steps",
minimum=1,
maximum=30,
step=1,
value=10
)
cfg_scale = gr.Slider(
label="CFG Scale",
minimum=2.0,
maximum=10.0,
step=0.1,
value=7.0
)
input_grid_res = gr.Slider(
label="Grid Resolution",
minimum=64,
maximum=256,
step=1,
value=128
)
with gr.Row():
randomize_seed = gr.Checkbox(label="随机种子", value=True)
seed = gr.Slider(
label="Seed",
minimum=0,
maximum=MAX_SEED,
step=1,
value=0
)
with gr.Row():
simplify_mesh = gr.Checkbox(label="简化网格", value=True)
target_num_faces = gr.Slider(
label="目标面数",
minimum=5000,
maximum=50000,
step=1000,
value=20000
)
button_gen = gr.Button("生成 3D 模型", variant="primary")
with gr.Column():
output_model = gr.Model3D(label="3D 预览", height=512)
with gr.Row():
gr.Examples(
examples=[
["examples/rabbit.png"],
["examples/robot.png"],
["examples/teapot.png"],
],
fn=process_image,
inputs=[input_image],
outputs=[seg_image],
cache_examples=False
)
button_gen.click(
fn=process_image,
inputs=[input_image],
outputs=[seg_image]
).then(
fn=get_random_seed,
inputs=[randomize_seed, seed],
outputs=[seed]
).then(
fn=process_3d,
inputs=[
seg_image,
num_steps,
cfg_scale,
input_grid_res,
seed,
simplify_mesh,
target_num_faces
],
outputs=[output_model]
)
block.launch(
server_name="0.0.0.0",
server_port=int(os.environ.get("PORT", 7860))
)