Spaces:
Sleeping
Sleeping
| """ | |
| FastAPI Server for Scene Selection and Video Composition | |
| Dynamically generates video from manifest without hardcoded labels | |
| """ | |
| from fastapi import FastAPI, HTTPException, File, UploadFile, Form | |
| from fastapi.responses import FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel | |
| from typing import List, Dict, Any, Optional | |
| import os | |
| import json | |
| import shutil | |
| from pathlib import Path | |
| import subprocess | |
| import sys | |
| import requests | |
| import random | |
| from PIL import Image | |
| from io import BytesIO | |
| from huggingface_hub import HfApi, login | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Pydantic Models | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class SceneRequest(BaseModel): | |
| """Scene metadata from manifest""" | |
| label: str | |
| image_query: str | |
| class ManifestRequest(BaseModel): | |
| """Manifest JSON format from client""" | |
| title: str | |
| scenes: List[SceneRequest] | |
| class VideoResponse(BaseModel): | |
| """Response after video generation""" | |
| status: str | |
| message: str | |
| output_path: Optional[str] = None | |
| size_mb: Optional[float] = None | |
| duration_s: Optional[float] = None | |
| class PromptRequest(BaseModel): | |
| """Request with user prompt to generate video from scratch""" | |
| prompt: str | |
| title: Optional[str] = None | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FastAPI App | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| app = FastAPI( | |
| title="TrendClip Video Composer", | |
| description="Dynamic video composition from manifest without hardcoded configs", | |
| version="2.0" | |
| ) | |
| BASE_DIR = Path(__file__).parent | |
| CANDIDATES_DIR = BASE_DIR / "candidates" | |
| SELECTED_DIR = BASE_DIR / "selected" | |
| RENDERS_DIR = BASE_DIR / "renders" | |
| # Ensure directories exist | |
| CANDIDATES_DIR.mkdir(exist_ok=True) | |
| SELECTED_DIR.mkdir(exist_ok=True) | |
| RENDERS_DIR.mkdir(exist_ok=True) | |
| # Mount static files (UI) | |
| STATIC_DIR = BASE_DIR / "static" | |
| if STATIC_DIR.exists(): | |
| app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Scene Configuration Generation | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Intro scene config (4.7s, 95pt font) | |
| INTRO_CONFIG = { | |
| "duration_s": 4.7, | |
| "motion": {"type": "slow_push_in", "scale_start": 1.0, "scale_end": 1.08}, | |
| "text": {"type": "center_stroke_pop", "entry_frame": 2, "hold_frames": 125, "font_size": 95, "align": "center"}, | |
| "grade": {"crush_blacks": 15, "contrast": 1.15}, | |
| "transition": {"type": "hard_cut", "frames": 1}, | |
| } | |
| # Scene templates for subsequent scenes (2.3s, 110pt font) | |
| SCENES_TEMPLATES = [ | |
| { | |
| "duration_s": 2.3, | |
| "motion": {"type": "snap_zoom", "scale_start": 1.0, "scale_end": 1.12}, | |
| "text": {"type": "center_pop", "entry_frame": 0, "hold_frames": 69, "font_size": 110, "align": "center"}, | |
| "grade": {"warm_tint": True, "lift_mids": 10}, | |
| "transition": {"type": "whip_pan_right", "frames": 4}, | |
| }, | |
| { | |
| "duration_s": 2.3, | |
| "motion": {"type": "static"}, | |
| "text": {"type": "center_fade_pop", "entry_frame": 2, "hold_frames": 66, "font_size": 110, "align": "center"}, | |
| "grade": {"desaturate": True, "lift_blacks": 5}, | |
| "transition": {"type": "whip_pan_right", "frames": 4}, | |
| }, | |
| { | |
| "duration_s": 2.3, | |
| "motion": {"type": "static"}, | |
| "text": {"type": "center_fade_pop", "entry_frame": 2, "hold_frames": 66, "font_size": 110, "align": "center"}, | |
| "grade": {"cool_tint": True, "highlights": -15}, | |
| "transition": {"type": "whip_pan_right", "frames": 4}, | |
| }, | |
| { | |
| "duration_s": 2.3, | |
| "motion": {"type": "static"}, | |
| "text": {"type": "center_fade_pop", "entry_frame": 2, "hold_frames": 66, "font_size": 110, "align": "center"}, | |
| "grade": {"soft_pink": True, "lift_mids": 15}, | |
| "transition": {"type": "whip_pan_right", "frames": 4}, | |
| }, | |
| { | |
| "duration_s": 2.3, | |
| "motion": {"type": "static"}, | |
| "text": {"type": "center_fade_pop", "entry_frame": 2, "hold_frames": 66, "font_size": 110, "align": "center"}, | |
| "grade": {"indoor_warm": True, "lift_shadows": 8}, | |
| "transition": {"type": "whip_pan_right", "frames": 4}, | |
| }, | |
| { | |
| "duration_s": 2.3, | |
| "motion": {"type": "static"}, | |
| "text": {"type": "center_fade_pop", "entry_frame": 2, "hold_frames": 66, "font_size": 110, "align": "center"}, | |
| "grade": {"teal_orange": True, "crush_blacks": 10}, | |
| "transition": {"type": "whip_pan_right", "frames": 4}, | |
| }, | |
| { | |
| "duration_s": 2.3, | |
| "motion": {"type": "static"}, | |
| "text": {"type": "center_fade_pop", "entry_frame": 2, "hold_frames": 66, "font_size": 110, "align": "center"}, | |
| "grade": {"dark_moody": True, "crush_blacks": 20, "desaturate": 15}, | |
| "transition": {"type": "whip_pan_right", "frames": 4}, | |
| }, | |
| { | |
| "duration_s": 2.3, | |
| "motion": {"type": "static"}, | |
| "text": {"type": "center_fade_pop", "entry_frame": 2, "hold_frames": 66, "font_size": 110, "align": "center"}, | |
| "grade": {"warm_indoor": True, "soft_glow": True, "lift_mids": 12}, | |
| "transition": {"type": "end_fade_black", "frames": 30}, | |
| }, | |
| ] | |
| def generate_scene_config(manifest: ManifestRequest) -> list: | |
| """Generate SCENE_CONFIG from manifest with title as intro slide.""" | |
| config = [] | |
| # Scene 0: Title as intro (4.7s, 95pt) | |
| title_cfg = { | |
| "idx": 0, | |
| "label": manifest.title.upper(), | |
| } | |
| title_cfg.update(INTRO_CONFIG) | |
| config.append(title_cfg) | |
| # Scenes 1+: Manifest scenes with templates | |
| for idx, scene in enumerate(manifest.scenes, start=1): | |
| # Extract label and convert to UPPERCASE for captions | |
| label = scene.label.upper() | |
| scene_cfg = { | |
| "idx": idx, | |
| "label": label, | |
| } | |
| # Use templated config for subsequent scenes (cycle through templates) | |
| template_idx = min(idx - 1, len(SCENES_TEMPLATES) - 1) | |
| scene_cfg.update(SCENES_TEMPLATES[template_idx]) | |
| config.append(scene_cfg) | |
| return config | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Upload to HuggingFace Dataset | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def upload_video_to_hf(video_path: Path, video_name: str) -> dict: | |
| """ | |
| Upload generated video to HuggingFace dataset. | |
| Args: | |
| video_path: Path to the MP4 file | |
| video_name: Name for the video in the dataset | |
| Returns: | |
| dict with status and message | |
| """ | |
| try: | |
| # Get HF token from environment | |
| hf_token = os.getenv("HF_TOKEN", "") | |
| if not hf_token: | |
| return { | |
| "status": "warning", | |
| "message": "HF_TOKEN not set, skipping upload", | |
| "uploaded": False | |
| } | |
| # Initialize HF API | |
| api = HfApi(token=hf_token) | |
| # Upload file to dataset | |
| repo_id = "factorstudios/AA" | |
| # Create a unique filename with timestamp | |
| timestamp = __import__('time').strftime("%Y%m%d_%H%M%S") | |
| filename = f"{timestamp}_{video_name}.mp4" | |
| # Upload to dataset | |
| print(f"[HF UPLOAD] Uploading {video_name} to {repo_id}...") | |
| api.upload_file( | |
| path_or_fileobj=str(video_path), | |
| path_in_repo=filename, | |
| repo_id=repo_id, | |
| repo_type="dataset", | |
| commit_message=f"Add generated video: {video_name}" | |
| ) | |
| print(f"[HF UPLOAD] Successfully uploaded to {repo_id}/{filename}") | |
| return { | |
| "status": "success", | |
| "message": f"Video uploaded to HuggingFace dataset: {repo_id}/{filename}", | |
| "uploaded": True, | |
| "dataset_path": f"{repo_id}/{filename}" | |
| } | |
| except Exception as e: | |
| print(f"[HF UPLOAD ERROR] {str(e)}") | |
| return { | |
| "status": "error", | |
| "message": f"Failed to upload to HuggingFace: {str(e)}", | |
| "uploaded": False | |
| } | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Endpoints | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Endpoints | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def generate_video( | |
| manifest: str = Form(...), | |
| files: List[UploadFile] = File(...) | |
| ): | |
| """ | |
| Full pipeline: Upload candidates + manifest β select best β compose β return MP4. | |
| Workflow: | |
| 1. Accepts multiple image files organized by scene (scene_0/img1.jpg, scene_0/img2.jpg, etc.) | |
| 2. Saves to candidates/ folder | |
| 3. Calls scene selector to pick best from each | |
| 4. Composes video | |
| 5. Returns MP4 file | |
| Form parameters: | |
| - manifest: JSON string with {"title": str, "scenes": [{"label": str, "image_query": str}]} | |
| - files: Uploaded image files (can be multiple per scene, sent as scene_N/filename) | |
| Returns: MP4 video file (video/mp4) | |
| """ | |
| try: | |
| # Step 1: Parse manifest | |
| manifest_data = json.loads(manifest) | |
| manifest_req = ManifestRequest(**manifest_data) | |
| # Step 2: Clean and prepare candidates directory | |
| if CANDIDATES_DIR.exists(): | |
| shutil.rmtree(CANDIDATES_DIR) | |
| CANDIDATES_DIR.mkdir(exist_ok=True) | |
| # Step 3: Save uploaded files to candidates/ organized by scene | |
| files_saved = {} | |
| for file in files: | |
| if file.filename: | |
| # Parse filename to extract scene index | |
| # Format: "scene_0/image1.jpg" or just "image1.jpg" | |
| parts = file.filename.split("/") | |
| if len(parts) == 2: | |
| scene_folder = parts[0] # "scene_0" | |
| filename = parts[1] # "image1.jpg" | |
| else: | |
| # Fallback: use filename directly | |
| scene_folder = "scene_0" | |
| filename = file.filename | |
| # Create scene folder if needed | |
| scene_path = CANDIDATES_DIR / scene_folder | |
| scene_path.mkdir(parents=True, exist_ok=True) | |
| # Save file | |
| file_path = scene_path / filename | |
| content = await file.read() | |
| with open(file_path, "wb") as f: | |
| f.write(content) | |
| # Track saved files | |
| if scene_folder not in files_saved: | |
| files_saved[scene_folder] = 0 | |
| files_saved[scene_folder] += 1 | |
| if len(files_saved) == 0: | |
| raise Exception("No files were saved") | |
| # Step 4: Call scene selector to pick best from each candidate folder | |
| select_result = await _select_scenes(manifest_req, CANDIDATES_DIR) | |
| if select_result["status"] != "success": | |
| raise Exception(select_result.get("message", "Scene selection failed")) | |
| # Step 5: Compose video with selected images and manifest | |
| compose_result = await _compose(manifest_req) | |
| if compose_result["status"] != "success": | |
| raise Exception(compose_result.get("message", "Composition failed")) | |
| # Step 6: Return the MP4 file | |
| output_file = Path(compose_result["output_path"]) | |
| if not output_file.exists(): | |
| raise Exception("Output video file not found") | |
| return FileResponse( | |
| path=output_file, | |
| media_type="video/mp4", | |
| filename="video.mp4" | |
| ) | |
| except json.JSONDecodeError as e: | |
| raise HTTPException(status_code=400, detail=f"Invalid manifest JSON: {str(e)}") | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def select_scenes(manifest: ManifestRequest): | |
| """ | |
| Endpoint for just scene selection (without upload). | |
| Assumes candidates are already in candidates/ folder. | |
| """ | |
| try: | |
| result = await _select_scenes(manifest, CANDIDATES_DIR) | |
| return result | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def compose(manifest: ManifestRequest): | |
| """ | |
| Endpoint for just composition. | |
| Assumes selected images are already in selected/ folder. | |
| """ | |
| try: | |
| result = await _compose(manifest) | |
| return result | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def _select_scenes(manifest: ManifestRequest, source_dir: Path): | |
| """ | |
| Internal: Select scenes from source directory. | |
| Copies best image from each scene folder to selected/ folder. | |
| Includes title image from scene_0 + content scenes from scene_1, scene_2, etc. | |
| """ | |
| try: | |
| # Clean and recreate selected directory | |
| if SELECTED_DIR.exists(): | |
| shutil.rmtree(SELECTED_DIR) | |
| SELECTED_DIR.mkdir(exist_ok=True) | |
| selected_count = 0 | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SELECT FROM TITLE (scene_0) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| title_folder = source_dir / "scene_0" | |
| if title_folder.exists() and title_folder.is_dir(): | |
| images = sorted( | |
| list(title_folder.glob("*.jpg")) + list(title_folder.glob("*.png")), | |
| key=lambda p: p.stat().st_size, | |
| reverse=True | |
| ) | |
| if images: | |
| dest = SELECTED_DIR / "scene_00.jpg" # Use 00 for scene_0 | |
| shutil.copy2(images[0], dest) | |
| selected_count += 1 | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SELECT FROM CONTENT SCENES (scene_1, scene_2, etc) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # For each scene, find and copy its image | |
| for i, scene in enumerate(manifest.scenes): | |
| actual_i = i + 1 # scene_1, scene_2, etc | |
| # Try multiple naming conventions | |
| scene_folder = None | |
| for pattern in [f"scene_{actual_i}", f"scene_{actual_i:02d}", f"{actual_i}", f"scene{actual_i}"]: | |
| potential = source_dir / pattern | |
| if potential.exists(): | |
| scene_folder = potential | |
| break | |
| # If no folder, look for files named with scene index | |
| if scene_folder is None: | |
| images = list(source_dir.glob(f"*scene*{actual_i}*")) + list( | |
| source_dir.glob(f"{actual_i:02d}*") | |
| ) | |
| if images: | |
| dest = SELECTED_DIR / f"scene_{actual_i:02d}.jpg" | |
| shutil.copy2(images[0], dest) | |
| selected_count += 1 | |
| continue | |
| # If folder found, get best image | |
| if scene_folder and scene_folder.is_dir(): | |
| images = sorted( | |
| list(scene_folder.glob("*.jpg")) + list(scene_folder.glob("*.png")), | |
| key=lambda p: p.stat().st_size, | |
| reverse=True | |
| ) | |
| if images: | |
| dest = SELECTED_DIR / f"scene_{actual_i:02d}.jpg" | |
| shutil.copy2(images[0], dest) | |
| selected_count += 1 | |
| total_expected = len(manifest.scenes) + 1 # title + content scenes | |
| if selected_count == 0: | |
| raise Exception( | |
| f"No images found in {source_dir}. " | |
| "Expected scene_0/, scene_1/, etc. folders or numbered files." | |
| ) | |
| if selected_count != total_expected: | |
| raise Exception( | |
| f"Expected {total_expected} selected images (title + {len(manifest.scenes)} scenes), found {selected_count}" | |
| ) | |
| return { | |
| "status": "success", | |
| "message": f"Selected {selected_count}/{total_expected} scenes", | |
| "selected_count": selected_count, | |
| "selected_dir": str(SELECTED_DIR), | |
| } | |
| except Exception as e: | |
| return { | |
| "status": "error", | |
| "message": str(e), | |
| } | |
| async def _compose(manifest: ManifestRequest): | |
| """ | |
| Internal: Compose video from manifest and selected images. | |
| Note: generate_scene_config adds title as scene 0, so we expect: | |
| selected_images_count = manifest_scenes + 1 | |
| """ | |
| try: | |
| # Verify selected directory has images | |
| selected_images = sorted(SELECTED_DIR.glob("scene_*.jpg")) | |
| # Generate dynamic SCENE_CONFIG from manifest (adds title as scene 0) | |
| scene_config = generate_scene_config(manifest) | |
| expected_images = len(scene_config) # includes title as scene 0 | |
| if len(selected_images) != expected_images: | |
| raise Exception( | |
| f"Expected {expected_images} selected images (title + {len(manifest.scenes)} scenes), " | |
| f"found {len(selected_images)}" | |
| ) | |
| # Save config as JSON for composer to use | |
| config_json = { | |
| "title": manifest.title, | |
| "scenes": scene_config, | |
| } | |
| config_path = BASE_DIR / "manifest_config.json" | |
| with open(config_path, "w") as f: | |
| json.dump(config_json, f, indent=2) | |
| # Call composer_v2.py with the config | |
| env = os.environ.copy() | |
| env["COMPOSER_MANIFEST_CONFIG"] = str(config_path) | |
| env["PYTHONIOENCODING"] = "utf-8" | |
| result = subprocess.run( | |
| [sys.executable, str(BASE_DIR / "composer_v2.py")], | |
| cwd=str(BASE_DIR), | |
| env=env, | |
| capture_output=True, | |
| text=True, | |
| ) | |
| # Filter out numpy warnings from stderr | |
| # Only treat stderr as error if returncode is non-zero | |
| if result.returncode != 0: | |
| error_msg = result.stderr or result.stdout | |
| if error_msg: | |
| # Get last few meaningful lines | |
| lines = [line for line in error_msg.split("\n") if line.strip()] | |
| error_summary = "\n".join(lines[-3:]) | |
| raise Exception(f"Composer failed with code {result.returncode}: {error_summary}") | |
| # Verify output file exists | |
| output_file = RENDERS_DIR / "sunset_reel.mp4" | |
| if not output_file.exists(): | |
| raise Exception("Output video file not created") | |
| size_mb = output_file.stat().st_size / (1024 * 1024) | |
| duration_s = len(scene_config) * 2.3 + 2.4 # Approximate | |
| return { | |
| "status": "success", | |
| "message": "Video composed successfully", | |
| "output_path": str(output_file), | |
| "size_mb": round(size_mb, 1), | |
| "duration_s": round(duration_s, 1), | |
| } | |
| except Exception as e: | |
| return { | |
| "status": "error", | |
| "message": str(e), | |
| } | |
| async def generate_from_prompt(request: PromptRequest): | |
| """ | |
| Full End-to-End Pipeline: Prompt β Manifest β Images β Selection β Video | |
| Workflow: | |
| 1. Call content-gen server to generate manifest from prompt | |
| 2. Call pinteresting server to download images for each scene | |
| 3. Select best images from candidates | |
| 4. Compose video with manifest labels | |
| 5. Return MP4 file | |
| Args: | |
| request.prompt: User description (e.g., "A motivational video about success") | |
| request.title: Optional override for video title | |
| Returns: MP4 video file (video/mp4) | |
| """ | |
| try: | |
| print(f"\n[PROMPT] {request.prompt[:80]}...") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Step 1: Generate Manifest from Prompt | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print("[STEP 1] Generating manifest from prompt...") | |
| manifest_server = "https://factorstudios-content-gen.hf.space" | |
| manifest_payload = {"topic": request.prompt} | |
| manifest_response = requests.post( | |
| f"{manifest_server}/generate", | |
| json=manifest_payload, | |
| timeout=120 | |
| ) | |
| manifest_response.raise_for_status() | |
| manifest_data = manifest_response.json() | |
| # Override title if provided | |
| if request.title: | |
| manifest_data["title"] = request.title | |
| # Save manifest | |
| manifest_path = BASE_DIR / "manifest_from_prompt.json" | |
| with open(manifest_path, "w") as f: | |
| json.dump(manifest_data, f, indent=2) | |
| scenes = manifest_data.get("scenes", []) | |
| print(f"[OK] Generated manifest with {len(scenes)} scenes") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Step 2: Download Images from Pinteresting Server | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print("[STEP 2] Downloading images for each scene...") | |
| # Clear candidates directory | |
| if CANDIDATES_DIR.exists(): | |
| shutil.rmtree(CANDIDATES_DIR) | |
| CANDIDATES_DIR.mkdir(parents=True, exist_ok=True) | |
| image_server = "https://factorstudios-pinteresting.hf.space" | |
| total_downloaded = 0 | |
| images_per_scene = 5 | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # STEP 2.0: Download images for TITLE (as scene_0) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| title = manifest_data.get("title", "") | |
| if title: | |
| scene_dir = CANDIDATES_DIR / "scene_0" | |
| scene_dir.mkdir(parents=True, exist_ok=True) | |
| try: | |
| # Random aesthetic query for title images (rich/luxury vibes) | |
| title_queries = [ | |
| "rich girl luxury aesthetic", | |
| "pretty girl aesthetic", | |
| "luxury lifestyle photography", | |
| "elegant aesthetic woman", | |
| "high fashion editorial", | |
| "luxury aesthetic girl", | |
| "sophisticated woman fashion", | |
| "premium aesthetic lifestyle", | |
| ] | |
| title_image_query = random.choice(title_queries) | |
| payload = {"keyword": title_image_query, "count": images_per_scene} | |
| img_response = requests.post( | |
| f"{image_server}/scrape", | |
| json=payload, | |
| timeout=120 | |
| ) | |
| img_response.raise_for_status() | |
| img_data = img_response.json() | |
| if img_data.get("success"): | |
| images = img_data.get("images", []) | |
| for img_idx, img_info in enumerate(images): | |
| img_url = img_info.get("url") | |
| if not img_url: | |
| continue | |
| try: | |
| dl_response = requests.get(img_url, timeout=15) | |
| dl_response.raise_for_status() | |
| Image.open(BytesIO(dl_response.content)) | |
| file_path = scene_dir / f"candidate_{img_idx:02d}.jpg" | |
| with open(file_path, "wb") as f: | |
| f.write(dl_response.content) | |
| total_downloaded += 1 | |
| except Exception as e: | |
| print(f" [TITLE] Image {img_idx} failed: {e}") | |
| print(f" [TITLE] Downloaded {len(images)} images") | |
| else: | |
| print(f" [TITLE] API error: {img_data.get('message')}") | |
| except Exception as e: | |
| print(f" [TITLE] Request failed: {e}") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # STEP 2.1: Download images for each CONTENT SCENE (as scene_1+) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| for scene_idx, scene in enumerate(scenes): | |
| actual_scene_idx = scene_idx + 1 # scene_1, scene_2, etc | |
| scene_label = scene.get("label", f"Scene {actual_scene_idx}") | |
| image_query = scene.get("image_query", "") | |
| if not image_query: | |
| print(f" [Scene {actual_scene_idx}] No query found") | |
| continue | |
| # Create scene folder | |
| scene_dir = CANDIDATES_DIR / f"scene_{actual_scene_idx}" | |
| scene_dir.mkdir(parents=True, exist_ok=True) | |
| try: | |
| # Fetch from pinteresting | |
| payload = {"keyword": image_query, "count": images_per_scene} | |
| img_response = requests.post( | |
| f"{image_server}/scrape", | |
| json=payload, | |
| timeout=120 | |
| ) | |
| img_response.raise_for_status() | |
| img_data = img_response.json() | |
| if img_data.get("success"): | |
| images = img_data.get("images", []) | |
| # Download each image | |
| for img_idx, img_info in enumerate(images): | |
| img_url = img_info.get("url") | |
| if not img_url: | |
| continue | |
| try: | |
| # Download and verify | |
| dl_response = requests.get(img_url, timeout=15) | |
| dl_response.raise_for_status() | |
| # Verify it's valid image | |
| Image.open(BytesIO(dl_response.content)) | |
| # Save | |
| file_path = scene_dir / f"candidate_{img_idx:02d}.jpg" | |
| with open(file_path, "wb") as f: | |
| f.write(dl_response.content) | |
| total_downloaded += 1 | |
| except Exception as e: | |
| print(f" [Scene {actual_scene_idx}] Image {img_idx} failed: {e}") | |
| print(f" [Scene {actual_scene_idx}] Downloaded {len(images)} images") | |
| else: | |
| print(f" [Scene {actual_scene_idx}] API error: {img_data.get('message')}") | |
| except Exception as e: | |
| print(f" [Scene {actual_scene_idx}] Request failed: {e}") | |
| if total_downloaded == 0: | |
| raise Exception(f"No images were downloaded from {image_server}") | |
| print(f"[OK] Downloaded {total_downloaded} images total") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Step 3: Select Best Images from Candidates | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print("[STEP 3] Selecting best images from candidates...") | |
| manifest_req = ManifestRequest(**manifest_data) | |
| select_result = await _select_scenes(manifest_req, CANDIDATES_DIR) | |
| if select_result["status"] != "success": | |
| raise Exception(select_result.get("message", "Scene selection failed")) | |
| print(f"[OK] Selected {select_result['selected_count']} images") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Step 4: Compose Video | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print("[STEP 4] Composing video...") | |
| compose_result = await _compose(manifest_req) | |
| if compose_result["status"] != "success": | |
| raise Exception(compose_result.get("message", "Composition failed")) | |
| print(f"[OK] Video composed ({compose_result['size_mb']:.1f}MB)") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Step 5: Upload to HuggingFace Dataset | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| output_file = Path(compose_result["output_path"]) | |
| if not output_file.exists(): | |
| raise Exception("Output video file not found") | |
| print("[STEP 5] Uploading to HuggingFace...") | |
| upload_result = await upload_video_to_hf( | |
| output_file, | |
| f"trendclip_{manifest_req.title[:30]}" | |
| ) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Step 6: Return Video File with Upload Status | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print(f"[SUCCESS] Video ready: {output_file.name}") | |
| # Read video file | |
| with open(output_file, "rb") as f: | |
| video_data = f.read() | |
| # Return as FileResponse with headers for upload status | |
| response = FileResponse( | |
| path=output_file, | |
| media_type="video/mp4", | |
| filename="video.mp4", | |
| headers={ | |
| "X-Upload-Status": upload_result.get("status", "unknown"), | |
| "X-Upload-Message": upload_result.get("message", ""), | |
| "X-Dataset-Path": upload_result.get("dataset_path", "") | |
| } | |
| ) | |
| return response | |
| except Exception as e: | |
| print(f"[ERROR] {str(e)}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def root(): | |
| """Serve the UI""" | |
| index_path = BASE_DIR / "static" / "index.html" | |
| if index_path.exists(): | |
| return FileResponse(index_path, media_type="text/html") | |
| return {"message": "TrendClip Video Composer API"} | |
| async def health_check(): | |
| """Health check endpoint""" | |
| return { | |
| "status": "healthy", | |
| "service": "TrendClip Video Composer", | |
| "version": "2.0", | |
| } | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |