diffusers-pr-api / src /slop_farmer /reports /analysis_cache.py
evalstate's picture
evalstate HF Staff
Deploy Diffusers PR API
dbf7313 verified
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from slop_farmer.data.parquet_io import read_json, write_json
# Schema version strings for the cache file format and for prepared review
# units. NOTE(review): presumably passed into HybridReviewCacheManifest by
# callers, so a bump makes stored manifests mismatch — confirm at call sites.
HYBRID_REVIEW_CACHE_SCHEMA_VERSION = "1.0"
PREPARED_REVIEW_UNIT_SCHEMA_VERSION = "1.0"

# File names, relative to the cache directory, used by HybridReviewCacheStore.
HYBRID_REVIEW_CACHE_MANIFEST_FILENAME = "hybrid-review-cache-manifest.json"
HYBRID_REVIEW_CACHE_ENTRIES_FILENAME = "hybrid-review-cache.jsonl"
def _canonical_json_bytes(data: Any) -> bytes:
    """Encode *data* as canonical JSON bytes suitable for hashing.

    Keys are sorted, no insignificant whitespace is emitted, and non-ASCII
    characters are kept verbatim and encoded as UTF-8, so equal values always
    produce identical byte strings.
    """
    text = json.dumps(data, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return text.encode("utf-8")
def _normalize_review_item_for_hash(item: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *item* whose order-insensitive lists are sorted.

    ``filenames`` becomes a sorted list of strings and
    ``explicit_issue_targets`` a sorted list of ints. Fields that are absent
    or ``None`` are left as they are; *item* itself is not mutated.
    """
    result = dict(item)
    if (names := result.get("filenames")) is not None:
        result["filenames"] = sorted(map(str, names))
    if (targets := result.get("explicit_issue_targets")) is not None:
        result["explicit_issue_targets"] = sorted(map(int, targets))
    return result
def _normalize_soft_pair_for_hash(pair: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *pair* with its list-valued fields sorted.

    ``evidence_types`` and ``shared_filenames`` become sorted lists of
    strings and ``shared_targets`` a sorted list of ints. Fields that are
    absent or ``None`` are left as they are; *pair* itself is not mutated.
    """
    result = dict(pair)
    for field, coerce in (
        ("evidence_types", str),
        ("shared_targets", int),
        ("shared_filenames", str),
    ):
        values = result.get(field)
        if values is None:
            continue
        result[field] = sorted(coerce(value) for value in values)
    return result
def _normalize_prepared_review_unit_for_hash(
    prepared_review_unit: dict[str, Any],
) -> dict[str, Any]:
    """Return a copy of *prepared_review_unit* with its packet in canonical order.

    The result is hashed by ``prepared_review_unit_hash``. Every collection
    inside ``packet`` is therefore sorted, so the hash does not depend on the
    order in which the packet contents were gathered:

    * ``nodes`` -> sorted list of strings;
    * ``items`` -> each normalized with ``_normalize_review_item_for_hash``,
      then sorted by ``node_id``;
    * ``pair_evidence`` -> string keys, each value a sorted list of strings;
    * ``soft_pairs`` -> each normalized with ``_normalize_soft_pair_for_hash``,
      then sorted by ``(left, right)``.

    ``sorted`` is stable. Without a tie-breaker, items sharing a ``node_id``
    (or lacking one) and soft pairs sharing ``(left, right)`` would keep their
    input order, making the hash order-dependent. Those ties are broken by
    the element's canonical JSON encoding instead.

    Missing packet collections are normalized to empty ones. Neither the
    input mapping nor its ``packet`` dict is mutated; both are shallow-copied.
    """
    normalized = dict(prepared_review_unit)
    packet = dict(normalized.get("packet") or {})
    packet["nodes"] = sorted(str(node) for node in packet.get("nodes") or [])
    packet["items"] = sorted(
        (_normalize_review_item_for_hash(dict(item)) for item in packet.get("items") or []),
        key=lambda item: (
            str(item.get("node_id") or ""),
            # Tie-breaker: only affects ordering when node_ids collide, so
            # hashes of packets with unique node_ids are unchanged.
            _canonical_json_bytes(item),
        ),
    )
    packet["pair_evidence"] = {
        str(pair): sorted(str(value) for value in values)
        for pair, values in sorted(dict(packet.get("pair_evidence") or {}).items())
    }
    packet["soft_pairs"] = sorted(
        (_normalize_soft_pair_for_hash(dict(pair)) for pair in packet.get("soft_pairs") or []),
        key=lambda pair: (
            str(pair.get("left") or ""),
            str(pair.get("right") or ""),
            # Tie-breaker for duplicate (left, right) pairs; see above.
            _canonical_json_bytes(pair),
        ),
    )
    normalized["packet"] = packet
    return normalized
@dataclass(frozen=True, slots=True)
class HybridReviewSettingsFingerprint:
llm_max_input_tokens: int
llm_max_nodes_per_packet: int
llm_max_soft_pairs_per_packet: int
llm_max_diff_chars_per_item: int
llm_max_filenames_per_item: int
llm_skip_evaluator_above_tokens: int
llm_overflow_policy: str
@property
def value(self) -> str:
return hashlib.sha256(_canonical_json_bytes(self.to_json())).hexdigest()
def to_json(self) -> dict[str, Any]:
return {
"llm_max_input_tokens": self.llm_max_input_tokens,
"llm_max_nodes_per_packet": self.llm_max_nodes_per_packet,
"llm_max_soft_pairs_per_packet": self.llm_max_soft_pairs_per_packet,
"llm_max_diff_chars_per_item": self.llm_max_diff_chars_per_item,
"llm_max_filenames_per_item": self.llm_max_filenames_per_item,
"llm_skip_evaluator_above_tokens": self.llm_skip_evaluator_above_tokens,
"llm_overflow_policy": self.llm_overflow_policy,
}
@classmethod
def from_json(cls, payload: dict[str, Any]) -> HybridReviewSettingsFingerprint:
return cls(
llm_max_input_tokens=int(payload["llm_max_input_tokens"]),
llm_max_nodes_per_packet=int(payload["llm_max_nodes_per_packet"]),
llm_max_soft_pairs_per_packet=int(payload["llm_max_soft_pairs_per_packet"]),
llm_max_diff_chars_per_item=int(payload["llm_max_diff_chars_per_item"]),
llm_max_filenames_per_item=int(payload["llm_max_filenames_per_item"]),
llm_skip_evaluator_above_tokens=int(payload["llm_skip_evaluator_above_tokens"]),
llm_overflow_policy=str(payload["llm_overflow_policy"]),
)
@dataclass(frozen=True, slots=True)
class HybridReviewCacheManifest:
cache_schema_version: str
prepared_review_unit_schema_version: str
analyst_prompt_version: str
evaluator_prompt_version: str
hybrid_review_settings: HybridReviewSettingsFingerprint
@property
def hybrid_review_settings_fingerprint(self) -> str:
return self.hybrid_review_settings.value
def to_json(self) -> dict[str, Any]:
return {
"cache_schema_version": self.cache_schema_version,
"prepared_review_unit_schema_version": self.prepared_review_unit_schema_version,
"analyst_prompt_version": self.analyst_prompt_version,
"evaluator_prompt_version": self.evaluator_prompt_version,
"hybrid_review_settings": self.hybrid_review_settings.to_json(),
"hybrid_review_settings_fingerprint": self.hybrid_review_settings_fingerprint,
}
@classmethod
def from_json(cls, payload: dict[str, Any]) -> HybridReviewCacheManifest:
return cls(
cache_schema_version=str(payload["cache_schema_version"]),
prepared_review_unit_schema_version=str(payload["prepared_review_unit_schema_version"]),
analyst_prompt_version=str(payload["analyst_prompt_version"]),
evaluator_prompt_version=str(payload["evaluator_prompt_version"]),
hybrid_review_settings=HybridReviewSettingsFingerprint.from_json(
payload["hybrid_review_settings"]
),
)
@dataclass(frozen=True, slots=True)
class HybridReviewCacheKey:
cache_schema_version: str
prepared_review_unit_schema_version: str
analyst_prompt_version: str
evaluator_prompt_version: str
hybrid_review_settings_fingerprint: str
model: str
prepared_review_unit_hash: str
def to_json(self) -> dict[str, Any]:
return {
"cache_schema_version": self.cache_schema_version,
"prepared_review_unit_schema_version": self.prepared_review_unit_schema_version,
"analyst_prompt_version": self.analyst_prompt_version,
"evaluator_prompt_version": self.evaluator_prompt_version,
"hybrid_review_settings_fingerprint": self.hybrid_review_settings_fingerprint,
"model": self.model,
"prepared_review_unit_hash": self.prepared_review_unit_hash,
}
@classmethod
def from_json(cls, payload: dict[str, Any]) -> HybridReviewCacheKey:
return cls(
cache_schema_version=str(payload["cache_schema_version"]),
prepared_review_unit_schema_version=str(payload["prepared_review_unit_schema_version"]),
analyst_prompt_version=str(payload["analyst_prompt_version"]),
evaluator_prompt_version=str(payload["evaluator_prompt_version"]),
hybrid_review_settings_fingerprint=str(payload["hybrid_review_settings_fingerprint"]),
model=str(payload["model"]),
prepared_review_unit_hash=str(payload["prepared_review_unit_hash"]),
)
@dataclass(frozen=True, slots=True)
class HybridReviewCacheEntry:
key: HybridReviewCacheKey
result: dict[str, Any]
cached_at: str
nodes: tuple[str, ...] = ()
soft_pairs: tuple[str, ...] = ()
budget: dict[str, int] | None = None
split: bool = False
trimmed: bool = False
aggressively_trimmed: bool = False
def to_json(self) -> dict[str, Any]:
return {
"key": self.key.to_json(),
"result": self.result,
"cached_at": self.cached_at,
"nodes": list(self.nodes),
"soft_pairs": list(self.soft_pairs),
"budget": self.budget,
"split": self.split,
"trimmed": self.trimmed,
"aggressively_trimmed": self.aggressively_trimmed,
}
@classmethod
def from_json(cls, payload: dict[str, Any]) -> HybridReviewCacheEntry:
return cls(
key=HybridReviewCacheKey.from_json(payload["key"]),
result=dict(payload["result"]),
cached_at=str(payload["cached_at"]),
nodes=tuple(str(node) for node in payload.get("nodes") or []),
soft_pairs=tuple(str(pair) for pair in payload.get("soft_pairs") or []),
budget=(
None
if payload.get("budget") is None
else {str(key): int(value) for key, value in dict(payload["budget"]).items()}
),
split=bool(payload.get("split", False)),
trimmed=bool(payload.get("trimmed", False)),
aggressively_trimmed=bool(payload.get("aggressively_trimmed", False)),
)
def prepared_review_unit_hash(prepared_review_unit: dict[str, Any]) -> str:
    """Return the SHA-256 hex digest identifying a prepared review unit.

    The unit is first put into canonical order by
    ``_normalize_prepared_review_unit_for_hash`` and encoded by
    ``_canonical_json_bytes``. Units that differ only in the ordering that
    normalization sorts away therefore hash identically.
    """
    digest = hashlib.sha256()
    canonical = _canonical_json_bytes(
        _normalize_prepared_review_unit_for_hash(prepared_review_unit)
    )
    digest.update(canonical)
    return digest.hexdigest()
def build_hybrid_review_cache_key(
    *,
    manifest: HybridReviewCacheManifest,
    model: str,
    prepared_review_unit: dict[str, Any],
) -> HybridReviewCacheKey:
    """Assemble the cache key for reviewing *prepared_review_unit* with *model*.

    Version fields and the settings fingerprint are copied from *manifest*.
    The unit itself contributes its content hash
    (see ``prepared_review_unit_hash``).
    """
    settings_fingerprint = manifest.hybrid_review_settings_fingerprint
    unit_hash = prepared_review_unit_hash(prepared_review_unit)
    return HybridReviewCacheKey(
        cache_schema_version=manifest.cache_schema_version,
        prepared_review_unit_schema_version=manifest.prepared_review_unit_schema_version,
        analyst_prompt_version=manifest.analyst_prompt_version,
        evaluator_prompt_version=manifest.evaluator_prompt_version,
        hybrid_review_settings_fingerprint=settings_fingerprint,
        model=model,
        prepared_review_unit_hash=unit_hash,
    )
def hybrid_review_cache_dir(snapshot_dir: Path) -> Path:
    """Return the ``analysis-state`` subdirectory of *snapshot_dir* (not created)."""
    return snapshot_dir.joinpath("analysis-state")
class HybridReviewCacheStore:
    """Append-only JSONL cache of hybrid-review results guarded by a manifest.

    When ``enabled``, construction reads the manifest and entries files from
    ``cache_dir``. In the following cases nothing is loaded,
    ``invalidation_reason`` records why, and both files are rewritten on the
    next ``put``:

    * the stored manifest is missing while entries exist;
    * the stored manifest is unreadable;
    * the stored manifest differs from ``manifest``;
    * any entry line fails to parse.

    When ``enabled`` is False the store never reads or writes files: ``get``
    always misses and ``put`` does nothing.
    """

    def __init__(
        self,
        cache_dir: Path,
        manifest: HybridReviewCacheManifest,
        *,
        enabled: bool = True,
    ) -> None:
        self.cache_dir = cache_dir
        self.manifest = manifest
        self.enabled = enabled
        # Why the on-disk cache was discarded while loading, or None.
        self.invalidation_reason: str | None = None
        self._entries: dict[HybridReviewCacheKey, HybridReviewCacheEntry] = {}
        # When True, the next write truncates the entries file and rewrites the manifest.
        self._needs_reset = False
        if self.enabled:
            self._load()

    @property
    def manifest_path(self) -> Path:
        """Location of the JSON manifest inside ``cache_dir``."""
        return self.cache_dir.joinpath(HYBRID_REVIEW_CACHE_MANIFEST_FILENAME)

    @property
    def entries_path(self) -> Path:
        """Location of the JSONL entries file inside ``cache_dir``."""
        return self.cache_dir.joinpath(HYBRID_REVIEW_CACHE_ENTRIES_FILENAME)

    @property
    def has_entries(self) -> bool:
        """True when at least one entry is held in memory."""
        return len(self._entries) > 0

    def get(self, key: HybridReviewCacheKey) -> HybridReviewCacheEntry | None:
        """Return the cached entry for *key*, or None on a miss or when disabled."""
        return self._entries.get(key) if self.enabled else None

    def put(self, entry: HybridReviewCacheEntry) -> None:
        """Append *entry* as one JSON line and index it in memory.

        Does nothing when the store is disabled or an entry with the same key
        is already indexed. Files are prepared first, so a pending reset
        truncates the entries file before this entry is appended.
        """
        if not self.enabled:
            return
        if entry.key in self._entries:
            return
        self._prepare_for_write()
        record = json.dumps(entry.to_json(), sort_keys=True)
        with self.entries_path.open("a", encoding="utf-8") as handle:
            handle.write(f"{record}\n")
        self._entries[entry.key] = entry

    def _load(self) -> None:
        """Populate the in-memory index from disk, or flag the files for reset."""
        if self._stored_manifest_is_current() and self.entries_path.exists():
            self._read_entries()

    def _flag_reset(self, reason: str) -> None:
        """Record *reason* and schedule a rewrite of the cache files."""
        self.invalidation_reason = reason
        self._needs_reset = True

    def _stored_manifest_is_current(self) -> bool:
        """Return True if the on-disk manifest equals ``self.manifest``.

        Otherwise return False, flagging a reset when stale data is present.
        """
        if not self.manifest_path.exists():
            # An empty directory has nothing to invalidate; orphaned entries do.
            if self.entries_path.exists():
                self._flag_reset("missing_manifest")
            return False
        try:
            stored = HybridReviewCacheManifest.from_json(read_json(self.manifest_path))
        except Exception:
            # Best-effort: any unreadable manifest just invalidates the cache.
            self._flag_reset("invalid_manifest")
            return False
        if stored != self.manifest:
            self._flag_reset("manifest_mismatch")
            return False
        return True

    def _read_entries(self) -> None:
        """Parse every non-blank JSONL line; on any failure drop them all."""
        try:
            with self.entries_path.open("r", encoding="utf-8") as handle:
                for raw_line in handle:
                    text = raw_line.strip()
                    if not text:
                        continue
                    entry = HybridReviewCacheEntry.from_json(json.loads(text))
                    self._entries[entry.key] = entry
        except Exception:
            self._entries.clear()
            self._flag_reset("invalid_entries")

    def _prepare_for_write(self) -> None:
        """Ensure the cache directory and files are ready for an append.

        A pending reset empties the entries file and rewrites the manifest.
        Otherwise a missing entries file is created and a missing manifest is
        written.
        """
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        reset = self._needs_reset
        if reset:
            self.entries_path.write_text("", encoding="utf-8")
        elif not self.entries_path.exists():
            self.entries_path.touch()
        if reset or not self.manifest_path.exists():
            write_json(self.manifest.to_json(), self.manifest_path)
        self._needs_reset = False