from asyncio import CancelledError, Queue, create_task, gather, sleep
from contextlib import asynccontextmanager
from json import dumps, loads
from logging import INFO, basicConfig, getLogger
from pathlib import Path
from typing import Literal

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse
from proxybroker import Broker
from uvicorn import run as uvicorn_run
|
|
|
|
# Single process-wide scheduler; jobs are added below and it is started by the app lifespan.
scheduler: AsyncIOScheduler = AsyncIOScheduler()
|
|
try:
    workdir = Path(__file__).parent
except NameError:
    # __file__ is undefined when this code is not run from a module file (REPL, exec).
    # NOTE(review): the fallback is the *parent* of the cwd, not the cwd itself — confirm intended.
    workdir = Path.cwd().parent


logfile = workdir / 'log.log'


# Append log records to log.log in workdir. The encoding is pinned to UTF-8 so the
# (Cyrillic) messages are written identically on every platform and match the
# UTF-8 read performed by the /log/ endpoint.
basicConfig(
    level=INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename=str(logfile),
    filemode='a',
    encoding='utf-8',
)
logger = getLogger('proxy_collector')
logger.info('запуск...')
|
|
|
|
def delete_logs():
    """Empty the log file (registered as a once-a-day scheduler job).

    The file is truncated rather than unlinked. The logging FileHandler opened it
    in append mode at startup and keeps it open, so:
      * on POSIX, unlinking leaves the handler writing to a deleted inode; the
        path would no longer exist and /log/ would report an empty log until
        the process restarts;
      * on Windows, unlinking a file that is still open fails.
    Errors are logged and swallowed because this is a best-effort maintenance job.
    """
    try:
        # Opening with 'w' truncates to zero length. The handler's append-mode
        # stream keeps writing at the (new) end of the same file.
        with logfile.open('w', encoding='utf-8'):
            pass
    except Exception as e:
        logger.error(f'ошибка при удалении логов: {e}')
|
|
|
|
# find_proxies uses the short 10-minute broker timeout while this is True and
# flips it to False once its first pass has finished.
is_first_run = True


# Per-protocol result files, plus the merged document served at /all/.
http_collected_json = workdir / 'http_proxies.json'
https_collected_json = workdir / 'https_proxies.json'
socks5_collected_json = workdir / 'socks5_proxies.json'
collected_json = workdir / 'proxies.json'


# Two-letter country codes handed to Broker.find(countries=...) as the search filter.
countries_list = [
    'US', 'CA', 'FR', 'FI', 'HR', 'ME', 'CH', 'SE',
    'EE', 'DE', 'GB', 'IT', 'NL', 'PL', 'CZ', 'RS',
    'RO', 'MD', 'AT', 'BE', 'BG', 'HU', 'DK', 'IS',
    'KZ', 'LV', 'LT', 'LU', 'NO', 'PT', 'SK', 'SI',
]
|
|
|
|
def _parse_proxy_line(line: str):
    """Turn one proxy string into a record dict, or return None if its layout is not recognised."""
    parts = line.split()
    # Expected tokens: '<Proxy', country, '<ping>s', '[TYPE' or '[TYPE:', ..., 'host:port>'
    if len(parts) < 5:
        return None
    try:
        ping = float(parts[2].strip('s'))
    except ValueError:
        return None
    return {
        "country": parts[1],
        "ping": ping,
        # '[HTTP:' (HTTP is followed by an extra level word) / '[HTTPS]' / '[SOCKS5]' -> bare type name
        "protocol": parts[3].strip('[],').split(':')[0],
        # The address is always the last token, however many words the [...] group holds.
        "host": parts[-1].rstrip(']>'),
    }


def create_json_from_proxies(proxy_lines: list[str], filename: Path):
    """Parse collected proxy strings and write them to *filename* as JSON.

    Each line is the str() of a found proxy. The parser expects a layout like
    ``<Proxy US 0.12s [HTTP: High] 1.2.3.4:8080>``. Lines that do not fit are
    logged and skipped, so one odd entry no longer discards the whole batch.

    The written document is ``{"countries": [sorted codes], "proxies": [records]}``.
    Each record has the keys country, ping (float seconds), protocol and host.

    Returns *filename*.
    """
    logger.info('сохранение файла прокси')
    countries = set()
    proxies = []

    for line in proxy_lines:
        record = _parse_proxy_line(line)
        if record is None:
            logger.warning(f'пропущена строка прокси неизвестного формата: {line}')
            continue
        countries.add(record['country'])
        proxies.append(record)

    data = {'countries': sorted(countries), 'proxies': proxies}
    filename.write_text(dumps(data, indent=4), encoding='utf-8')
    return filename
|
|
|
|
async def collect_proxies(proxies_queue: Queue):
    """Drain *proxies_queue* up to the ``None`` sentinel and return the items as strings.

    Items after the sentinel are left in the queue. Each item is formatted
    exactly as an f-string would format it, i.e. ``format(item)``.
    """
    collected = []
    while (item := await proxies_queue.get()) is not None:
        collected.append(format(item))
    return collected
|
|
|
|
async def sort_proxies_and_merge(files: list[Path], output_file: Path):
    """Merge per-protocol proxy files into a single JSON document.

    Each input is expected to be a file written by create_json_from_proxies and
    is assumed to hold proxies of one protocol. The protocol is taken from the
    file's first record. Missing, empty, undecodable or proxy-less files are
    skipped, so one bad file does not block the others.

    Output shape::

        {"countries": [all countries, sorted],
         "proxies_by_type": {"<protocol lower-case>": {"countries": [sorted],
                                                       "proxies": [by ascending ping]}}}

    Returns *output_file*. (Declared async, but all work is synchronous file I/O.)
    """
    logger.info('объединение файлов прокси')
    all_countries = set()
    proxies_by_type = {}
    for file in files:
        if not file.is_file() or file.stat().st_size == 0:
            continue
        try:
            data = loads(file.read_text(encoding='utf-8'))
        except ValueError as e:
            # JSONDecodeError / UnicodeDecodeError: skip this file, keep merging the rest.
            logger.warning(f'пропущен повреждённый файл прокси {file}: {e}')
            continue
        proxies = data.get('proxies')
        if not proxies:
            continue
        protocol = proxies[0].get('protocol')
        if not protocol:
            continue
        sorted_proxies = sorted(proxies, key=lambda proxy: proxy.get('ping'))
        # sorted() rather than list(set(...)): string hashing is randomised per
        # process, so the old order changed from run to run.
        countries = sorted({proxy.get('country') for proxy in sorted_proxies if proxy.get('country')})
        proxies_by_type[protocol.lower()] = {'countries': countries, 'proxies': sorted_proxies}
        all_countries.update(countries)
    merged_data = {'countries': sorted(all_countries), 'proxies_by_type': proxies_by_type}
    output_file.write_text(dumps(merged_data, indent=4), encoding='utf-8')
    return output_file
|
|
|
|
async def stop_broker_after_timeout(broker: Broker, timeout_minutes: int):
    """Sleep for *timeout_minutes*, then stop *broker*.

    Stopping is best-effort. A failure in ``broker.stop()`` is logged instead of
    being silently discarded, and it no longer swallows BaseException subclasses
    such as KeyboardInterrupt/SystemExit.
    """
    await sleep(timeout_minutes * 60)
    try:
        broker.stop()
    except Exception as e:
        logger.warning(f'ошибка при остановке брокера: {e}')
|
|
|
|
async def find_proxies_by_type(proxy_type: Literal['HTTP', 'HTTPS', 'SOCKS5'], output_json_file: Path, timeout_minutes: int = 50):
    """Collect proxies of one protocol with ProxyBroker and save them as JSON.

    A Broker feeding a fresh queue is started with ``find(limit=0)`` over
    ``countries_list``. A companion task stops it after *timeout_minutes*.
    Once that task has finished, the queue is drained and written to
    *output_json_file* via create_json_from_proxies.

    Returns the path returned by create_json_from_proxies.
    """
    logger.info(f'начат сбор прокси {proxy_type}')
    # NOTE(review): this placeholder overwrites the previous results for the whole
    # collection window, so the per-type endpoints serve an empty list meanwhile
    # (and never the "not ready" notice on the first run) — confirm this is intended.
    output_json_file.write_text(dumps({'countries': None, 'proxies': []}, indent=4))
    proxies_queue = Queue()
    broker = Broker(proxies_queue, timeout=8, max_conn=200, max_tries=3, verify_ssl=False)
    stop_task = create_task(stop_broker_after_timeout(broker, timeout_minutes))
    try:
        await broker.find(types=[proxy_type], countries=countries_list, limit=0)
    except BaseException:
        # find() raised or we were cancelled: don't leave the stop timer running
        # in the background for the rest of the timeout.
        stop_task.cancel()
        raise
    # Wait for the timer to stop the broker before draining the queue.
    await stop_task
    proxies_list = await collect_proxies(proxies_queue)
    saved_proxy = create_json_from_proxies(proxies_list, output_json_file)
    logger.info(f'завершён сбор прокси {proxy_type}')
    return saved_proxy
|
|
|
|
async def find_proxies():
    """Run one collection pass for HTTP, HTTPS and SOCKS5 concurrently, then merge.

    The first pass after startup gives each broker 10 minutes; later passes
    give 50 minutes. A failure in one protocol is logged and the protocols that
    did succeed are still merged into ``collected_json``. If every protocol
    fails, nothing is merged and the next pass is still treated as the first.
    """
    global is_first_run
    timeout_minutes = 10 if is_first_run else 50
    logger.info('запущены задачи по сбору всех типов прокси')
    jobs = (
        ('HTTP', http_collected_json),
        ('HTTPS', https_collected_json),
        ('SOCKS5', socks5_collected_json),
    )
    # return_exceptions=True: one failing protocol must not discard the others.
    results = await gather(
        *(find_proxies_by_type(proxy_type, path, timeout_minutes) for proxy_type, path in jobs),
        return_exceptions=True,
    )
    saved_files = []
    for (proxy_type, _), result in zip(jobs, results):
        if isinstance(result, BaseException):
            logger.error(f'ошибка при сборе прокси {proxy_type}: {result!r}')
        else:
            saved_files.append(result)
    if not saved_files:
        logger.error('ни один тип прокси не собран, объединение пропущено')
        return
    await sort_proxies_and_merge(saved_files, collected_json)
    is_first_run = False
    logger.info('задачи по сбору прокси завершены')
|
|
|
|
# Recurring jobs: a collection pass every hour and a log clean-up once a day.
# max_instances=1 stops a scheduler-run instance from overlapping with the next one.
scheduler.add_job(find_proxies, trigger='interval', hours=1, max_instances=1)
scheduler.add_job(delete_logs, trigger='interval', days=1, max_instances=1)
|
|
|
|
@asynccontextmanager
async def app_lifespan(app: FastAPI):
    """Run the scheduler and an immediate collection pass for the app's lifetime.

    On shutdown (normal or due to an error) the scheduler is stopped. If the
    initial pass is still running, it is cancelled. Previously it was awaited to
    completion, which could hold shutdown for the whole broker timeout. An error
    raised by the initial pass is logged instead of breaking shutdown.
    """
    scheduler.start()
    # Run one pass right away instead of waiting for the first interval tick.
    initial_run = create_task(find_proxies())
    try:
        yield
    finally:
        scheduler.shutdown()
        if not initial_run.done():
            initial_run.cancel()
        try:
            await initial_run
        except CancelledError:
            pass
        except Exception as e:
            logger.error(f'ошибка первичного сбора прокси: {e}')
|
|
|
|
# The ASGI application; startup/shutdown work lives in app_lifespan.
app: FastAPI = FastAPI(
    lifespan=app_lifespan,
)
|
|
|
|
def not_redy_yet():
    """Build the JSON notice returned while a requested proxy file does not exist yet (HTTP 200)."""
    payload = {"error": "ёще не готово, сбор и проверка прокси занимает около часа"}
    return JSONResponse(content=payload, status_code=200)
|
|
|
|
@app.post('*')
async def read_root():
    """Reject POST requests on this route with 405 Method Not Allowed.

    NOTE(review): request paths always begin with '/', so the literal '*' path
    is unlikely to match any request. A POST catch-all would be '/{path:path}'.
    Confirm the intent. The GET '/' handler below reuses the name read_root;
    both routes are still registered by their decorators.
    """
    # The exception must be raised. Returning it makes FastAPI encode the
    # exception object as an ordinary successful response body.
    raise HTTPException(status_code=405)
|
|
|
|
@app.get('/all/')
async def get_proxies():
    """Serve the merged proxy document, or the not-ready notice if it has not been written."""
    if not collected_json.exists():
        return not_redy_yet()
    raw = collected_json.read_text()
    return loads(raw)
|
|
|
|
@app.get('/http/')
async def get_http_proxies():
    """Serve the HTTP proxy file, or the not-ready notice if it has not been written."""
    if not http_collected_json.exists():
        return not_redy_yet()
    raw = http_collected_json.read_text()
    return loads(raw)
|
|
|
|
@app.get('/https/')
async def get_https_proxies():
    """Serve the HTTPS proxy file, or the not-ready notice if it has not been written."""
    if not https_collected_json.exists():
        return not_redy_yet()
    raw = https_collected_json.read_text()
    return loads(raw)
|
|
|
|
@app.get('/socks5/')
async def get_socks5_proxies():
    """Serve the SOCKS5 proxy file, or the not-ready notice if it has not been written."""
    if not socks5_collected_json.exists():
        return not_redy_yet()
    raw = socks5_collected_json.read_text()
    return loads(raw)
|
|
|
|
@app.get('/log/')
async def get_logs():
    """Return the log file as plain text, or 'лог пуст' when it is missing or empty."""
    try:
        # errors='replace': the log may have been written in the platform's
        # default encoding, and undecodable bytes must not turn this into a 500.
        text = logfile.read_text(encoding='utf-8', errors='replace')
    except FileNotFoundError:
        # EAFP rather than exists()+read: the scheduled delete_logs job may run
        # concurrently and remove the file between the two calls.
        text = ''
    if not text:
        return PlainTextResponse('лог пуст', status_code=200)
    return PlainTextResponse(text, status_code=200)
|
|
|
|
@app.get('/')
async def read_root():
    """Answer the root path with a fixed plain-text reply."""
    greeting = 'ну пролапс, ну и что'
    return PlainTextResponse(content=greeting, status_code=200)
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 7860.
    server_options = {'host': '0.0.0.0', 'port': 7860, 'timeout_keep_alive': 90}
    uvicorn_run(app, **server_options)
|
|