pinteresting / api_app.py
factorstudios's picture
Upload 5 files
cfa4580 verified
import asyncio
import os
from typing import List, Optional
from urllib.parse import quote

import uvicorn
from fastapi import FastAPI, HTTPException
from playwright.async_api import async_playwright
from pydantic import BaseModel
# FastAPI application instance; the route handlers in this file register
# themselves on it through the @app.post / @app.get decorators.
app = FastAPI(title="Pinterest Scraper API")

# Cache for search results (optional optimization).
# NOTE(review): nothing in the visible code reads or writes `search_cache` or
# `CACHE_DURATION` -- presumably reserved for a caching layer; confirm before
# relying on or removing them.
search_cache = {}
CACHE_DURATION = 300  # 5 minutes (presumably expressed in seconds)
# Request body accepted by POST /scrape.
class ScrapeRequest(BaseModel):
    # Search term that is sent to Pinterest.
    keyword: str

    # Number of images wanted; the /scrape handler rejects values outside 1..20.
    count: int = 10

    # Optional aspect-ratio filter. Options: "9:16", "16:9", "1:1", "4:5", "any".
    # Declared Optional so the annotation agrees with the None default and an
    # explicit JSON null is accepted as "no filter" instead of failing
    # validation on pydantic versions that do not infer Optional from the default.
    aspect_ratio: Optional[str] = None
# Response body returned by the /scrape endpoint.
class ScrapeResponse(BaseModel):
    # True when the scrape finished without an error, False otherwise.
    success: bool

    # Human-readable summary: "Found N images" on success, "Error: ..." on failure.
    message: str

    # Each image has url, width, height, aspect_ratio
    images: List[dict]

    # Echo of the keyword that was requested.
    keyword: str
# Supported aspect-ratio labels mapped to their width / height value.
# Built once at import time instead of on every call.
_ASPECT_RATIOS = {
    "9:16": 9 / 16,   # Vertical (Shorts/Reels)
    "16:9": 16 / 9,   # Horizontal (Landscape)
    "1:1": 1 / 1,     # Square
    "4:5": 4 / 5,     # Portrait (Instagram)
    "3:4": 3 / 4,     # Portrait (Standard)
    "21:9": 21 / 9,   # Ultrawide
}


def check_aspect_ratio(
    width: int,
    height: int,
    target_ratio: Optional[str],
    tolerance: float = 0.15,
) -> bool:
    """Check if an image matches the target aspect ratio within a tolerance.

    Args:
        width: Image width in pixels.
        height: Image height in pixels.
        target_ratio: One of the labels in ``_ASPECT_RATIOS`` (for example
            ``"9:16"``), or ``None`` / ``""`` / ``"any"`` to disable filtering.
        tolerance: Allowed deviation as a fraction of the target ratio.
            Defaults to 0.15 (15%).

    Returns:
        ``True`` when no filtering is requested, when ``target_ratio`` is not
        a recognised label (unknown labels do not filter anything), or when
        ``width / height`` lies within ``tolerance * target`` of the target.
        ``False`` when the ratio is out of tolerance, or when a recognised
        ratio is requested but either dimension is not positive.
    """
    if not target_ratio or target_ratio == "any":
        return True

    target = _ASPECT_RATIOS.get(target_ratio)
    if target is None:
        # Unrecognised label: behave as if no filter had been requested.
        return True

    # A non-positive dimension has no meaningful ratio; reject it here rather
    # than letting the division below raise ZeroDivisionError.
    if width <= 0 or height <= 0:
        return False

    return abs(width / height - target) <= tolerance * target
# JavaScript evaluated inside the search page. For every pin container it
# returns the <img> source plus its dimensions, keeping only images whose URL
# contains "pinimg.com".
_PIN_EXTRACTION_JS = """
() => {
    const pins = document.querySelectorAll("div[data-test-id='pin']");
    return Array.from(pins).map(pin => {
        const img = pin.querySelector('img');
        if (!img || !img.src) return null;
        // Get actual rendered dimensions from parent container
        const rect = pin.getBoundingClientRect();
        return {
            src: img.src,
            // Use container aspect ratio if image not loaded
            width: img.naturalWidth || Math.round(rect.width),
            height: img.naturalHeight || Math.round(rect.height),
            container_width: Math.round(rect.width),
            container_height: Math.round(rect.height)
        };
    }).filter(item => item && item.src.includes('pinimg.com'));
}
"""


async def scrape_pinterest_api(keyword: str, count: int, aspect_ratio: Optional[str] = None):
    """Collect image URLs from a Pinterest search using headless Chromium.

    The search results page is opened, the visible pins are read, and the page
    is scrolled (at most 8 times) until ``count`` images have been collected
    or the page stops growing.

    Args:
        keyword: Search term; it is percent-encoded before being placed in
            the query string.
        count: Maximum number of images to return.
        aspect_ratio: Optional ratio label passed to ``check_aspect_ratio``;
            ``None`` or ``"any"`` disables the filter.

    Returns:
        A ``(images, error)`` tuple. On success ``images`` is a list of dicts
        with the keys ``url``, ``width``, ``height`` and ``aspect_ratio`` and
        ``error`` is ``None``. On failure ``images`` is ``[]`` and ``error``
        is a non-empty description of the exception.
    """
    print(f"Starting scrape for '{keyword}', count={count}, ratio={aspect_ratio}")
    images = []  # List of dict with url, width, height, aspect_ratio

    # Encode every reserved character (not just spaces) so that keywords
    # containing '&', '#', '+', '?' and similar cannot corrupt the query string.
    encoded_keyword = quote(keyword, safe="")
    search_url = f"https://www.pinterest.com/search/pins/?q={encoded_keyword}"

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context(
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                )
                page = await context.new_page()

                try:
                    await page.goto(search_url, timeout=60000)
                    await page.wait_for_selector("div[data-test-id='pin']", timeout=15000)
                except Exception as e:
                    # Navigation failed or no pins appeared; the finally
                    # clause below closes the browser.
                    return [], str(e) or repr(e)

                seen_urls = set()
                last_height = await page.evaluate("document.body.scrollHeight")
                no_new_content_count = 0
                scroll_attempts = 0
                max_scrolls = 8  # Limit scroll attempts

                while len(images) < count and scroll_attempts < max_scrolls:
                    await page.wait_for_timeout(500)  # Let lazy images load
                    img_data = await page.evaluate(_PIN_EXTRACTION_JS)

                    for img_info in img_data:
                        if len(images) >= count:
                            break

                        src = img_info.get("src", "")
                        if not src:
                            continue

                        # Convert to high-res URL
                        high_res_url = src.replace("236x", "736x").replace("474x", "736x")
                        if high_res_url in seen_urls:
                            continue
                        # Recorded before filtering, so a URL rejected by the
                        # ratio check is not evaluated again on later passes.
                        seen_urls.add(high_res_url)

                        # Use natural dimensions if available, else container dimensions
                        width = img_info.get("width", 0) or img_info.get("container_width", 0)
                        height = img_info.get("height", 0) or img_info.get("container_height", 0)

                        # The ratio filter only applies when both dimensions
                        # are known; images without usable dimensions pass.
                        passes_ratio = True
                        if aspect_ratio and aspect_ratio != "any" and width > 0 and height > 0:
                            passes_ratio = check_aspect_ratio(width, height, aspect_ratio)
                            print(f"Checking ratio: {width}x{height} = {width/height:.2f} for {aspect_ratio} -> {passes_ratio}")

                        if passes_ratio:
                            images.append({
                                "url": high_res_url,
                                "width": width,
                                "height": height,
                                "aspect_ratio": f"{width}:{height}" if width > 0 else "unknown",
                            })

                    if len(images) >= count:
                        break

                    scroll_attempts += 1

                    # Scroll to the bottom so that more pins get loaded.
                    await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    await page.wait_for_timeout(800)  # Reduced from 2000ms

                    new_height = await page.evaluate("document.body.scrollHeight")
                    if new_height == last_height:
                        # The page did not grow; give up after repeated stalls.
                        no_new_content_count += 1
                        if no_new_content_count > 3:
                            break
                    else:
                        no_new_content_count = 0
                    last_height = new_height
            finally:
                # Close the browser on every exit path, including exceptions
                # raised in the middle of the scroll loop.
                await browser.close()
    except Exception as e:
        print(f"Playwright error: {e}")
        # Fall back to repr() so the error string is never empty; the caller
        # treats a falsy error as success.
        return [], str(e) or repr(e)

    return images, None
@app.post("/scrape", response_model=ScrapeResponse)
async def scrape(request: ScrapeRequest):
    # POST /scrape: validate the request, run the scraper and wrap its
    # (images, error) result in a ScrapeResponse. Invalid input is answered
    # with HTTP 400; scraper failures are reported in the body via success=False.
    if not request.keyword:
        raise HTTPException(status_code=400, detail="Keyword is required")

    if not 1 <= request.count <= 20:
        raise HTTPException(status_code=400, detail="Count must be between 1 and 20")

    found_images, failure = await scrape_pinterest_api(
        request.keyword,
        request.count,
        request.aspect_ratio,
    )

    if not failure:
        return ScrapeResponse(
            success=True,
            message=f"Found {len(found_images)} images",
            images=found_images,
            keyword=request.keyword,
        )

    return ScrapeResponse(
        success=False,
        message=f"Error: {failure}",
        images=[],
        keyword=request.keyword,
    )
@app.get("/health")
async def health():
    # GET /health: fixed payload that reports the service as healthy.
    payload = dict(status="healthy", service="pinterest-scraper-api")
    return payload
# Script entry point: when the file is executed directly, serve `app` with
# uvicorn, bound to 0.0.0.0 on port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)