# Spaces: Sleeping
import asyncio
import os
from typing import List, Optional
from urllib.parse import quote

import uvicorn
from fastapi import FastAPI, HTTPException
from playwright.async_api import async_playwright
from pydantic import BaseModel
# FastAPI application object; it is handed to uvicorn when this module is run as a script.
app = FastAPI(title="Pinterest Scraper API")

# Cache for search results (optional optimization)
# NOTE(review): nothing in the visible code reads or writes this cache — confirm it is used elsewhere.
search_cache = {}
CACHE_DURATION = 300  # 5 minutes (presumably expressed in seconds)
class ScrapeRequest(BaseModel):
    """Request body for a scrape: search keyword, result cap and optional ratio filter."""

    # Search phrase sent to Pinterest.
    keyword: str
    # Maximum number of images to return.
    count: int = 10
    # Declared Optional (not plain ``str``) so that an explicit JSON ``null`` is
    # accepted and means "no filter", matching the ``None`` default.
    # Options: "9:16", "16:9", "1:1", "4:5", "3:4", "21:9", "any"
    aspect_ratio: Optional[str] = None
class ScrapeResponse(BaseModel):
    """Response body: outcome flag, human-readable message, images and echoed keyword."""

    # True when the scrape completed without an error.
    success: bool
    # Summary of the outcome, or the error text on failure.
    message: str
    # Each image has url, width, height, aspect_ratio
    images: List[dict]
    # The keyword the request was made with.
    keyword: str
# Supported aspect-ratio labels mapped to their width / height quotient.
_ASPECT_RATIOS = {
    "9:16": 9 / 16,   # Vertical (Shorts/Reels)
    "16:9": 16 / 9,   # Horizontal (Landscape)
    "1:1": 1 / 1,     # Square
    "4:5": 4 / 5,     # Portrait (Instagram)
    "3:4": 3 / 4,     # Portrait (Standard)
    "21:9": 21 / 9,   # Ultrawide
}

# Allowed deviation from the target quotient, relative to the target (15%).
_RATIO_TOLERANCE = 0.15


def check_aspect_ratio(width: int, height: int, target_ratio: Optional[str]) -> bool:
    """Check if image matches target aspect ratio within tolerance.

    Args:
        width: Image width in pixels.
        height: Image height in pixels.
        target_ratio: A label such as ``"9:16"``; ``None``, ``""`` or ``"any"``
            disables filtering.

    Returns:
        True when no filtering applies (no target, ``"any"``, or a label that
        is not in the supported table), or when ``width / height`` is within
        15% of the target quotient. False when a known target is requested but
        the dimensions are not both positive, since no ratio can be computed.
    """
    if not target_ratio or target_ratio == "any":
        return True

    target = _ASPECT_RATIOS.get(target_ratio)
    if target is None:
        # Unknown labels are treated as "no filter" rather than rejecting everything.
        return True

    # Guard against division by zero (and nonsensical negative sizes).
    if width <= 0 or height <= 0:
        return False

    return abs(width / height - target) <= _RATIO_TOLERANCE * target
# JavaScript evaluated inside the page: for every pin element it returns the
# image URL plus its natural size (falling back to the pin container's rendered
# size), keeping only images whose URL contains "pinimg.com".
_PIN_EXTRACTION_JS = """
() => {
    const pins = document.querySelectorAll("div[data-test-id='pin']");
    return Array.from(pins).map(pin => {
        const img = pin.querySelector('img');
        if (!img || !img.src) return null;
        // Get actual rendered dimensions from parent container
        const rect = pin.getBoundingClientRect();
        return {
            src: img.src,
            // Use container aspect ratio if image not loaded
            width: img.naturalWidth || Math.round(rect.width),
            height: img.naturalHeight || Math.round(rect.height),
            container_width: Math.round(rect.width),
            container_height: Math.round(rect.height)
        };
    }).filter(item => item && item.src.includes('pinimg.com'));
}
"""


def _build_image_entry(img_info, high_res_url, aspect_ratio):
    """Return the result dict for one pin, or None if it fails the ratio filter."""
    # Use natural dimensions if available, else container dimensions.
    width = img_info.get("width", 0) or img_info.get("container_width", 0)
    height = img_info.get("height", 0) or img_info.get("container_height", 0)

    # Only filter when a ratio was requested AND the size is known; pins with
    # unknown dimensions are kept rather than dropped.
    if aspect_ratio and aspect_ratio != "any" and width > 0 and height > 0:
        passes_ratio = check_aspect_ratio(width, height, aspect_ratio)
        print(f"Checking ratio: {width}x{height} = {width/height:.2f} for {aspect_ratio} -> {passes_ratio}")
        if not passes_ratio:
            return None

    return {
        "url": high_res_url,
        "width": width,
        "height": height,
        "aspect_ratio": f"{width}:{height}" if width > 0 else "unknown",
    }


async def scrape_pinterest_api(keyword: str, count: int, aspect_ratio: Optional[str] = None):
    """Search Pinterest for ``keyword`` in headless Chromium and collect image URLs.

    The search results page is scrolled (at most 8 times) and pin images are
    read after each step until ``count`` images have been collected, the scroll
    limit is reached, or the page height has stopped growing for more than
    three consecutive scrolls.

    Args:
        keyword: Search phrase; it is percent-encoded before being placed in
            the query string.
        count: Maximum number of images to return.
        aspect_ratio: Optional ratio label passed to ``check_aspect_ratio``;
            ``None`` or ``"any"`` disables filtering.

    Returns:
        A ``(images, error)`` tuple. On success ``images`` is a list of dicts
        with ``url``, ``width``, ``height`` and ``aspect_ratio`` keys and
        ``error`` is ``None``. On failure it is ``([], message)``.
    """
    print(f"Starting scrape for '{keyword}', count={count}, ratio={aspect_ratio}")
    images = []  # List of dict with url, width, height

    # Encode every reserved character (not just spaces) so that keywords
    # containing '&', '#', '+', '?' etc. cannot alter the query string.
    search_url = f"https://www.pinterest.com/search/pins/?q={quote(keyword, safe='')}"

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context(
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                )
                page = await context.new_page()

                try:
                    await page.goto(search_url, timeout=60000)
                    await page.wait_for_selector("div[data-test-id='pin']", timeout=15000)
                except Exception as e:
                    # Navigation failed or no pins appeared; the browser is
                    # closed by the enclosing ``finally``.
                    return [], str(e)

                seen_urls = set()
                last_height = await page.evaluate("document.body.scrollHeight")
                no_new_content_count = 0
                scroll_attempts = 0
                max_scrolls = 8  # Limit scroll attempts

                while len(images) < count and scroll_attempts < max_scrolls:
                    await page.wait_for_timeout(500)  # Let lazy images load
                    img_data = await page.evaluate(_PIN_EXTRACTION_JS)

                    for img_info in img_data:
                        if len(images) >= count:
                            break
                        src = img_info.get("src", "")
                        if not src:
                            continue

                        # Convert to high-res URL
                        high_res_url = src.replace("236x", "736x").replace("474x", "736x")
                        if high_res_url in seen_urls:
                            continue
                        # Marked as seen even if it is filtered out below, so a
                        # rejected pin is not re-checked after every scroll.
                        seen_urls.add(high_res_url)

                        entry = _build_image_entry(img_info, high_res_url, aspect_ratio)
                        if entry is not None:
                            images.append(entry)

                    if len(images) >= count:
                        break

                    scroll_attempts += 1
                    # Scroll to the bottom to trigger loading of more pins.
                    await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    await page.wait_for_timeout(800)  # Reduced from 2000ms

                    new_height = await page.evaluate("document.body.scrollHeight")
                    if new_height == last_height:
                        # Page did not grow; give up after repeated stalls.
                        no_new_content_count += 1
                        if no_new_content_count > 3:
                            break
                    else:
                        no_new_content_count = 0
                        last_height = new_height
            finally:
                # Always release the browser, including when the loop raises.
                await browser.close()
    except Exception as e:
        print(f"Playwright error: {e}")
        return [], str(e)

    return images, None
async def scrape(request: ScrapeRequest) -> ScrapeResponse:
    """Validate a scrape request, run the scraper and wrap the outcome.

    Args:
        request: Keyword, requested image count and optional aspect-ratio filter.

    Returns:
        A ``ScrapeResponse`` with ``success=True`` and the collected images, or
        ``success=False`` and the error text when the scraper reports an error.

    Raises:
        HTTPException: 400 when the keyword is empty or only whitespace, or
            when ``count`` is outside 1..20.
    """
    # NOTE(review): no route decorator is visible on this handler in this view —
    # confirm it is registered on `app`.

    # Reject blank keywords up front: a whitespace-only string is truthy and
    # would otherwise launch a browser for an empty search.
    if not request.keyword or not request.keyword.strip():
        raise HTTPException(status_code=400, detail="Keyword is required")

    if not 1 <= request.count <= 20:
        raise HTTPException(status_code=400, detail="Count must be between 1 and 20")

    images, error = await scrape_pinterest_api(request.keyword, request.count, request.aspect_ratio)

    if error:
        return ScrapeResponse(
            success=False,
            message=f"Error: {error}",
            images=[],
            keyword=request.keyword,
        )

    return ScrapeResponse(
        success=True,
        message=f"Found {len(images)} images",
        images=images,
        keyword=request.keyword,
    )
async def health():
    """Return a fixed payload naming the service and reporting it as healthy."""
    payload = {
        "status": "healthy",
        "service": "pinterest-scraper-api",
    }
    return payload
if __name__ == "__main__":
    # When executed directly, serve the app with uvicorn on all interfaces, port 7860.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )