| | import os |
| | import time |
| | import gradio as gr |
| | from byteplussdkarkruntime import Ark |
| | import requests |
| | from datetime import datetime |
| |
|
| | |
# Read the API key from the "Key" environment variable; surrounding
# whitespace/newlines are trimmed so a sloppily pasted secret still works.
API_KEY = os.environ.get("Key", "").strip()
if API_KEY:
    # Only the length is printed so the secret itself never reaches the logs.
    print(f"✅ Key loaded, length: {len(API_KEY)}")
else:
    # Previously an absent key was still reported as "loaded" (length 0),
    # which hid the misconfiguration until the first request failed.
    print('⚠️ Environment variable "Key" is missing or empty; API calls will likely be rejected')
| |
|
| | |
# Shared Ark SDK client. Requests go to the URL below instead of the SDK's
# default endpoint (presumably a relay in front of the BytePlus API - confirm),
# authenticated with API_KEY, with a 30 second timeout and up to 3 retries.
client = Ark(
    api_key=API_KEY,
    base_url="https://1hit.no/proxy/proxy.php",
    max_retries=3,
    timeout=30.0,
)
| |
|
| | |
| |
|
def poll_via_json(task_id):
    """Look up a task's state in the io.json status file.

    Args:
        task_id: Key of the task inside io.json.

    Returns:
        The entry's ``video_url`` value when its status is ``'succeeded'``
        (``None`` if the entry carries no URL), the string ``"FAILED"`` when
        the status is ``'failed'``, ``"PROCESSING"`` for any other status,
        and ``None`` when the task is not listed or io.json could not be
        fetched or parsed.
    """
    json_url = "https://1hit.no/proxy/io.json"
    try:
        # A timeout keeps a stalled server from blocking the caller forever.
        response = requests.get(json_url, timeout=30)
        data = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Still best-effort (the caller just sees None), but the failure is
        # reported instead of being silently swallowed by a bare except.
        print(f"Error polling io.json: {exc}")
        return None

    # Anything that is not a mapping of task_id -> dict counts as "not found".
    task_data = data.get(task_id) if isinstance(data, dict) else None
    if not isinstance(task_data, dict):
        return None

    status = task_data.get('status')
    if status == 'succeeded':
        return task_data.get('video_url')
    if status == 'failed':
        return "FAILED"
    return "PROCESSING"
| |
|
| |
|
def update_prompt(choice):
    """Return the default prompt registered for ``choice``.

    Falls back to an empty string when ``choice`` has no entry in
    ``DEFAULT_PROMPTS``.
    """
    default_text = DEFAULT_PROMPTS.get(choice, "")
    return default_text
| |
|
| | |
def manual_poll(task_id):
    """Poll one task through the proxy and report what io.json says about it.

    The proxy endpoint is called with the task id first (presumably this
    makes the server refresh the task's entry in io.json - confirm against
    the server side); its response body is not used. io.json is then fetched
    and the task's entry is formatted for display.

    Args:
        task_id: Task identifier typed by the user. ``None``, empty and
            whitespace-only values are rejected before any network access.

    Returns:
        A human-readable, multi-line status string, or a message starting
        with "❌" when the id is missing, the task is unknown, or a request
        fails.
    """
    # Normalise first so an id pasted with stray spaces/newlines still matches.
    task_id = str(task_id).strip() if task_id else ""
    if not task_id:
        return "❌ Please enter a task ID"

    try:
        # `params=` lets requests URL-encode the id; interpolating user input
        # straight into the URL allowed extra query parameters to be injected.
        requests.get(
            "https://1hit.no/proxy/proxy.php",
            params={"task_id": task_id},
            headers={"Authorization": f"Bearer {API_KEY}"},
            timeout=30,
        )

        io_data = requests.get("https://1hit.no/proxy/io.json", timeout=30).json()

        if task_id not in io_data:
            return f"❌ Task {task_id} not found in io.json"

        entry = io_data[task_id]
        status = entry.get('status')
        video_url = entry.get('video_url')

        parts = [f"✅ Task {task_id}\n", f"Status: {status}\n"]
        if video_url:
            parts.append(f"Video URL: {video_url}\n")
        if 'response' in entry:
            parts.append(f"Full response: {entry['response']}\n")
        return "".join(parts)

    except Exception as e:
        # UI boundary: show the problem in the result box instead of raising.
        return f"❌ Error: {e}"
| |
|
def get_raw_json():
    """Fetch io.json and return its parsed contents.

    Returns:
        The decoded JSON document, or ``{"error": "Could not fetch io.json"}``
        when the request fails, times out, or the body is not valid JSON.
    """
    try:
        # Bounded wait so a stalled server cannot freeze the UI callback.
        r = requests.get("https://1hit.no/proxy/io.json", timeout=30)
        return r.json()
    except (requests.RequestException, ValueError) as exc:
        # Narrowed from a bare `except:` so unrelated errors are not hidden.
        print(f"Error fetching io.json: {exc}")
        return {"error": "Could not fetch io.json"}
| |
|
| | |
def get_task_list():
    """Build a newest-first summary of every task recorded in io.json.

    Returns:
        A list of dicts with the keys ``id``, ``status``, ``created``
        (formatted local time), ``created_ts`` (numeric timestamp used for
        sorting), ``video_url`` and ``prompt`` (at most 50 characters plus
        ``'...'``). An empty list is returned when io.json cannot be fetched
        or is not a JSON object.
    """
    try:
        r = requests.get("https://1hit.no/proxy/io.json", timeout=30)
        data = r.json()
    except (requests.RequestException, ValueError) as e:
        print(f"Error getting task list: {e}")
        return []

    if not isinstance(data, dict):
        print(f"Error getting task list: unexpected io.json root {type(data).__name__}")
        return []

    tasks = []
    for task_id, task_data in data.items():
        # A single malformed entry used to raise inside the loop and wipe out
        # the whole list; such entries are now skipped or given fallbacks.
        if not isinstance(task_data, dict):
            continue

        status = task_data.get('status', 'unknown')

        # Keep the sort key numeric so mixed types cannot break the sort.
        created = task_data.get('created', 0)
        if isinstance(created, bool) or not isinstance(created, (int, float)):
            created = 0
        try:
            time_str = datetime.fromtimestamp(created).strftime('%Y-%m-%d %H:%M:%S')
        except (OverflowError, OSError, ValueError):
            time_str = 'unknown'

        # The prompt lives at request.content[0].text; every level is
        # checked because `content` may be missing, empty, or another shape.
        prompt = 'No prompt'
        request = task_data.get('request')
        content = request.get('content') if isinstance(request, dict) else None
        if isinstance(content, list) and content and isinstance(content[0], dict):
            text = content[0].get('text', 'No prompt')
            if isinstance(text, str):
                prompt = text
        prompt_preview = prompt[:50] + '...' if len(prompt) > 50 else prompt

        tasks.append({
            "id": task_id,
            "status": status,
            "created": time_str,
            "created_ts": created,
            "video_url": task_data.get('video_url', ''),
            "prompt": prompt_preview,
        })

    tasks.sort(key=lambda x: x['created_ts'], reverse=True)
    return tasks
| |
|
def refresh_task_list():
    """Rebuild the task dropdown from the current task list.

    Each choice is formatted as ``"<id> | <status> | <created>"``; the first
    choice is preselected, or nothing when there are no tasks.
    """
    labels = []
    for task in get_task_list():
        labels.append(f"{task['id']} | {task['status']} | {task['created']}")

    if labels:
        preselected = labels[0]
    else:
        preselected = None
    return gr.Dropdown(choices=labels, value=preselected)
| |
|
def load_task(selection):
    """Load the details of the task chosen in the dropdown.

    Args:
        selection: Dropdown label of the form ``"<id> | <status> | <created>"``;
            only the part before the first ``' | '`` is used.

    Returns:
        A 3-tuple ``(info_markdown, video_value, file_value)``. Both media
        values are the task's video URL, or ``None`` when the task has no
        URL, nothing is selected, or loading failed.
    """
    if not selection:
        return "No task selected", None, None

    task_id = selection.split(' | ')[0]
    try:
        r = requests.get("https://1hit.no/proxy/io.json", timeout=30)
        data = r.json()
        task = data.get(task_id, {})

        # An empty string is not a usable media value; None clears the
        # player and the download widget instead.
        video_url = task.get('video_url') or None
        status = task.get('status', 'unknown')

        # The prompt lives at request.content[0].text; every level is
        # checked because `content` may be missing, empty, or another shape.
        prompt = 'No prompt'
        request = task.get('request')
        content = request.get('content') if isinstance(request, dict) else None
        if isinstance(content, list) and content and isinstance(content[0], dict):
            prompt = content[0].get('text', 'No prompt')

        created = task.get('created', 0)
        try:
            created_str = datetime.fromtimestamp(created).strftime('%Y-%m-%d %H:%M:%S')
        except (TypeError, ValueError, OverflowError, OSError):
            # A bad timestamp should not hide the rest of the task details.
            created_str = 'unknown'

        info = "### Task Details\n\n"
        info += f"**Task ID:** `{task_id}`\n"
        info += f"**Status:** `{status}`\n"
        info += f"**Created:** {created_str}\n"
        info += f"**Prompt:** {prompt}\n\n"

        if status == 'succeeded' and video_url:
            info += "✅ **Video ready!**\n"
            info += f"**Download:** [Click here]({video_url})\n"
        elif status == 'failed':
            response = task.get('response')
            error = (
                response.get('error', 'Unknown error')
                if isinstance(response, dict)
                else 'Unknown error'
            )
            info += f"❌ **Failed:** {error}\n"

        return info, video_url, video_url
    except Exception as e:
        # UI boundary: report the problem in the info panel instead of raising.
        return f"Error loading task: {e}", None, None
| |
|
def update_url_display(image_path, manual_url):
    """Pick the text shown in the URL display.

    A non-blank ``manual_url`` wins and is returned stripped; otherwise a
    truthy ``image_path`` is returned as ``/file=<image_path>``; otherwise the
    placeholder ``"No image selected"`` is returned.
    """
    typed_url = manual_url.strip() if manual_url else ""
    if typed_url:
        return typed_url
    if image_path:
        return f"/file={image_path}"
    return "No image selected"
| |
|
| | |
# Gradio UI: two tabs, one for manually polling a task and inspecting
# io.json, one for browsing tasks and watching/downloading their videos.
# NOTE(review): the nesting below was reconstructed from a flattened source,
# and the tabs are created without a visible gr.Tabs() container - confirm
# both against the intended layout.
with gr.Blocks(title="BytePlus Video Gallery", theme=gr.themes.Soft()) as demo:

    # --- Tab 1: poll a single task and view the raw io.json ---
    with gr.TabItem("🔧 Manual Polling"):
        gr.Markdown("### 🔧 Manual Polling & Debug")

        with gr.Row():
            # Left column: task id in, formatted poll result out.
            with gr.Column():
                gr.Markdown("#### Poll Specific Task")
                task_id_input = gr.Textbox(
                    label="Task ID",
                    placeholder="Enter task ID (cgt-...)"
                )
                poll_btn = gr.Button("🔄 Poll Now", variant="primary")
                poll_result = gr.Textbox(label="Poll Result", lines=10)

                # manual_poll returns a status string for the result box.
                poll_btn.click(
                    fn=manual_poll,
                    inputs=task_id_input,
                    outputs=poll_result
                )

            # Right column: on-demand dump of the whole io.json document.
            with gr.Column():
                gr.Markdown("#### Current io.json")
                refresh_btn = gr.Button("🔄 Refresh io.json", variant="secondary")
                raw_json = gr.JSON(label="io.json Contents")

                refresh_btn.click(
                    fn=get_raw_json,
                    outputs=raw_json
                )

        # The literal's text is kept flush-left exactly as it appears in the source.
        gr.Markdown("""
### 📝 Instructions
1. Copy a task ID from above (like `cgt-20260217072358-hszt9`)
2. Paste it in the Task ID field
3. Click "Poll Now" to force an update
4. Check io.json to see if status changed
""")

    # --- Tab 2: pick a task, read its details, play/download the video ---
    with gr.TabItem("📺 Watch & Download"):
        gr.Markdown("### 📺 Browse and Download Generated Videos")

        with gr.Row():
            # Narrow column: refresh button and task selector.
            with gr.Column(scale=1):
                refresh_list_btn = gr.Button("🔄 Refresh Task List", variant="primary")
                task_list = gr.Dropdown(
                    label="Select Task",
                    choices=[],
                    interactive=True
                )

            # Wide column: markdown details of the selected task.
            with gr.Column(scale=2):
                task_info = gr.Markdown("Select a task to view details")

        with gr.Row():
            with gr.Column():
                video_player = gr.Video(label="Video Player")
                download_link = gr.File(label="Download Video")

        # Rebuild the dropdown choices when the refresh button is clicked.
        refresh_list_btn.click(
            fn=refresh_task_list,
            outputs=task_list
        )

        # Selecting a task fills the info panel, the player and the download widget.
        task_list.change(
            fn=load_task,
            inputs=task_list,
            outputs=[task_info, video_player, download_link]
        )

    # Populate the dropdown once when the page first loads.
    demo.load(
        fn=refresh_task_list,
        outputs=task_list
    )
| |
|
| |
|
# Start the web server only when this file is run as a script.
if __name__ == "__main__":
    # "0.0.0.0" listens on all network interfaces rather than only localhost.
    demo.launch(server_name="0.0.0.0")