"""Cache keys, manifest, and JSONL-backed store for hybrid review results."""

from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from slop_farmer.data.parquet_io import read_json, write_json

HYBRID_REVIEW_CACHE_MANIFEST_FILENAME = "hybrid-review-cache-manifest.json"
HYBRID_REVIEW_CACHE_ENTRIES_FILENAME = "hybrid-review-cache.jsonl"
HYBRID_REVIEW_CACHE_SCHEMA_VERSION = "1.0"
PREPARED_REVIEW_UNIT_SCHEMA_VERSION = "1.0"

# Field-name tables driving (de)serialization below. The order of each tuple
# is the order in which payload keys are read and written.
_VERSION_FIELDS = (
    "cache_schema_version",
    "prepared_review_unit_schema_version",
    "analyst_prompt_version",
    "evaluator_prompt_version",
)
_CACHE_KEY_FIELDS = (
    *_VERSION_FIELDS,
    "hybrid_review_settings_fingerprint",
    "model",
    "prepared_review_unit_hash",
)
_SETTINGS_INT_FIELDS = (
    "llm_max_input_tokens",
    "llm_max_nodes_per_packet",
    "llm_max_soft_pairs_per_packet",
    "llm_max_diff_chars_per_item",
    "llm_max_filenames_per_item",
    "llm_skip_evaluator_above_tokens",
)
_ENTRY_FLAG_FIELDS = ("split", "trimmed", "aggressively_trimmed")


def _canonical_json_bytes(data: Any) -> bytes:
    """Serialize *data* to compact, key-sorted JSON and encode it as UTF-8."""
    text = json.dumps(
        data,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    return text.encode("utf-8")


def _sort_field_if_present(
    record: dict[str, Any],
    field: str,
    cast: type[str] | type[int],
) -> None:
    """Replace ``record[field]`` with a sorted list of ``cast``-ed values.

    Does nothing when the field is absent or holds ``None``.
    """
    values = record.get(field)
    if values is not None:
        record[field] = sorted(map(cast, values))


def _normalize_review_item_for_hash(item: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *item* with its list-valued fields sorted.

    ``filenames`` entries are stringified and ``explicit_issue_targets``
    entries are converted to ``int`` before sorting, so element order in the
    input does not influence the hash.
    """
    result = dict(item)
    _sort_field_if_present(result, "filenames", str)
    _sort_field_if_present(result, "explicit_issue_targets", int)
    return result


def _normalize_soft_pair_for_hash(pair: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *pair* with its list-valued fields sorted.

    ``evidence_types`` and ``shared_filenames`` are sorted as strings,
    ``shared_targets`` as integers.
    """
    result = dict(pair)
    _sort_field_if_present(result, "evidence_types", str)
    _sort_field_if_present(result, "shared_targets", int)
    _sort_field_if_present(result, "shared_filenames", str)
    return result


def _normalize_prepared_review_unit_for_hash(
    prepared_review_unit: dict[str, Any],
) -> dict[str, Any]:
    """Return an order-insensitive copy of *prepared_review_unit* for hashing.

    Only the ``packet`` sub-mapping is rewritten: its ``nodes``, ``items``,
    ``pair_evidence`` and ``soft_pairs`` members are always present in the
    result (empty when missing or falsy in the input) and are sorted. The
    input mapping itself is not mutated.
    """
    unit = dict(prepared_review_unit)
    packet = dict(unit.get("packet") or {})

    packet["nodes"] = sorted(map(str, packet.get("nodes") or []))

    items = [
        _normalize_review_item_for_hash(dict(raw_item))
        for raw_item in packet.get("items") or []
    ]
    items.sort(key=lambda entry: str(entry.get("node_id") or ""))
    packet["items"] = items

    evidence: dict[str, list[str]] = {}
    for pair_name, values in sorted(dict(packet.get("pair_evidence") or {}).items()):
        name = str(pair_name)
        evidence[name] = sorted(map(str, values))
    packet["pair_evidence"] = evidence

    soft_pairs = [
        _normalize_soft_pair_for_hash(dict(raw_pair))
        for raw_pair in packet.get("soft_pairs") or []
    ]
    soft_pairs.sort(
        key=lambda entry: (
            str(entry.get("left") or ""),
            str(entry.get("right") or ""),
        )
    )
    packet["soft_pairs"] = soft_pairs

    unit["packet"] = packet
    return unit


@dataclass(frozen=True, slots=True)
class HybridReviewSettingsFingerprint:
    """Settings whose canonical JSON form is hashed into a fingerprint."""

    llm_max_input_tokens: int
    llm_max_nodes_per_packet: int
    llm_max_soft_pairs_per_packet: int
    llm_max_diff_chars_per_item: int
    llm_max_filenames_per_item: int
    llm_skip_evaluator_above_tokens: int
    llm_overflow_policy: str

    @property
    def value(self) -> str:
        """SHA-256 hex digest of the canonical JSON form of these settings."""
        hasher = hashlib.sha256()
        hasher.update(_canonical_json_bytes(self.to_json()))
        return hasher.hexdigest()

    def to_json(self) -> dict[str, Any]:
        """Return the settings as a plain dict keyed by field name."""
        payload: dict[str, Any] = {
            field: getattr(self, field) for field in _SETTINGS_INT_FIELDS
        }
        payload["llm_overflow_policy"] = self.llm_overflow_policy
        return payload

    @classmethod
    def from_json(cls, payload: dict[str, Any]) -> HybridReviewSettingsFingerprint:
        """Build an instance from *payload*, coercing each field's type.

        Raises ``KeyError`` when a field is missing from *payload*.
        """
        limits = {field: int(payload[field]) for field in _SETTINGS_INT_FIELDS}
        policy = str(payload["llm_overflow_policy"])
        return cls(llm_overflow_policy=policy, **limits)


@dataclass(frozen=True, slots=True)
class HybridReviewCacheManifest:
    """Version strings and settings a cache directory was written under."""

    cache_schema_version: str
    prepared_review_unit_schema_version: str
    analyst_prompt_version: str
    evaluator_prompt_version: str
    hybrid_review_settings: HybridReviewSettingsFingerprint

    @property
    def hybrid_review_settings_fingerprint(self) -> str:
        """Hex fingerprint of ``hybrid_review_settings``."""
        return self.hybrid_review_settings.value

    def to_json(self) -> dict[str, Any]:
        """Return the manifest as a dict, including the derived fingerprint."""
        payload: dict[str, Any] = {
            field: getattr(self, field) for field in _VERSION_FIELDS
        }
        payload["hybrid_review_settings"] = self.hybrid_review_settings.to_json()
        payload["hybrid_review_settings_fingerprint"] = (
            self.hybrid_review_settings_fingerprint
        )
        return payload

    @classmethod
    def from_json(cls, payload: dict[str, Any]) -> HybridReviewCacheManifest:
        """Build a manifest from *payload*.

        The stored ``hybrid_review_settings_fingerprint`` key is not read; the
        fingerprint is always recomputed from ``hybrid_review_settings``.
        """
        versions = {field: str(payload[field]) for field in _VERSION_FIELDS}
        settings = HybridReviewSettingsFingerprint.from_json(
            payload["hybrid_review_settings"]
        )
        return cls(hybrid_review_settings=settings, **versions)


@dataclass(frozen=True, slots=True)
class HybridReviewCacheKey:
    """Hashable, all-string key under which a cache entry is stored."""

    cache_schema_version: str
    prepared_review_unit_schema_version: str
    analyst_prompt_version: str
    evaluator_prompt_version: str
    hybrid_review_settings_fingerprint: str
    model: str
    prepared_review_unit_hash: str

    def to_json(self) -> dict[str, Any]:
        """Return the key as a plain dict keyed by field name."""
        return {field: getattr(self, field) for field in _CACHE_KEY_FIELDS}

    @classmethod
    def from_json(cls, payload: dict[str, Any]) -> HybridReviewCacheKey:
        """Build a key from *payload*, stringifying every field.

        Raises ``KeyError`` when a field is missing from *payload*.
        """
        return cls(**{field: str(payload[field]) for field in _CACHE_KEY_FIELDS})


@dataclass(frozen=True, slots=True)
class HybridReviewCacheEntry:
    """One cached result together with the key and metadata stored beside it."""

    key: HybridReviewCacheKey
    result: dict[str, Any]
    cached_at: str
    nodes: tuple[str, ...] = ()
    soft_pairs: tuple[str, ...] = ()
    budget: dict[str, int] | None = None
    split: bool = False
    trimmed: bool = False
    aggressively_trimmed: bool = False

    def to_json(self) -> dict[str, Any]:
        """Return a JSON-ready dict; tuple fields are emitted as lists."""
        payload: dict[str, Any] = {
            "key": self.key.to_json(),
            "result": self.result,
            "cached_at": self.cached_at,
            "nodes": list(self.nodes),
            "soft_pairs": list(self.soft_pairs),
            "budget": self.budget,
        }
        for flag in _ENTRY_FLAG_FIELDS:
            payload[flag] = getattr(self, flag)
        return payload

    @classmethod
    def from_json(cls, payload: dict[str, Any]) -> HybridReviewCacheEntry:
        """Build an entry from *payload*.

        ``key``, ``result`` and ``cached_at`` are required; every other field
        falls back to its dataclass default when absent.
        """
        key = HybridReviewCacheKey.from_json(payload["key"])
        result = dict(payload["result"])
        cached_at = str(payload["cached_at"])
        nodes = tuple(map(str, payload.get("nodes") or []))
        soft_pairs = tuple(map(str, payload.get("soft_pairs") or []))

        raw_budget = payload.get("budget")
        budget: dict[str, int] | None = None
        if raw_budget is not None:
            budget = {
                str(name): int(amount) for name, amount in dict(raw_budget).items()
            }

        flags = {
            flag: bool(payload.get(flag, False)) for flag in _ENTRY_FLAG_FIELDS
        }
        return cls(
            key=key,
            result=result,
            cached_at=cached_at,
            nodes=nodes,
            soft_pairs=soft_pairs,
            budget=budget,
            **flags,
        )


def prepared_review_unit_hash(prepared_review_unit: dict[str, Any]) -> str:
    """Return the SHA-256 hex digest of the normalized *prepared_review_unit*."""
    canonical = _canonical_json_bytes(
        _normalize_prepared_review_unit_for_hash(prepared_review_unit)
    )
    return hashlib.sha256(canonical).hexdigest()


def build_hybrid_review_cache_key(
    *,
    manifest: HybridReviewCacheManifest,
    model: str,
    prepared_review_unit: dict[str, Any],
) -> HybridReviewCacheKey:
    """Combine manifest versions, *model* and the unit hash into a cache key."""
    return HybridReviewCacheKey(
        **{field: getattr(manifest, field) for field in _VERSION_FIELDS},
        hybrid_review_settings_fingerprint=manifest.hybrid_review_settings_fingerprint,
        model=model,
        prepared_review_unit_hash=prepared_review_unit_hash(prepared_review_unit),
    )


def hybrid_review_cache_dir(snapshot_dir: Path) -> Path:
    """Return the ``analysis-state`` directory under *snapshot_dir*."""
    return snapshot_dir / "analysis-state"


class HybridReviewCacheStore:
    """In-memory index of cache entries backed by a manifest and a JSONL file.

    Existing entries are loaded at construction time when the store is
    enabled. If the on-disk state cannot be trusted (manifest missing,
    unreadable or different, or entries unparsable), nothing is loaded,
    ``invalidation_reason`` records why, and the files are rewritten on the
    next ``put``.
    """

    def __init__(
        self,
        cache_dir: Path,
        manifest: HybridReviewCacheManifest,
        *,
        enabled: bool = True,
    ) -> None:
        """Bind the store to *cache_dir* and load entries if *enabled*."""
        self.cache_dir = cache_dir
        self.manifest = manifest
        self.enabled = enabled
        self.invalidation_reason: str | None = None
        self._entries: dict[HybridReviewCacheKey, HybridReviewCacheEntry] = {}
        # When set, the next write truncates the entries file and rewrites
        # the manifest before appending.
        self._needs_reset = False
        if self.enabled:
            self._load()

    @property
    def manifest_path(self) -> Path:
        """Path of the manifest file inside the cache directory."""
        return self.cache_dir / HYBRID_REVIEW_CACHE_MANIFEST_FILENAME

    @property
    def entries_path(self) -> Path:
        """Path of the JSONL entries file inside the cache directory."""
        return self.cache_dir / HYBRID_REVIEW_CACHE_ENTRIES_FILENAME

    @property
    def has_entries(self) -> bool:
        """Whether at least one entry is currently held in memory."""
        return len(self._entries) > 0

    def get(self, key: HybridReviewCacheKey) -> HybridReviewCacheEntry | None:
        """Return the entry stored under *key*, or ``None``.

        Always returns ``None`` when the store is disabled.
        """
        return self._entries.get(key) if self.enabled else None

    def put(self, entry: HybridReviewCacheEntry) -> None:
        """Append *entry* to the entries file and index it in memory.

        A no-op when the store is disabled or the key is already present.
        """
        if not self.enabled:
            return
        if entry.key in self._entries:
            return
        self._prepare_for_write()
        with self.entries_path.open("a", encoding="utf-8") as handle:
            record = json.dumps(entry.to_json(), sort_keys=True)
            handle.write(f"{record}\n")
        self._entries[entry.key] = entry

    def _mark_for_reset(self, reason: str) -> None:
        """Record *reason* and schedule a reset of the on-disk files."""
        self.invalidation_reason = reason
        self._needs_reset = True

    def _load(self) -> None:
        """Populate the in-memory index from disk, or schedule a reset.

        Entries are accepted only when the on-disk manifest parses and equals
        ``self.manifest``. Any failure while reading the manifest or the
        entries is treated as an invalidation rather than propagated.
        """
        if not self.manifest_path.exists():
            # Entries without a manifest cannot be attributed to any version.
            if self.entries_path.exists():
                self._mark_for_reset("missing_manifest")
            return

        try:
            on_disk = HybridReviewCacheManifest.from_json(read_json(self.manifest_path))
        except Exception:
            self._mark_for_reset("invalid_manifest")
            return

        if on_disk != self.manifest:
            self._mark_for_reset("manifest_mismatch")
            return

        if not self.entries_path.exists():
            return

        try:
            with self.entries_path.open("r", encoding="utf-8") as handle:
                for raw_line in handle:
                    record = raw_line.strip()
                    if record:
                        entry = HybridReviewCacheEntry.from_json(json.loads(record))
                        self._entries[entry.key] = entry
        except Exception:
            # One bad line discards everything loaded so far.
            self._entries.clear()
            self._mark_for_reset("invalid_entries")

    def _prepare_for_write(self) -> None:
        """Ensure the directory, entries file and manifest exist for appending.

        When a reset is pending, the entries file is truncated and the
        manifest rewritten; the pending flag is cleared afterwards.
        """
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        resetting = self._needs_reset
        if resetting:
            self.entries_path.write_text("", encoding="utf-8")
        elif not self.entries_path.exists():
            self.entries_path.touch()
        if resetting or not self.manifest_path.exists():
            write_json(self.manifest.to_json(), self.manifest_path)
        self._needs_reset = False