Spaces:
Running
Running
| from __future__ import annotations | |
| import hashlib | |
| import json | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any | |
| from slop_farmer.data.parquet_io import read_json, write_json | |
# File names of the two cache artifacts kept in the cache directory:
# a JSON manifest recording provenance, and a JSONL file of entries.
HYBRID_REVIEW_CACHE_MANIFEST_FILENAME = "hybrid-review-cache-manifest.json"
HYBRID_REVIEW_CACHE_ENTRIES_FILENAME = "hybrid-review-cache.jsonl"
# Schema version strings. NOTE(review): not referenced in this chunk —
# presumably callers feed these into the manifest's version fields so that
# bumping them invalidates stale caches; confirm against callers.
HYBRID_REVIEW_CACHE_SCHEMA_VERSION = "1.0"
PREPARED_REVIEW_UNIT_SCHEMA_VERSION = "1.0"
| def _canonical_json_bytes(data: Any) -> bytes: | |
| return json.dumps( | |
| data, | |
| ensure_ascii=False, | |
| separators=(",", ":"), | |
| sort_keys=True, | |
| ).encode("utf-8") | |
| def _normalize_review_item_for_hash(item: dict[str, Any]) -> dict[str, Any]: | |
| normalized = dict(item) | |
| filenames = normalized.get("filenames") | |
| if filenames is not None: | |
| normalized["filenames"] = sorted(str(filename) for filename in filenames) | |
| explicit_issue_targets = normalized.get("explicit_issue_targets") | |
| if explicit_issue_targets is not None: | |
| normalized["explicit_issue_targets"] = sorted( | |
| int(target) for target in explicit_issue_targets | |
| ) | |
| return normalized | |
| def _normalize_soft_pair_for_hash(pair: dict[str, Any]) -> dict[str, Any]: | |
| normalized = dict(pair) | |
| evidence_types = normalized.get("evidence_types") | |
| if evidence_types is not None: | |
| normalized["evidence_types"] = sorted(str(value) for value in evidence_types) | |
| shared_targets = normalized.get("shared_targets") | |
| if shared_targets is not None: | |
| normalized["shared_targets"] = sorted(int(target) for target in shared_targets) | |
| shared_filenames = normalized.get("shared_filenames") | |
| if shared_filenames is not None: | |
| normalized["shared_filenames"] = sorted(str(filename) for filename in shared_filenames) | |
| return normalized | |
def _normalize_prepared_review_unit_for_hash(
    prepared_review_unit: dict[str, Any],
) -> dict[str, Any]:
    """Return a copy of *prepared_review_unit* with every packet collection
    put into a canonical order, so hashing is order-insensitive.

    Nodes become a sorted string list; items and soft pairs are normalized
    per-element and then sorted by stable keys; pair evidence becomes a
    key-sorted mapping of sorted string lists.
    """
    unit = dict(prepared_review_unit)
    packet = dict(unit.get("packet") or {})

    packet["nodes"] = sorted(str(node) for node in packet.get("nodes") or [])

    items = [_normalize_review_item_for_hash(dict(item)) for item in packet.get("items") or []]
    items.sort(key=lambda item: str(item.get("node_id") or ""))
    packet["items"] = items

    evidence = dict(packet.get("pair_evidence") or {})
    packet["pair_evidence"] = {
        str(key): sorted(str(value) for value in values)
        for key, values in sorted(evidence.items())
    }

    pairs = [_normalize_soft_pair_for_hash(dict(pair)) for pair in packet.get("soft_pairs") or []]
    pairs.sort(key=lambda pair: (str(pair.get("left") or ""), str(pair.get("right") or "")))
    packet["soft_pairs"] = pairs

    unit["packet"] = packet
    return unit
@dataclass(frozen=True)
class HybridReviewSettingsFingerprint:
    """Immutable bundle of the hybrid-review LLM settings that affect cached results.

    Restored decorators: ``@dataclass(frozen=True)`` (the class is built with
    keyword args in ``from_json`` and embedded in hashable/equatable records),
    ``@property`` on ``value`` (callers read it as an attribute), and
    ``@classmethod`` on ``from_json``.
    """

    llm_max_input_tokens: int
    llm_max_nodes_per_packet: int
    llm_max_soft_pairs_per_packet: int
    llm_max_diff_chars_per_item: int
    llm_max_filenames_per_item: int
    llm_skip_evaluator_above_tokens: int
    llm_overflow_policy: str

    @property
    def value(self) -> str:
        """sha256 hex digest of the canonical JSON form of these settings."""
        return hashlib.sha256(_canonical_json_bytes(self.to_json())).hexdigest()

    def to_json(self) -> dict[str, Any]:
        """Serialize to a plain JSON-compatible dict (one key per field)."""
        return {
            "llm_max_input_tokens": self.llm_max_input_tokens,
            "llm_max_nodes_per_packet": self.llm_max_nodes_per_packet,
            "llm_max_soft_pairs_per_packet": self.llm_max_soft_pairs_per_packet,
            "llm_max_diff_chars_per_item": self.llm_max_diff_chars_per_item,
            "llm_max_filenames_per_item": self.llm_max_filenames_per_item,
            "llm_skip_evaluator_above_tokens": self.llm_skip_evaluator_above_tokens,
            "llm_overflow_policy": self.llm_overflow_policy,
        }

    @classmethod
    def from_json(cls, payload: dict[str, Any]) -> HybridReviewSettingsFingerprint:
        """Rebuild a fingerprint from :meth:`to_json` output, coercing types."""
        return cls(
            llm_max_input_tokens=int(payload["llm_max_input_tokens"]),
            llm_max_nodes_per_packet=int(payload["llm_max_nodes_per_packet"]),
            llm_max_soft_pairs_per_packet=int(payload["llm_max_soft_pairs_per_packet"]),
            llm_max_diff_chars_per_item=int(payload["llm_max_diff_chars_per_item"]),
            llm_max_filenames_per_item=int(payload["llm_max_filenames_per_item"]),
            llm_skip_evaluator_above_tokens=int(payload["llm_skip_evaluator_above_tokens"]),
            llm_overflow_policy=str(payload["llm_overflow_policy"]),
        )
@dataclass(frozen=True)
class HybridReviewCacheManifest:
    """Provenance record for the hybrid-review cache.

    Restored decorators: ``@dataclass(frozen=True)`` — the store constructs
    it with keyword args and compares manifests with ``!=``, which needs the
    generated ``__eq__``; ``@property`` on the fingerprint accessor (read as
    an attribute in ``to_json`` and by callers); ``@classmethod`` on
    ``from_json``.
    """

    cache_schema_version: str
    prepared_review_unit_schema_version: str
    analyst_prompt_version: str
    evaluator_prompt_version: str
    hybrid_review_settings: HybridReviewSettingsFingerprint

    @property
    def hybrid_review_settings_fingerprint(self) -> str:
        """Hex digest identifying the settings bundle (delegates to ``value``)."""
        return self.hybrid_review_settings.value

    def to_json(self) -> dict[str, Any]:
        """Serialize to a JSON-compatible dict; the fingerprint is included
        redundantly for human inspection."""
        return {
            "cache_schema_version": self.cache_schema_version,
            "prepared_review_unit_schema_version": self.prepared_review_unit_schema_version,
            "analyst_prompt_version": self.analyst_prompt_version,
            "evaluator_prompt_version": self.evaluator_prompt_version,
            "hybrid_review_settings": self.hybrid_review_settings.to_json(),
            "hybrid_review_settings_fingerprint": self.hybrid_review_settings_fingerprint,
        }

    @classmethod
    def from_json(cls, payload: dict[str, Any]) -> HybridReviewCacheManifest:
        """Rebuild a manifest from :meth:`to_json` output.

        The stored fingerprint key is ignored — it is derived from the
        settings on access.
        """
        return cls(
            cache_schema_version=str(payload["cache_schema_version"]),
            prepared_review_unit_schema_version=str(payload["prepared_review_unit_schema_version"]),
            analyst_prompt_version=str(payload["analyst_prompt_version"]),
            evaluator_prompt_version=str(payload["evaluator_prompt_version"]),
            hybrid_review_settings=HybridReviewSettingsFingerprint.from_json(
                payload["hybrid_review_settings"]
            ),
        )
@dataclass(frozen=True)
class HybridReviewCacheKey:
    """Exact identity of one cached hybrid-review result.

    Restored decorators: ``@dataclass(frozen=True)`` is required because the
    store uses keys as dict keys (needs ``__hash__``/``__eq__``), and
    ``@classmethod`` on ``from_json``.
    """

    cache_schema_version: str
    prepared_review_unit_schema_version: str
    analyst_prompt_version: str
    evaluator_prompt_version: str
    hybrid_review_settings_fingerprint: str
    model: str
    prepared_review_unit_hash: str

    def to_json(self) -> dict[str, Any]:
        """Serialize to a JSON-compatible dict (one key per field)."""
        return {
            "cache_schema_version": self.cache_schema_version,
            "prepared_review_unit_schema_version": self.prepared_review_unit_schema_version,
            "analyst_prompt_version": self.analyst_prompt_version,
            "evaluator_prompt_version": self.evaluator_prompt_version,
            "hybrid_review_settings_fingerprint": self.hybrid_review_settings_fingerprint,
            "model": self.model,
            "prepared_review_unit_hash": self.prepared_review_unit_hash,
        }

    @classmethod
    def from_json(cls, payload: dict[str, Any]) -> HybridReviewCacheKey:
        """Rebuild a key from :meth:`to_json` output, coercing to ``str``."""
        return cls(
            cache_schema_version=str(payload["cache_schema_version"]),
            prepared_review_unit_schema_version=str(payload["prepared_review_unit_schema_version"]),
            analyst_prompt_version=str(payload["analyst_prompt_version"]),
            evaluator_prompt_version=str(payload["evaluator_prompt_version"]),
            hybrid_review_settings_fingerprint=str(payload["hybrid_review_settings_fingerprint"]),
            model=str(payload["model"]),
            prepared_review_unit_hash=str(payload["prepared_review_unit_hash"]),
        )
@dataclass(frozen=True)
class HybridReviewCacheEntry:
    """One cached hybrid-review result plus bookkeeping about how it was produced.

    Restored decorators: ``@dataclass(frozen=True)`` (constructed with keyword
    args in ``from_json``; fields below are dataclass fields, several with
    defaults) and ``@classmethod`` on ``from_json``.
    """

    key: HybridReviewCacheKey
    result: dict[str, Any]
    # Timestamp string recorded by the caller; format not enforced here.
    cached_at: str
    nodes: tuple[str, ...] = ()
    soft_pairs: tuple[str, ...] = ()
    budget: dict[str, int] | None = None
    split: bool = False
    trimmed: bool = False
    aggressively_trimmed: bool = False

    def to_json(self) -> dict[str, Any]:
        """Serialize to a JSON-compatible dict; tuples become lists."""
        return {
            "key": self.key.to_json(),
            "result": self.result,
            "cached_at": self.cached_at,
            "nodes": list(self.nodes),
            "soft_pairs": list(self.soft_pairs),
            "budget": self.budget,
            "split": self.split,
            "trimmed": self.trimmed,
            "aggressively_trimmed": self.aggressively_trimmed,
        }

    @classmethod
    def from_json(cls, payload: dict[str, Any]) -> HybridReviewCacheEntry:
        """Rebuild an entry from :meth:`to_json` output.

        Optional fields fall back to their defaults when absent, so entries
        written by older code still load.
        """
        return cls(
            key=HybridReviewCacheKey.from_json(payload["key"]),
            result=dict(payload["result"]),
            cached_at=str(payload["cached_at"]),
            nodes=tuple(str(node) for node in payload.get("nodes") or []),
            soft_pairs=tuple(str(pair) for pair in payload.get("soft_pairs") or []),
            budget=(
                None
                if payload.get("budget") is None
                else {str(key): int(value) for key, value in dict(payload["budget"]).items()}
            ),
            split=bool(payload.get("split", False)),
            trimmed=bool(payload.get("trimmed", False)),
            aggressively_trimmed=bool(payload.get("aggressively_trimmed", False)),
        )
def prepared_review_unit_hash(prepared_review_unit: dict[str, Any]) -> str:
    """Hex sha256 digest of the canonically normalized prepared review unit."""
    canonical = _canonical_json_bytes(
        _normalize_prepared_review_unit_for_hash(prepared_review_unit)
    )
    return hashlib.sha256(canonical).hexdigest()
def build_hybrid_review_cache_key(
    *,
    manifest: HybridReviewCacheManifest,
    model: str,
    prepared_review_unit: dict[str, Any],
) -> HybridReviewCacheKey:
    """Assemble the cache key for one prepared review unit.

    Copies every version field from *manifest*, records *model*, and hashes
    the prepared review unit's canonical form.
    """
    unit_hash = prepared_review_unit_hash(prepared_review_unit)
    return HybridReviewCacheKey(
        cache_schema_version=manifest.cache_schema_version,
        prepared_review_unit_schema_version=manifest.prepared_review_unit_schema_version,
        analyst_prompt_version=manifest.analyst_prompt_version,
        evaluator_prompt_version=manifest.evaluator_prompt_version,
        hybrid_review_settings_fingerprint=manifest.hybrid_review_settings_fingerprint,
        model=model,
        prepared_review_unit_hash=unit_hash,
    )
def hybrid_review_cache_dir(snapshot_dir: Path) -> Path:
    """Directory under *snapshot_dir* where the hybrid-review cache files live."""
    return snapshot_dir.joinpath("analysis-state")
class HybridReviewCacheStore:
    """Append-only JSONL cache of hybrid-review results, guarded by a manifest.

    On load, any inconsistency (missing/invalid/mismatched manifest, corrupt
    entries file) discards the cache and flags it for reset on the next write.

    Fix: ``manifest_path`` and ``entries_path`` were plain methods but are
    read as attributes throughout (``self.entries_path.open(...)``,
    ``self.manifest_path.exists()``), which would raise AttributeError on a
    bound method — restored the missing ``@property`` decorators.
    """

    def __init__(
        self,
        cache_dir: Path,
        manifest: HybridReviewCacheManifest,
        *,
        enabled: bool = True,
    ) -> None:
        self.cache_dir = cache_dir
        self.manifest = manifest
        self.enabled = enabled
        # Why the on-disk cache was discarded, if it was (None when intact).
        self.invalidation_reason: str | None = None
        self._entries: dict[HybridReviewCacheKey, HybridReviewCacheEntry] = {}
        # When True, the next write truncates the entries file and rewrites the manifest.
        self._needs_reset = False
        if self.enabled:
            self._load()

    @property
    def manifest_path(self) -> Path:
        """Path of the JSON manifest describing the cache's provenance."""
        return self.cache_dir / HYBRID_REVIEW_CACHE_MANIFEST_FILENAME

    @property
    def entries_path(self) -> Path:
        """Path of the JSONL file holding one cache entry per line."""
        return self.cache_dir / HYBRID_REVIEW_CACHE_ENTRIES_FILENAME

    def has_entries(self) -> bool:
        """True if any entries are currently loaded in memory."""
        return bool(self._entries)

    def get(self, key: HybridReviewCacheKey) -> HybridReviewCacheEntry | None:
        """Return the cached entry for *key*, or None when disabled or absent."""
        if not self.enabled:
            return None
        return self._entries.get(key)

    def put(self, entry: HybridReviewCacheEntry) -> None:
        """Persist *entry* by appending to the JSONL file, unless disabled or
        the key is already cached (first write wins)."""
        if not self.enabled or entry.key in self._entries:
            return
        self._prepare_for_write()
        with self.entries_path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(entry.to_json(), sort_keys=True) + "\n")
        self._entries[entry.key] = entry

    def _load(self) -> None:
        """Load entries from disk, invalidating the cache on any inconsistency."""
        if not self.manifest_path.exists():
            if self.entries_path.exists():
                # Entries without a manifest are untrustworthy — discard them.
                self.invalidation_reason = "missing_manifest"
                self._needs_reset = True
            return
        try:
            existing_manifest = HybridReviewCacheManifest.from_json(read_json(self.manifest_path))
        except Exception:
            # Best-effort cache: an unreadable manifest just resets the cache.
            self.invalidation_reason = "invalid_manifest"
            self._needs_reset = True
            return
        if existing_manifest != self.manifest:
            # Versions or settings changed since the cache was written.
            self.invalidation_reason = "manifest_mismatch"
            self._needs_reset = True
            return
        if not self.entries_path.exists():
            return
        try:
            with self.entries_path.open("r", encoding="utf-8") as handle:
                for line in handle:
                    line = line.strip()
                    if not line:
                        continue
                    entry = HybridReviewCacheEntry.from_json(json.loads(line))
                    self._entries[entry.key] = entry
        except Exception:
            # Any malformed line invalidates the whole entries file.
            self._entries.clear()
            self.invalidation_reason = "invalid_entries"
            self._needs_reset = True

    def _prepare_for_write(self) -> None:
        """Ensure cache dir, entries file, and manifest exist; reset if flagged."""
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        if self._needs_reset:
            # Truncate stale entries before appending fresh ones.
            self.entries_path.write_text("", encoding="utf-8")
        elif not self.entries_path.exists():
            self.entries_path.touch()
        if self._needs_reset or not self.manifest_path.exists():
            write_json(self.manifest.to_json(), self.manifest_path)
        self._needs_reset = False