"""Pinterest Scraper API.

A small FastAPI service that drives a headless Chromium browser (via
Playwright) to collect image URLs from Pinterest search results, optionally
filtered by aspect ratio.
"""

import asyncio
import os
from typing import List, Optional, Tuple
from urllib.parse import quote

import uvicorn
from fastapi import FastAPI, HTTPException
from playwright.async_api import async_playwright
from pydantic import BaseModel

app = FastAPI(title="Pinterest Scraper API")

# Cache for search results (optional optimization).
# NOTE(review): neither name is referenced by any code in this file yet.
search_cache = {}
CACHE_DURATION = 300  # 5 minutes

# Supported aspect-ratio labels mapped to their width / height value.
ASPECT_RATIOS = {
    "9:16": 9 / 16,   # Vertical (Shorts/Reels)
    "16:9": 16 / 9,   # Horizontal (Landscape)
    "1:1": 1 / 1,     # Square
    "4:5": 4 / 5,     # Portrait (Instagram)
    "3:4": 3 / 4,     # Portrait (Standard)
    "21:9": 21 / 9,   # Ultrawide
}
RATIO_TOLERANCE = 0.15  # Relative tolerance (15% of the target ratio)

PIN_SELECTOR = "div[data-test-id='pin']"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
NAVIGATION_TIMEOUT_MS = 60000
PIN_WAIT_TIMEOUT_MS = 15000
LAZY_LOAD_WAIT_MS = 500
SCROLL_WAIT_MS = 800
MAX_SCROLLS = 8  # Upper bound on scroll attempts per request
MAX_STALLED_SCROLLS = 3  # Give up once the page height stalls more than this

# Runs in the page: collects src and dimensions for every pin image served
# from pinimg.com. Falls back to the pin container's rendered size when the
# image's natural size is not available.
_PIN_EXTRACTION_JS = """
() => {
    const pins = document.querySelectorAll("div[data-test-id='pin']");
    return Array.from(pins).map(pin => {
        const img = pin.querySelector('img');
        if (!img || !img.src) return null;
        // Get actual rendered dimensions from parent container
        const rect = pin.getBoundingClientRect();
        return {
            src: img.src,
            // Use container aspect ratio if image not loaded
            width: img.naturalWidth || Math.round(rect.width),
            height: img.naturalHeight || Math.round(rect.height),
            container_width: Math.round(rect.width),
            container_height: Math.round(rect.height)
        };
    }).filter(item => item && item.src.includes('pinimg.com'));
}
"""


class ScrapeRequest(BaseModel):
    """Request body for ``POST /scrape``."""

    keyword: str
    count: int = 10
    # Options: "9:16", "16:9", "1:1", "4:5", "any" (None behaves like "any").
    aspect_ratio: Optional[str] = None


class ScrapeResponse(BaseModel):
    """Response body for ``POST /scrape``."""

    success: bool
    message: str
    images: List[dict]  # Each image has url, width, height, aspect_ratio
    keyword: str


def check_aspect_ratio(width: int, height: int, target_ratio: str) -> bool:
    """Check if an image matches the target aspect ratio within tolerance.

    Args:
        width: Image width in pixels.
        height: Image height in pixels.
        target_ratio: A key of ``ASPECT_RATIOS``, ``"any"``, or a falsy value.

    Returns:
        ``True`` when no filtering is requested (falsy or ``"any"``), when the
        label is not a known ratio, or when ``width / height`` lies within
        ``RATIO_TOLERANCE`` (relative) of the target. ``False`` otherwise,
        including when either dimension is not positive.
    """
    if not target_ratio or target_ratio == "any":
        return True

    target = ASPECT_RATIOS.get(target_ratio)
    if target is None:
        # Unknown labels are treated as "no filter" rather than an error.
        return True

    if width <= 0 or height <= 0:
        # A ratio cannot be computed; avoid dividing by zero.
        return False

    return abs(width / height - target) <= RATIO_TOLERANCE * target


def _to_high_res(src: str) -> str:
    """Rewrite a pinimg thumbnail URL to its 736px-wide variant."""
    return src.replace("236x", "736x").replace("474x", "736x")


async def scrape_pinterest_api(
    keyword: str, count: int, aspect_ratio: str = None
) -> Tuple[List[dict], Optional[str]]:
    """Scrape Pinterest search results for images matching *keyword*.

    Args:
        keyword: Search phrase; percent-encoded before being placed in the URL.
        count: Maximum number of images to return.
        aspect_ratio: Optional ratio label passed to ``check_aspect_ratio``.

    Returns:
        ``(images, None)`` on success, where each image is a dict with
        ``url``, ``width``, ``height`` and ``aspect_ratio`` keys, or
        ``([], error_message)`` if the browser session raised.
    """
    print(f"Starting scrape for '{keyword}', count={count}, ratio={aspect_ratio}")
    images = []  # List of dict with url, width, height
    # Encode every reserved character, not just spaces, so keywords containing
    # '&', '#', '+', etc. cannot corrupt the query string.
    search_url = f"https://www.pinterest.com/search/pins/?q={quote(keyword, safe='')}"

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context(user_agent=USER_AGENT)
                page = await context.new_page()

                # A navigation or selector timeout propagates to the outer
                # handler, which reports it as the error string.
                await page.goto(search_url, timeout=NAVIGATION_TIMEOUT_MS)
                await page.wait_for_selector(PIN_SELECTOR, timeout=PIN_WAIT_TIMEOUT_MS)

                seen_urls = set()
                last_height = await page.evaluate("document.body.scrollHeight")
                stalled_scrolls = 0
                scroll_attempts = 0

                while len(images) < count and scroll_attempts < MAX_SCROLLS:
                    # Let lazy images load so natural dimensions are available.
                    await page.wait_for_timeout(LAZY_LOAD_WAIT_MS)
                    img_data = await page.evaluate(_PIN_EXTRACTION_JS)

                    for img_info in img_data:
                        if len(images) >= count:
                            break

                        src = img_info.get("src", "")
                        if not src:
                            continue

                        high_res_url = _to_high_res(src)
                        if high_res_url in seen_urls:
                            continue
                        seen_urls.add(high_res_url)

                        # Use natural dimensions if available, else container dimensions.
                        width = img_info.get("width", 0) or img_info.get("container_width", 0)
                        height = img_info.get("height", 0) or img_info.get("container_height", 0)

                        # Images with unknown dimensions are kept even when a
                        # ratio filter is requested (they cannot be checked).
                        passes_ratio = True
                        if aspect_ratio and aspect_ratio != "any":
                            if width > 0 and height > 0:
                                passes_ratio = check_aspect_ratio(width, height, aspect_ratio)
                                print(
                                    f"Checking ratio: {width}x{height} = {width/height:.2f} "
                                    f"for {aspect_ratio} -> {passes_ratio}"
                                )

                        if passes_ratio:
                            images.append({
                                "url": high_res_url,
                                "width": width,
                                "height": height,
                                "aspect_ratio": f"{width}:{height}" if width > 0 else "unknown",
                            })

                    if len(images) >= count:
                        break

                    scroll_attempts += 1

                    # Scroll to the bottom to trigger loading of more pins.
                    await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    await page.wait_for_timeout(SCROLL_WAIT_MS)

                    new_height = await page.evaluate("document.body.scrollHeight")
                    if new_height == last_height:
                        stalled_scrolls += 1
                        if stalled_scrolls > MAX_STALLED_SCROLLS:
                            break
                    else:
                        stalled_scrolls = 0
                        last_height = new_height
            finally:
                # Close the browser on every exit path, including errors
                # raised mid-scroll.
                await browser.close()
    except Exception as e:
        print(f"Playwright error: {e}")
        return [], str(e)

    return images, None


@app.post("/scrape", response_model=ScrapeResponse)
async def scrape(request: ScrapeRequest):
    """Validate the request, run the scraper and wrap the result.

    Raises:
        HTTPException: 400 if the keyword is empty/blank or ``count`` is
            outside the range 1..20.
    """
    if not request.keyword or not request.keyword.strip():
        raise HTTPException(status_code=400, detail="Keyword is required")

    if request.count < 1 or request.count > 20:
        raise HTTPException(status_code=400, detail="Count must be between 1 and 20")

    images, error = await scrape_pinterest_api(
        request.keyword, request.count, request.aspect_ratio
    )

    if error:
        return ScrapeResponse(
            success=False,
            message=f"Error: {error}",
            images=[],
            keyword=request.keyword,
        )

    return ScrapeResponse(
        success=True,
        message=f"Found {len(images)} images",
        images=images,
        keyword=request.keyword,
    )


@app.get("/health")
async def health():
    """Liveness probe; returns a static status payload."""
    return {"status": "healthy", "service": "pinterest-scraper-api"}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)