File size: 6,860 Bytes
"""Tests for sf_cluster: shapes, determinism, in-pool guarantee."""
from __future__ import annotations
import os
import sys
from pathlib import Path
import numpy as np
import pytest
# Allow `python -m pytest tests/` from the repo root before installing.
# The `src` directory next to this file's parent directory is put at the
# front of sys.path so the `sf_cluster` imports that follow can resolve
# without the package being installed.
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
from sf_cluster import ( # noqa: E402
contrast_hvlv,
high_variance_mask,
method_gradient,
method_mosaic,
pool_msa,
read_a3m,
write_a3m,
)
from sf_cluster.methods import N_SUBSETS, TARGET_SIZE # noqa: E402
# ---------------------------------------------------------------------------
# fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def synthetic_pool(tmp_path):
    """Write a random A3M file and a matching FI matrix under ``tmp_path``.

    The alignment holds 200 rows of 50 symbols drawn from a 21-symbol
    alphabet (twenty letters plus ``-``). The first record is tagged
    ``query``; the others are tagged ``seq0001``, ``seq0002``, and so on.
    The FI matrix is Gaussian noise (scale 0.3) in which one fifth of the
    columns receive additional noise of scale 1.5.

    Returns:
        ``(a3m_path, fi_path, n_rows, n_cols)``.
    """
    n_rows, n_cols = 200, 50
    # A fixed seed keeps the generated files identical from run to run.
    rng = np.random.default_rng(0)

    symbols = np.array(list("ACDEFGHIKLMNPQRSTVWY-"))
    residues = rng.choice(symbols, size=(n_rows, n_cols))
    # Assemble the whole file in memory: header line first, then one
    # two-line record per row.
    records = [f"#{n_cols}\t1\n"]
    for row_no, residue_row in enumerate(residues):
        name = f"seq{row_no:04d}" if row_no else "query"
        records.append(f">{name}\n{''.join(residue_row)}\n")
    a3m_path = tmp_path / "syn.a3m"
    a3m_path.write_text("".join(records))

    # Narrow baseline everywhere, then widen a random fifth of the columns.
    fi = rng.normal(0, 0.3, size=(n_rows, n_cols)).astype(np.float64)
    noisy_cols = rng.choice(n_cols, size=n_cols // 5, replace=False)
    fi[:, noisy_cols] += rng.normal(0, 1.5, size=(n_rows, len(noisy_cols)))
    fi_path = tmp_path / "syn_fi.npy"
    np.save(fi_path, fi)

    return a3m_path, fi_path, n_rows, n_cols
# ---------------------------------------------------------------------------
# pool / a3m
# ---------------------------------------------------------------------------
def test_a3m_roundtrip(tmp_path):
    """Check that what ``write_a3m`` writes is what ``read_a3m`` returns."""
    target = tmp_path / "rt.a3m"
    header = "#5\t1"
    # The second record has a space in its tag and lowercase letters in
    # its sequence; both are expected back exactly as written.
    records = [("query", "ACDEF"), ("h2 desc", "ACDef")]

    # Hand the writer its own list so the expectation stays untouched.
    write_a3m(target, header, list(records))

    read_header, read_records = read_a3m(target)
    assert read_header == header
    assert read_records == records
def test_pool_shapes(synthetic_pool):
    """Check that ``pool_msa`` reports sizes matching the synthetic inputs."""
    a3m_path, fi_path, n_rows, n_cols = synthetic_pool
    loaded = pool_msa(a3m_path, fi_path)

    # Scalar dimensions.
    assert loaded.n_seq == n_rows
    assert loaded.n_cols == n_cols
    # Per-row containers.
    assert loaded.fi_matrix.shape == (n_rows, n_cols)
    assert len(loaded.sequences) == n_rows
    # The fixture tags its first record "query".
    assert loaded.headers[0] == "query"
def test_pool_rejects_shape_mismatch(tmp_path, synthetic_pool):
    """Check that an FI matrix with one extra row raises ``ValueError``."""
    a3m_path, _, n_rows, n_cols = synthetic_pool

    # Same column count as the alignment, but one row too many.
    oversized = np.zeros((n_rows + 1, n_cols))
    oversized_path = tmp_path / "bad_fi.npy"
    np.save(oversized_path, oversized)

    with pytest.raises(ValueError, match="FI rows"):
        pool_msa(a3m_path, oversized_path)
# ---------------------------------------------------------------------------
# score
# ---------------------------------------------------------------------------
def test_hv_mask_fraction():
    """Check that percentile 80 marks roughly a fifth of the mask as True."""
    noise = np.random.default_rng(1).normal(size=(100, 50))
    mask = high_variance_mask(noise, percentile=80)

    # The percentile is a cut-off, not an exact split, so accept a band
    # around the nominal 20% rather than an exact fraction.
    lower, upper = 0.1, 0.4
    assert lower <= mask.mean() <= upper
def test_contrast_hvlv_shape_and_finite(synthetic_pool):
    """Check that ``contrast_hvlv`` gives one finite score per pool row."""
    a3m_path, fi_path, n_rows, _ = synthetic_pool
    fi_matrix = pool_msa(a3m_path, fi_path).fi_matrix

    scores = contrast_hvlv(fi_matrix)

    assert scores.shape == (n_rows,)
    # Neither NaN nor +/-inf is allowed anywhere in the score vector.
    assert np.isfinite(scores).all()
# ---------------------------------------------------------------------------
# methods: mosaic
# ---------------------------------------------------------------------------
def test_mosaic_shapes(synthetic_pool):
    """Check that mosaic yields ``N_SUBSETS`` subsets of ``TARGET_SIZE`` each."""
    a3m_path, fi_path, _, _ = synthetic_pool
    scores = contrast_hvlv(pool_msa(a3m_path, fi_path).fi_matrix)

    subsets = method_mosaic(scores)

    assert len(subsets) == N_SUBSETS
    assert all(len(subset) == TARGET_SIZE for subset in subsets)
def test_mosaic_determinism(synthetic_pool):
    """Check that two mosaic runs on the same scores compare equal."""
    a3m_path, fi_path, *_ = synthetic_pool
    scores = contrast_hvlv(pool_msa(a3m_path, fi_path).fi_matrix)

    first_run = method_mosaic(scores)
    second_run = method_mosaic(scores)

    assert first_run == second_run
def test_mosaic_in_pool(synthetic_pool):
    """Check that every mosaic index is a valid row index into the pool."""
    a3m_path, fi_path, n_rows, _ = synthetic_pool
    scores = contrast_hvlv(pool_msa(a3m_path, fi_path).fi_matrix)

    subsets = method_mosaic(scores)

    # One flattened pass over all subsets; stops at the first bad index.
    assert all(
        0 <= idx < n_rows for subset in subsets for idx in subset
    ), "out-of-pool index in mosaic subset"
def test_mosaic_tier_composition(synthetic_pool):
    """Check that each segment of a mosaic subset comes from its score tier.

    The pool is ranked by ascending score and cut into thirds. Within each
    subset, positions 0-10 must lie in the top third, positions 11-21 in the
    bottom third and positions 22-31 in the middle third.
    """
    a3m_path, fi_path, n_rows, _ = synthetic_pool
    scores = contrast_hvlv(pool_msa(a3m_path, fi_path).fi_matrix)

    ranked = np.argsort(scores)
    lower_cut, upper_cut = n_rows // 3, 2 * n_rows // 3
    top_third = set(ranked[upper_cut:].tolist())
    bottom_third = set(ranked[:lower_cut].tolist())
    middle_third = set(ranked[lower_cut:upper_cut].tolist())

    # First 11 = high, next 11 = low, last 10 = mid.
    layout = (
        (slice(0, 11), top_third),
        (slice(11, 22), bottom_third),
        (slice(22, 32), middle_third),
    )
    for subset in method_mosaic(scores):
        for segment, tier in layout:
            assert tier.issuperset(subset[segment])
# ---------------------------------------------------------------------------
# methods: gradient
# ---------------------------------------------------------------------------
def test_gradient_shapes(synthetic_pool):
    """Check that gradient yields ``N_SUBSETS`` subsets of ``TARGET_SIZE`` each."""
    a3m_path, fi_path, *_ = synthetic_pool
    scores = contrast_hvlv(pool_msa(a3m_path, fi_path).fi_matrix)

    subsets = method_gradient(scores)

    assert len(subsets) == N_SUBSETS
    assert all(len(subset) == TARGET_SIZE for subset in subsets)
def test_gradient_determinism(synthetic_pool):
    """Check that two gradient runs on the same scores compare equal."""
    a3m_path, fi_path, *_ = synthetic_pool
    scores = contrast_hvlv(pool_msa(a3m_path, fi_path).fi_matrix)

    first_run = method_gradient(scores)
    second_run = method_gradient(scores)

    assert first_run == second_run
def test_gradient_in_pool_and_homogeneous(synthetic_pool):
    """Check that each gradient subset stays inside a single score quartile.

    The pool is ranked by ascending score and cut into four bins. Subsets
    0-2 must draw only from bin 0, subsets 3-5 from bin 1, and so on through
    subset 11. Every index must also be a valid pool row.
    """
    a3m_path, fi_path, n_rows, _ = synthetic_pool
    scores = contrast_hvlv(pool_msa(a3m_path, fi_path).fi_matrix)

    ranked = np.argsort(scores)
    # Five edges delimit the four quartile bins.
    edges = [(q * n_rows) // 4 for q in range(5)]
    quartiles = [
        set(ranked[start:stop].tolist())
        for start, stop in zip(edges, edges[1:])
    ]

    subsets = method_gradient(scores)
    # 4 quartiles x 3 subsets each, visited in subset order.
    for subset_no in range(4 * 3):
        quartile_no = subset_no // 3
        members = subsets[subset_no]
        assert all(0 <= idx < n_rows for idx in members), "out-of-pool index"
        assert all(idx in quartiles[quartile_no] for idx in members), \
            f"gradient subset {subset_no} leaked outside quartile {quartile_no}"
# ---------------------------------------------------------------------------
# CLI smoke
# ---------------------------------------------------------------------------
def test_cli_build_smoke(tmp_path, synthetic_pool):
    """Run ``build --method mosaic`` through the CLI and check its outputs.

    The command must return 0 and leave ``N_SUBSETS`` subset files, an
    index TSV and a metadata JSON in the output directory.
    """
    from sf_cluster.cli import main as cli_main

    a3m_path, fi_path, *_ = synthetic_pool
    out_dir = tmp_path / "subs_mosaic"

    argv = ["build"]
    argv += ["--a3m", str(a3m_path)]
    argv += ["--fi", str(fi_path)]
    argv += ["--method", "mosaic"]
    argv += ["--out", str(out_dir)]
    exit_code = cli_main(argv)
    assert exit_code == 0

    subset_files = list(out_dir.glob("mosaic_subset_*.a3m"))
    assert len(subset_files) == N_SUBSETS
    for companion in ("mosaic_subset_index.tsv", "mosaic_meta.json"):
        assert (out_dir / companion).exists()
|