# diffusers-pr-api / src/slop_farmer/data/github_api.py
# Deployed by evalstate (HF Staff) via "Deploy Diffusers PR API" (commit dbf7313, verified).
from __future__ import annotations
import json
import time
import urllib.error
import urllib.parse
import urllib.request
from collections.abc import Callable, Iterator
from datetime import UTC, datetime
from slop_farmer.data.http import urlopen_with_retry
class PullRequestDiffTooLargeError(RuntimeError):
    """Signals that GitHub declined to produce a PR diff because the payload is too large."""
class GitHubApiRequestError(RuntimeError):
    """Error for a GitHub HTTP response that the client does not retry.

    Carries the HTTP status code, the requested API path, and the raw
    response body so callers can inspect or log the failure.
    """

    def __init__(self, status_code: int, path: str, detail: str):
        message = f"GitHub API request failed: {status_code} {path} {detail}"
        super().__init__(message)
        self.status_code = status_code
        self.path = path
        self.detail = detail
class GitHubClient:
def __init__(
self,
token: str | None,
per_page: int = 100,
timeout: int = 180,
max_retries: int = 5,
log: Callable[[str], None] | None = None,
):
self.token = token
self.per_page = per_page
self.timeout = timeout
self.max_retries = max_retries
self.log = log
self.base_url = "https://api.github.com"
self.request_count = 0
self.rate_limit_log_every = 25
self._rate_limit_thresholds_logged: dict[str, set[int]] = {}
@staticmethod
def _header_int(headers: dict[str, str], name: str) -> int | None:
value = headers.get(name)
if value is None:
return None
try:
return int(value)
except ValueError:
return None
def _maybe_log_rate_limit(self, path: str, headers: dict[str, str]) -> None:
self.request_count += 1
if not self.log:
return
limit = self._header_int(headers, "x-ratelimit-limit")
remaining = self._header_int(headers, "x-ratelimit-remaining")
used = self._header_int(headers, "x-ratelimit-used")
reset_at = self._header_int(headers, "x-ratelimit-reset")
resource = headers.get("x-ratelimit-resource", "?")
if limit is None or remaining is None:
return
should_log = self.request_count == 1 or self.request_count % self.rate_limit_log_every == 0
thresholds = (1000, 500, 250, 100, 50, 25, 10, 5, 1, 0)
logged = self._rate_limit_thresholds_logged.setdefault(resource, set())
for threshold in thresholds:
if remaining <= threshold and threshold not in logged:
should_log = True
logged.add(threshold)
if not should_log:
return
reset_text = "?"
if reset_at is not None:
reset_text = datetime.fromtimestamp(reset_at, tz=UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
used_text = "?" if used is None else str(used)
self.log(
f"GitHub rate limit: resource={resource} used={used_text} remaining={remaining}/{limit} "
f"reset={reset_text} after {path} (request {self.request_count})"
)
def _request(
self,
path: str,
params: dict[str, object] | None = None,
accept: str = "application/vnd.github+json",
) -> tuple[object, dict[str, str]]:
query = f"?{urllib.parse.urlencode(params)}" if params else ""
url = f"{self.base_url}{path}{query}"
request = urllib.request.Request(url)
request.add_header("Accept", accept)
request.add_header("X-GitHub-Api-Version", "2022-11-28")
if self.token:
request.add_header("Authorization", f"Bearer {self.token}")
while True:
try:
with urlopen_with_retry(
request,
timeout=self.timeout,
max_retries=self.max_retries,
log=self.log,
label=path,
) as response:
payload = response.read().decode("utf-8")
headers = {k.lower(): v for k, v in response.headers.items()}
self._maybe_log_rate_limit(path, headers)
return json.loads(payload), headers
except urllib.error.HTTPError as exc:
if exc.code == 403 and exc.headers.get("X-RateLimit-Remaining") == "0":
reset_at = int(exc.headers.get("X-RateLimit-Reset", "0") or "0")
sleep_for = max(reset_at - int(time.time()), 1)
if self.log:
self.log(
f"GitHub rate limit reached for {path}; sleeping {sleep_for}s until reset"
)
time.sleep(sleep_for)
continue
detail = exc.read().decode("utf-8", errors="replace")
if (
exc.code == 406
and '"field":"diff"' in detail
and '"code":"too_large"' in detail
):
raise PullRequestDiffTooLargeError(
f"GitHub diff too large: {path} {detail}"
) from exc
raise GitHubApiRequestError(exc.code, path, detail) from exc
def _request_text(
self,
path: str,
params: dict[str, object] | None = None,
accept: str = "application/vnd.github.diff",
) -> tuple[str, dict[str, str]]:
query = f"?{urllib.parse.urlencode(params)}" if params else ""
url = f"{self.base_url}{path}{query}"
request = urllib.request.Request(url)
request.add_header("Accept", accept)
request.add_header("X-GitHub-Api-Version", "2022-11-28")
if self.token:
request.add_header("Authorization", f"Bearer {self.token}")
while True:
try:
with urlopen_with_retry(
request,
timeout=self.timeout,
max_retries=self.max_retries,
log=self.log,
label=path,
) as response:
payload = response.read().decode("utf-8", errors="replace")
headers = {k.lower(): v for k, v in response.headers.items()}
self._maybe_log_rate_limit(path, headers)
return payload, headers
except urllib.error.HTTPError as exc:
if exc.code == 403 and exc.headers.get("X-RateLimit-Remaining") == "0":
reset_at = int(exc.headers.get("X-RateLimit-Reset", "0") or "0")
sleep_for = max(reset_at - int(time.time()), 1)
if self.log:
self.log(
f"GitHub rate limit reached for {path}; sleeping {sleep_for}s until reset"
)
time.sleep(sleep_for)
continue
detail = exc.read().decode("utf-8", errors="replace")
if (
exc.code == 406
and '"field":"diff"' in detail
and '"code":"too_large"' in detail
):
raise PullRequestDiffTooLargeError(
f"GitHub diff too large: {path} {detail}"
) from exc
raise GitHubApiRequestError(exc.code, path, detail) from exc
def paginate(
self,
path: str,
params: dict[str, object] | None = None,
accept: str = "application/vnd.github+json",
limit: int | None = None,
) -> Iterator[dict]:
page = 1
yielded = 0
params = dict(params or {})
params["per_page"] = self.per_page
while True:
params["page"] = page
payload, _headers = self._request(path, params=params, accept=accept)
if not isinstance(payload, list):
raise RuntimeError(f"Expected list payload from {path}, got {type(payload)!r}")
if not payload:
break
for item in payload:
if not isinstance(item, dict):
continue
yield item
yielded += 1
if limit is not None and yielded >= limit:
return
if len(payload) < self.per_page:
break
page += 1
def get_json(self, path: str, accept: str = "application/vnd.github+json") -> dict:
payload, _headers = self._request(path, accept=accept)
if not isinstance(payload, dict):
raise RuntimeError(f"Expected dict payload from {path}, got {type(payload)!r}")
return payload
def iter_repo_issues(
self, owner: str, repo: str, since: str | None, limit: int | None
) -> Iterator[dict]:
direction = "asc"
if since is None and limit is not None:
# When the caller asks for a bounded first pass without a watermark,
# prefer the most recently updated items. Using ascending order here
# causes small smoke tests to read the stalest issue/PR stubs first,
# which can make recent-age filters appear to return an empty repo.
direction = "desc"
params: dict[str, object] = {"state": "all", "sort": "updated", "direction": direction}
if since:
params["since"] = since
yield from self.paginate(f"/repos/{owner}/{repo}/issues", params=params, limit=limit)
def iter_issue_comments(
self, owner: str, repo: str, since: str | None, limit: int | None
) -> Iterator[dict]:
params: dict[str, object] = {"sort": "updated", "direction": "asc"}
if since:
params["since"] = since
yield from self.paginate(
f"/repos/{owner}/{repo}/issues/comments", params=params, limit=limit
)
def iter_issue_comments_for_number(
self,
owner: str,
repo: str,
number: int,
since: str | None,
limit: int | None = None,
) -> Iterator[dict]:
params: dict[str, object] = {"sort": "updated", "direction": "asc"}
if since:
params["since"] = since
yield from self.paginate(
f"/repos/{owner}/{repo}/issues/{number}/comments", params=params, limit=limit
)
def get_pull_request(self, owner: str, repo: str, number: int) -> dict:
return self.get_json(f"/repos/{owner}/{repo}/pulls/{number}")
def iter_pull_reviews(
self, owner: str, repo: str, number: int, limit: int | None = None
) -> Iterator[dict]:
yield from self.paginate(f"/repos/{owner}/{repo}/pulls/{number}/reviews", limit=limit)
def iter_pull_review_comments(
self, owner: str, repo: str, number: int, limit: int | None = None
) -> Iterator[dict]:
yield from self.paginate(
f"/repos/{owner}/{repo}/pulls/{number}/comments",
params={"sort": "updated", "direction": "asc"},
limit=limit,
)
def iter_pull_files(
self, owner: str, repo: str, number: int, limit: int | None = None
) -> Iterator[dict]:
yield from self.paginate(f"/repos/{owner}/{repo}/pulls/{number}/files", limit=limit)
def get_pull_request_diff(self, owner: str, repo: str, number: int) -> str:
path = f"/repos/{owner}/{repo}/pulls/{number}"
try:
payload, _headers = self._request_text(
path,
accept="application/vnd.github.diff",
)
return payload
except PullRequestDiffTooLargeError:
if self.log:
self.log(
f"Skipping unified diff for pull request #{number}; GitHub reports diff too large"
)
return ""
def iter_issue_timeline(
self, owner: str, repo: str, number: int, limit: int | None = None
) -> Iterator[dict]:
path = f"/repos/{owner}/{repo}/issues/{number}/timeline"
try:
yield from self.paginate(
path,
accept="application/vnd.github+json, application/vnd.github.mockingbird-preview+json",
limit=limit,
)
except GitHubApiRequestError as exc:
if exc.status_code < 500:
raise
if self.log:
self.log(
f"Skipping timeline fetch for issue #{number} after GitHub {exc.status_code}: {path}"
)
return