# Hosting-page header (not part of the program): uploaded by Jayce-Ping, "Add files using upload-large-folder tool", commit 7cdb0ca (verified).
"""
Sequence Prediction Evaluation with QwenImageEditPlusPipeline / Flux2KleinPipeline.
Evaluates the model's ability to predict the next number in a sequence
by generating images and extracting answers via OCR.
"""
import json
import re
from pathlib import Path
from dataclasses import dataclass, field
from enum import Enum
import numpy as np
import torch
from PIL import Image
from tqdm import tqdm
class ModelType(str, Enum):
    """Identifiers for the image-editing pipelines this script can evaluate.

    Members also subclass ``str``, so each one compares equal to its raw
    string value.
    """

    # Qwen image-edit pipeline.
    QWEN_IMAGE_EDIT = "qwen"
    # FLUX.2 klein pipeline.
    FLUX2_KLEIN = "flux2-klein"
@dataclass
class EvalConfig:
    """Settings for one evaluation run: paths, model choice, prompts and sampling."""

    # --- Paths ---
    dataset_dir: str = "sequence_dataset"
    output_dir: str = "eval_results"

    # --- Model selection ---
    model_type: ModelType = ModelType.QWEN_IMAGE_EDIT
    model_id: str = ""  # Empty string means "derive from model_type" in __post_init__

    # --- Prompts ---
    prompt: str = (
        "Based on the number patterns shown in the previous images, "
        "fill in the missing number in the empty cell of the last image."
    )
    negative_prompt: str = ""

    # --- Generation parameters ---
    num_inference_steps: int = 5
    guidance_scale: float = 1.0
    true_cfg_scale: float = 4.0  # For Qwen
    height: int = 210
    width: int = 750
    seed: int = 42
    device: str = "cuda"
    dtype: torch.dtype = field(default_factory=lambda: torch.bfloat16)

    def __post_init__(self):
        """Fill in ``model_id`` from ``model_type`` when the caller left it empty.

        An explicitly supplied ``model_id`` is never overwritten, and an
        unrecognised ``model_type`` leaves ``model_id`` empty.
        """
        if self.model_id:
            return

        # Ordered (model type, default checkpoint) pairs. Matching uses ``==``
        # rather than a dict lookup so plain-string model types keep working.
        known_defaults = (
            (ModelType.QWEN_IMAGE_EDIT, "Qwen/Qwen-Image-Edit-2509"),
            (ModelType.FLUX2_KLEIN, "black-forest-labs/FLUX.2-klein-9B"),
        )
        for candidate, default_id in known_defaults:
            if self.model_type == candidate:
                self.model_id = default_id
                break
class OCRExtractor:
    """Extract numbers from grid images using OCR.

    The image is treated as a single row of ``num_cells`` equally wide cells.
    Each cell is cropped out and handed to the selected OCR backend, and the
    first integer found in the recognised text is returned.
    """

    # A (possibly negative) integer; compiled once and reused for every cell.
    _NUMBER_PATTERN = re.compile(r'-?\d+')
    # Tesseract options: treat the crop as a single text line (psm 7) and
    # restrict recognition to digits and the minus sign.
    _TESSERACT_CONFIG = '--psm 7 -c tessedit_char_whitelist=0123456789-'

    def __init__(self, backend: str = "easyocr"):
        """
        Args:
            backend: OCR backend ("easyocr" or "pytesseract").

        Raises:
            ValueError: If ``backend`` is not one of the supported names.
        """
        self.backend = backend
        # Backends are imported lazily so only the selected one must be installed.
        if backend == "easyocr":
            import easyocr
            self.reader = easyocr.Reader(['en'], gpu=torch.cuda.is_available())
        elif backend == "pytesseract":
            import pytesseract
            self.pytesseract = pytesseract
        else:
            raise ValueError(f"Unknown backend: {backend}")

    @staticmethod
    def _check_num_cells(num_cells: int) -> None:
        """Reject cell counts that cannot describe a grid row."""
        if num_cells < 1:
            raise ValueError(f"num_cells must be >= 1, got {num_cells}")

    def _read_number(self, cell_crop: Image.Image) -> int | None:
        """Run OCR on one cropped cell and return the first integer found, if any."""
        if self.backend == "easyocr":
            # readtext yields (bbox, text, confidence) tuples; only text is used.
            texts = [text for _, text, _ in self.reader.readtext(np.array(cell_crop))]
        elif self.backend == "pytesseract":
            texts = [
                self.pytesseract.image_to_string(
                    cell_crop, config=self._TESSERACT_CONFIG
                )
            ]
        else:
            # Unreachable through __init__, which rejects unknown backends.
            return None

        for text in texts:
            match = self._NUMBER_PATTERN.search(text)
            if match:
                return int(match.group())
        return None

    def extract_last_number(self, image: Image.Image, num_cells: int = 4) -> int | None:
        """
        Extract the last (rightmost) number from a grid image.

        Args:
            image: PIL Image of the number grid.
            num_cells: Number of equally wide cells in the row. The default of
                4 reproduces the original "last quarter of the image" crop.

        Returns:
            Extracted number or None if extraction fails.

        Raises:
            ValueError: If ``num_cells`` is smaller than 1.
        """
        self._check_num_cells(num_cells)
        w, h = image.size
        # Same left edge that extract_all_numbers uses for its final cell.
        left = w * (num_cells - 1) // num_cells
        return self._read_number(image.crop((left, 0, w, h)))

    def extract_all_numbers(self, image: Image.Image, num_cells: int = 4) -> list[int | None]:
        """
        Extract all numbers from a grid image, left to right.

        Args:
            image: PIL Image of the number grid.
            num_cells: Number of equally wide cells in the row.

        Returns:
            One entry per cell: the extracted number, or None where OCR found
            no integer.

        Raises:
            ValueError: If ``num_cells`` is smaller than 1.
        """
        self._check_num_cells(num_cells)
        w, h = image.size
        numbers = []
        for i in range(num_cells):
            # Proportional boundaries: the cells tile the full width even when
            # it is not divisible by num_cells, so no trailing pixels are lost.
            left = i * w // num_cells
            right = (i + 1) * w // num_cells
            numbers.append(self._read_number(image.crop((left, 0, right, h))))
        return numbers
class SequenceEvaluator:
    """Evaluator for sequence prediction task.

    Loads the pipeline selected by the config, generates one prediction image
    per sample, reads the predicted number back via OCR and aggregates
    accuracy overall and per sequence type.
    """

    def __init__(self, config: EvalConfig):
        """Create the output directory, load the pipeline and initialise OCR.

        Args:
            config: Evaluation configuration.
        """
        self.config = config
        self.output_dir = Path(config.output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # Load pipeline based on model type
        self.pipeline = self._load_pipeline()
        # Initialize OCR
        self.ocr = OCRExtractor(backend="easyocr")

    def _load_pipeline(self):
        """Load pipeline based on model type.

        Raises:
            ValueError: If the configured model type is not supported.
        """
        if self.config.model_type == ModelType.QWEN_IMAGE_EDIT:
            return self._load_qwen_pipeline()
        elif self.config.model_type == ModelType.FLUX2_KLEIN:
            return self._load_flux2_klein_pipeline()
        else:
            raise ValueError(f"Unknown model type: {self.config.model_type}")

    def _load_qwen_pipeline(self):
        """Load QwenImageEditPlusPipeline and move it to the configured device."""
        from diffusers import QwenImageEditPlusPipeline

        pipeline = QwenImageEditPlusPipeline.from_pretrained(
            self.config.model_id,
            torch_dtype=self.config.dtype,
        )
        pipeline.to(self.config.device)
        pipeline.set_progress_bar_config(disable=True)
        return pipeline

    def _load_flux2_klein_pipeline(self):
        """Load Flux2KleinPipeline with model CPU offload enabled."""
        from diffusers import Flux2KleinPipeline

        pipeline = Flux2KleinPipeline.from_pretrained(
            self.config.model_id,
            torch_dtype=self.config.dtype,
        )
        # Unlike the Qwen loader, the pipeline is not moved to config.device;
        # placement is left to the offload hooks.
        pipeline.enable_model_cpu_offload()
        pipeline.set_progress_bar_config(disable=True)
        return pipeline

    def _load_images(self, image_paths: list[str], image_dir: Path) -> list[Image.Image]:
        """Load images from paths (relative to ``image_dir``) as RGB images."""
        loaded = []
        for rel_path in image_paths:
            # Convert inside the context manager: convert() returns a new,
            # fully loaded image, so the file handle can be released right away.
            with Image.open(image_dir / rel_path) as img:
                loaded.append(img.convert("RGB"))
        return loaded

    def predict(self, images: list[Image.Image]) -> Image.Image:
        """
        Generate prediction image given input images.

        Args:
            images: List of input images (context + query).

        Returns:
            Generated image with predicted number.

        Raises:
            ValueError: If the configured model type is not supported.
        """
        # A fresh generator per call: every sample is sampled from the same seed.
        generator = torch.Generator(device=self.config.device).manual_seed(self.config.seed)

        # Arguments shared by both pipelines.
        inputs = {
            "image": images,
            "prompt": self.config.prompt,
            "generator": generator,
            "num_inference_steps": self.config.num_inference_steps,
        }
        if self.config.model_type == ModelType.QWEN_IMAGE_EDIT:
            inputs["true_cfg_scale"] = self.config.true_cfg_scale
            inputs["negative_prompt"] = self.config.negative_prompt
        elif self.config.model_type == ModelType.FLUX2_KLEIN:
            # Flux2Klein uses image parameter for multi-image editing
            inputs["guidance_scale"] = self.config.guidance_scale
            inputs["height"] = self.config.height
            inputs["width"] = self.config.width
        else:
            # Fail with the same error as _load_pipeline instead of hitting an
            # unbound local further down.
            raise ValueError(f"Unknown model type: {self.config.model_type}")

        with torch.inference_mode():
            output = self.pipeline(**inputs)
        return output.images[0]

    def evaluate_sample(self, sample: dict, image_dir: Path) -> dict:
        """
        Evaluate a single sample.

        Args:
            sample: Sample metadata dict; the keys "id", "images", "answer"
                and "seq_type" are read.
            image_dir: Directory containing images.

        Returns:
            Evaluation result dict.
        """
        # Load input images (all available in test set)
        images = self._load_images(sample["images"], image_dir)
        # Generate prediction
        pred_image = self.predict(images)
        # Save prediction image (the ":05d" format requires an integer id)
        pred_path = self.output_dir / f"{sample['id']:05d}_pred.png"
        pred_image.save(pred_path)
        # Extract predicted number via OCR
        pred_number = self.ocr.extract_last_number(pred_image)
        # Get ground truth
        gt_number = sample["answer"]
        # Check correctness (a failed OCR read yields None and counts as wrong)
        correct = pred_number == gt_number
        return {
            "id": sample["id"],
            "seq_type": sample["seq_type"],
            "gt_answer": gt_number,
            "pred_answer": pred_number,
            "correct": correct,
            "pred_image": str(pred_path),
        }

    def evaluate(self, split: str = "test") -> dict:
        """
        Evaluate on entire dataset split.

        Reads ``<dataset_dir>/<split>.json`` and the images under
        ``<dataset_dir>/<split>/images``, then writes the summary to
        ``<output_dir>/<split>_results.json``.

        Args:
            split: Dataset split ("train" or "test").

        Returns:
            Evaluation results summary.
        """
        dataset_dir = Path(self.config.dataset_dir)
        # Load metadata (explicit UTF-8 so parsing does not depend on locale)
        with open(dataset_dir / f"{split}.json", encoding="utf-8") as f:
            samples = json.load(f)
        image_dir = dataset_dir / split / "images"

        results = [
            self.evaluate_sample(sample, image_dir)
            for sample in tqdm(samples, desc=f"Evaluating {split}")
        ]

        # Compute metrics
        total = len(results)
        correct = sum(r["correct"] for r in results)
        accuracy = correct / total if total > 0 else 0.0

        # Per-type accuracy
        type_stats: dict = {}
        for r in results:
            stats = type_stats.setdefault(r["seq_type"], {"correct": 0, "total": 0})
            stats["total"] += 1
            if r["correct"]:
                stats["correct"] += 1
        # Every recorded type has total >= 1, so the division is safe.
        type_accuracy = {
            k: v["correct"] / v["total"] for k, v in type_stats.items()
        }

        summary = {
            "split": split,
            "model_type": self.config.model_type.value,
            "model_id": self.config.model_id,
            "total": total,
            "correct": correct,
            "accuracy": accuracy,
            "type_accuracy": type_accuracy,
            "results": results,
        }
        # Save results
        with open(self.output_dir / f"{split}_results.json", "w", encoding="utf-8") as f:
            json.dump(summary, f, indent=2)
        return summary
def main():
    """Parse command-line options, evaluate the test split and print a report."""
    import argparse

    # (flag, add_argument keyword arguments), registered in this order.
    option_specs = (
        ("--model", dict(type=str, default="flux2-klein",
                         choices=["qwen", "flux2-klein"],
                         help="Model type to use")),
        ("--model-id", dict(type=str, default="",
                            help="Custom model ID (optional)")),
        ("--dataset-dir", dict(type=str, default="sequence_dataset",
                               help="Dataset directory")),
        ("--output-dir", dict(type=str, default="eval_results",
                              help="Output directory")),
        ("--steps", dict(type=int, default=50,
                         help="Number of inference steps")),
        ("--seed", dict(type=int, default=42,
                        help="Random seed")),
    )
    cli = argparse.ArgumentParser(description="Sequence Prediction Evaluation")
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    opts = cli.parse_args()

    config = EvalConfig(
        dataset_dir=opts.dataset_dir,
        output_dir=opts.output_dir,
        model_type=ModelType(opts.model),
        model_id=opts.model_id,
        num_inference_steps=opts.steps,
        seed=opts.seed,
    )
    print(f"Model: {config.model_type.value} ({config.model_id})")

    summary = SequenceEvaluator(config).evaluate("test")

    # Report: overall numbers first, then one line per sequence type.
    rule = "=" * 50
    print("\n" + rule)
    print(f"Evaluation Results ({config.model_type.value})")
    print(rule)
    print(f"Total samples: {summary['total']}")
    print(f"Correct: {summary['correct']}")
    print(f"Accuracy: {summary['accuracy']:.2%}")
    print("\nPer-type accuracy:")
    per_type = summary["type_accuracy"]
    for seq_type in sorted(per_type):
        print(f" {seq_type}: {per_type[seq_type]:.2%}")
# Run the evaluation only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()