| import os |
| import requests |
| import cv2 |
| import numpy as np |
|
|
# Base URL of the document-check API server. It can be overridden through the
# ID_API_URL environment variable. Trailing slashes are stripped so that the
# endpoint URLs built below never contain a double slash ("//api/...").
BASE_URL = os.environ.get("ID_API_URL", "http://127.0.0.1:8082").rstrip("/")

# Maps the short document-type key used inside this module to the full URL of
# the endpoint that handles it.
ENDPOINTS = {
    "id": f"{BASE_URL}/api/check_id",
    "credit": f"{BASE_URL}/api/check_credit",
    "mrz": f"{BASE_URL}/api/check_mrz",
}
|
|
|
|
| def _load_image(image): |
| if image is None: |
| return None |
| if isinstance(image, str): |
| img = cv2.imread(image) |
| if img is None: |
| return None |
| return img |
| return image |
|
|
|
|
def _check_document(image, endpoint_key: str, timeout: float = 30) -> dict:
    """Upload an image to one of the document-check endpoints.

    The image is resolved through ``_load_image``, JPEG-encoded at quality 95
    and posted as the multipart form field ``image``.

    Args:
        image: A file path (``str``) or an in-memory image accepted by
            ``cv2.imencode`` (presumably a NumPy array -- confirm with
            callers).
        endpoint_key: A key of ``ENDPOINTS`` selecting the target URL.
        timeout: Seconds to wait for the server, forwarded to
            ``requests.post``. Defaults to the previously hard-coded 30.

    Returns:
        The decoded JSON body of the response on success. On an expected
        failure (no image, encoding failure, unknown endpoint, network or
        HTTP error, non-JSON body) a dict with a single ``"error"`` key
        holding a human-readable message.
    """
    img = _load_image(image)
    if img is None:
        return {"error": "No image provided"}

    try:
        success, img_encoded = cv2.imencode(
            ".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 95]
        )
    except cv2.error:
        # OpenCV raises rather than returning False when it cannot interpret
        # the input at all; report that like any other encoding failure.
        success = False
    if not success:
        return {"error": "Failed to encode image"}

    url = ENDPOINTS.get(endpoint_key)
    if not url:
        return {"error": f"Unknown endpoint: {endpoint_key}"}

    try:
        response = requests.post(
            url,
            files={"image": ("image.jpg", img_encoded.tobytes(), "image/jpeg")},
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.ConnectionError:
        return {"error": f"Cannot connect to server at {BASE_URL}. Make sure the SDK server is running."}
    except requests.exceptions.Timeout:
        return {"error": "Request timed out"}
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError is listed as well because older requests releases raise
        # a plain ValueError subclass (not a RequestException) from
        # response.json() when the body is not valid JSON.
        return {"error": f"API request failed: {str(e)}"}
|
|
|
|
def check_id(image) -> dict:
    """Submit *image* to the ``"id"`` endpoint (``/api/check_id``).

    Thin wrapper around ``_check_document``; its return value is passed back
    unchanged.
    """
    return _check_document(image, endpoint_key="id")
|
|
|
|
def check_credit(image) -> dict:
    """Submit *image* to the ``"credit"`` endpoint (``/api/check_credit``).

    Thin wrapper around ``_check_document``; its return value is passed back
    unchanged.
    """
    return _check_document(image, endpoint_key="credit")
|
|
|
|
def check_mrz(image) -> dict:
    """Submit *image* to the ``"mrz"`` endpoint (``/api/check_mrz``).

    Thin wrapper around ``_check_document``; its return value is passed back
    unchanged.
    """
    return _check_document(image, endpoint_key="mrz")
|
|