| """
|
| API Security Limits Tests
|
| ========================
|
| Comprehensive tests for input validation and rate limiting.
|
| """
|
|
|
| import pytest
|
| from fastapi.testclient import TestClient
|
| from unittest.mock import MagicMock, patch, AsyncMock
|
| import sys
|
| import os
|
|
|
|
|
# Put the parent of this file's directory at the front of sys.path
# (presumably so the `mnemocore` package imported below resolves without
# installing it first).
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(_THIS_DIR))

from mnemocore.core.config import reset_config

# Key exported as HAIM_API_KEY by the fixture and sent as X-API-Key by the tests.
API_KEY = "test-key"
|
|
|
|
|
# Stand-in engine: every method the tests touch is an AsyncMock with a
# canned result.  Tests overwrite individual return values as needed.
mock_engine_instance = MagicMock(
    get_stats=AsyncMock(return_value={"status": "ok"}),
    get_memory=AsyncMock(return_value=None),
    delete_memory=AsyncMock(return_value=True),
    store=AsyncMock(return_value="mem_id_123"),
    query=AsyncMock(return_value=[("mem_id_123", 0.9)]),
    initialize=AsyncMock(return_value=None),
    close=AsyncMock(return_value=None),
    define_concept=AsyncMock(return_value=None),
    reason_by_analogy=AsyncMock(return_value=[("result1", 0.8)]),
)

# Stand-in engine class: calling it hands back the shared instance above.
mock_engine_cls = MagicMock()
mock_engine_cls.return_value = mock_engine_instance
|
|
|
|
|
# Stand-in dependency container: an async Redis storage facade with the
# methods configured explicitly, plus a plain MagicMock for the Qdrant store.
mock_container = MagicMock(
    redis_storage=AsyncMock(
        check_health=AsyncMock(return_value=True),
        store_memory=AsyncMock(),
        publish_event=AsyncMock(),
        retrieve_memory=AsyncMock(return_value=None),
        delete_memory=AsyncMock(),
        close=AsyncMock(),
    ),
    qdrant_store=MagicMock(),
)
|
|
|
|
|
# Stand-in Redis pipeline: incr/expire are synchronous mocks, execute is
# awaitable and returns [1, True] by default (rate-limit tests override it).
mock_pipeline = MagicMock(
    incr=MagicMock(),
    expire=MagicMock(),
    execute=AsyncMock(return_value=[1, True]),
)
# Async context-manager protocol: entering yields the pipeline itself.
mock_pipeline.__aenter__ = AsyncMock(return_value=mock_pipeline)
mock_pipeline.__aexit__ = AsyncMock(return_value=None)

# Redis client whose pipeline() call hands back the mock pipeline, exposed
# through the container's redis storage.
mock_redis_client = MagicMock()
mock_redis_client.pipeline.return_value = mock_pipeline
mock_container.redis_storage.redis_client = mock_redis_client
|
|
|
|
|
# Replace the engine class and the container factory looked up in
# mnemocore.api.main with the mocks defined above.  The patches are started
# here at import time; no matching stop() call is visible in this module.
patcher1 = patch("mnemocore.api.main.HAIMEngine", mock_engine_cls)
patcher2 = patch("mnemocore.api.main.build_container", return_value=mock_container)
for _patcher in (patcher1, patcher2):
    _patcher.start()

# Imported only after the patches have been started.
from mnemocore.api.main import app
|
|
|
|
|
@pytest.fixture(autouse=True)
def setup_env(monkeypatch):
    """Give every test the same environment and the same mock defaults.

    Before the test:
      * export ``HAIM_API_KEY`` with the test key and call ``reset_config()``
        (presumably so the configuration is re-read with that key);
      * restore the shared module-level mocks to the return values they were
        created with, so a value set by one test (for example a pipeline
        result that triggers a 429) cannot leak into the next one;
      * attach the mock engine and container to ``app.state``.

    After the test, ``reset_config()`` is called once more.
    """
    monkeypatch.setenv("HAIM_API_KEY", API_KEY)
    reset_config()

    # Tests mutate these return values in place; put the module defaults
    # back so each test's outcome is independent of execution order.
    mock_pipeline.execute.return_value = [1, True]
    mock_engine_instance.get_memory.return_value = None
    mock_engine_instance.store.return_value = "mem_id_123"
    mock_engine_instance.query.return_value = [("mem_id_123", 0.9)]

    app.state.engine = mock_engine_instance
    app.state.container = mock_container
    yield
    reset_config()
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_store_content_too_large():
    """POST /store with 100,001 characters of content must fail validation."""
    oversized_body = {"content": "a" * 100001}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/store", json=oversized_body, headers=auth)
        assert resp.status_code == 422
        assert "String should have at most 100000 characters" in resp.text
|
|
|
|
|
def test_store_content_valid():
    """POST /store with 1,000 characters of content must return 200."""
    body_text = "a" * 1000

    # The engine reports a stored memory whose content matches the request.
    created_at = MagicMock(isoformat=MagicMock(return_value="2024-01-01T00:00:00"))
    stored = MagicMock(
        id="mem_1",
        content=body_text,
        metadata={},
        ltp_strength=0.5,
        created_at=created_at,
    )
    mock_engine_instance.get_memory.return_value = stored
    mock_engine_instance.store.return_value = "mem_1"

    with TestClient(app) as http:
        resp = http.post(
            "/store",
            json={"content": body_text},
            headers={"X-API-Key": API_KEY},
        )
        assert resp.status_code == 200
|
|
|
|
|
def test_store_content_empty():
    """POST /store with an empty content string must fail validation."""
    empty_body = {"content": ""}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/store", json=empty_body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
def test_store_content_whitespace_only():
    """POST /store with content made only of whitespace must fail validation."""
    blank_body = {"content": " \n\t "}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/store", json=blank_body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
def test_store_metadata_too_many_keys():
    """POST /store with 51 metadata keys must fail validation."""
    crowded_metadata = dict.fromkeys((f"k{i}" for i in range(51)), "v")
    body = {"content": "foo", "metadata": crowded_metadata}

    with TestClient(app) as http:
        resp = http.post("/store", json=body, headers={"X-API-Key": API_KEY})
        assert resp.status_code == 422
        assert "Too many metadata keys" in resp.text
|
|
|
|
|
def test_store_metadata_key_too_long():
    """POST /store with a 65-character metadata key must fail validation."""
    body = {"content": "foo", "metadata": {"k" * 65: "val"}}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/store", json=body, headers=auth)
        assert resp.status_code == 422
        assert "too long" in resp.text
|
|
|
|
|
def test_store_metadata_value_too_long():
    """POST /store with a 1,001-character metadata value must fail validation."""
    body = {"content": "foo", "metadata": {"key": "v" * 1001}}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/store", json=body, headers=auth)
        assert resp.status_code == 422
        assert "too long" in resp.text
|
|
|
|
|
def test_store_metadata_invalid_key_characters():
    """POST /store with a '$' in a metadata key must fail validation."""
    body = {"content": "foo", "metadata": {"key$invalid": "val"}}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/store", json=body, headers=auth)
        assert resp.status_code == 422
        assert "invalid characters" in resp.text
|
|
|
|
|
def test_store_metadata_nested_structure():
    """POST /store with a dict as a metadata value must fail validation."""
    body = {"content": "foo", "metadata": {"nested": {"key": "value"}}}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/store", json=body, headers=auth)
        assert resp.status_code == 422
        assert "primitive type" in resp.text
|
|
|
|
|
def test_store_agent_id_too_long():
    """POST /store with a 257-character agent_id must fail validation."""
    body = {"content": "foo", "agent_id": "a" * 257}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/store", json=body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
def test_store_agent_id_invalid_characters():
    """POST /store with a '$' in agent_id must fail validation."""
    body = {"content": "foo", "agent_id": "agent$invalid"}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/store", json=body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
def test_store_ttl_out_of_range():
    """POST /store must reject a ttl of 0 and a ttl one second above 365 days."""
    auth = {"X-API-Key": API_KEY}
    # Checked in this order: the low value first, then the high one.
    rejected_ttls = (0, 86400 * 365 + 1)

    with TestClient(app) as http:
        for bad_ttl in rejected_ttls:
            resp = http.post(
                "/store",
                json={"content": "foo", "ttl": bad_ttl},
                headers=auth,
            )
            assert resp.status_code == 422
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_query_too_large():
    """POST /query with a 20,000-character query must fail validation."""
    oversized_body = {"query": "q" * 20000}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/query", json=oversized_body, headers=auth)
        assert resp.status_code == 422
        assert "String should have at most 10000 characters" in resp.text
|
|
|
|
|
def test_query_valid():
    """POST /query with a short query must not be rejected with 422."""
    # The engine returns one hit and can resolve it to a memory object.
    created_at = MagicMock(isoformat=MagicMock(return_value="2024-01-01T00:00:00"))
    hit = MagicMock(
        id="mem_1",
        content="test result",
        metadata={},
        ltp_strength=0.5,
        created_at=created_at,
        tier="hot",
    )
    mock_engine_instance.get_memory.return_value = hit
    mock_engine_instance.query.return_value = [("mem_1", 0.9)]

    with TestClient(app) as http:
        resp = http.post(
            "/query",
            json={"query": "hello world"},
            headers={"X-API-Key": API_KEY},
        )
        # Only validation is under test here, so any non-422 status passes.
        assert resp.status_code != 422
|
|
|
|
|
def test_query_empty():
    """POST /query with an empty query string must fail validation."""
    empty_body = {"query": ""}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/query", json=empty_body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
def test_query_whitespace_only():
    """POST /query with a query made only of whitespace must fail validation."""
    blank_body = {"query": " \n\t "}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/query", json=blank_body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
def test_query_top_k_out_of_range():
    """POST /query must reject top_k values of 0 and 101."""
    auth = {"X-API-Key": API_KEY}
    # Checked in this order: the low value first, then the high one.
    rejected_top_k = (0, 101)

    with TestClient(app) as http:
        for bad_top_k in rejected_top_k:
            resp = http.post(
                "/query",
                json={"query": "test", "top_k": bad_top_k},
                headers=auth,
            )
            assert resp.status_code == 422
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_concept_name_too_large():
    """POST /concept with a 10,000-character name must fail validation."""
    body = {"name": "n" * 10000, "attributes": {"key": "value"}}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/concept", json=body, headers=auth)
        assert resp.status_code == 422
        assert "String should have at most 256 characters" in resp.text
|
|
|
|
|
def test_concept_name_empty():
    """POST /concept with an empty name must fail validation."""
    body = {"name": "", "attributes": {"key": "value"}}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/concept", json=body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
def test_concept_name_invalid_characters():
    """POST /concept with a '$' in the name must fail validation."""
    body = {"name": "concept$invalid", "attributes": {"key": "value"}}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/concept", json=body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
def test_concept_attributes_too_many():
    """POST /concept with 51 attributes must fail validation."""
    crowded_attributes = dict.fromkeys((f"k{i}" for i in range(51)), "v")
    body = {"name": "test", "attributes": crowded_attributes}

    with TestClient(app) as http:
        resp = http.post("/concept", json=body, headers={"X-API-Key": API_KEY})
        assert resp.status_code == 422
        assert "Too many attributes" in resp.text
|
|
|
|
|
def test_concept_attributes_empty():
    """POST /concept with an empty attributes mapping must fail validation."""
    body = {"name": "test", "attributes": {}}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/concept", json=body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
def test_concept_attribute_key_too_long():
    """POST /concept with a 65-character attribute key must fail validation."""
    body = {"name": "test", "attributes": {"k" * 65: "val"}}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/concept", json=body, headers=auth)
        assert resp.status_code == 422
        assert "too long" in resp.text
|
|
|
|
|
def test_concept_attribute_key_invalid_characters():
    """POST /concept with a '$' in an attribute key must fail validation."""
    body = {"name": "test", "attributes": {"key$invalid": "val"}}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/concept", json=body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
def test_concept_attribute_value_too_long():
    """POST /concept with a 1,001-character attribute value must fail validation."""
    body = {"name": "test", "attributes": {"key": "v" * 1001}}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/concept", json=body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_analogy_source_concept_too_large():
    """POST /analogy with a 10,000-character source_concept must fail validation."""
    body = {
        "source_concept": "a" * 10000,
        "source_value": "val",
        "target_concept": "target",
    }
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/analogy", json=body, headers=auth)
        assert resp.status_code == 422
        assert "String should have at most 256 characters" in resp.text
|
|
|
|
|
def test_analogy_empty_concept():
    """POST /analogy with an empty source_concept must fail validation."""
    body = {
        "source_concept": "",
        "source_value": "val",
        "target_concept": "target",
    }
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/analogy", json=body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
def test_analogy_empty_value():
    """POST /analogy with an empty source_value must fail validation."""
    body = {
        "source_concept": "source",
        "source_value": "",
        "target_concept": "target",
    }
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/analogy", json=body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
def test_analogy_target_concept_too_large():
    """POST /analogy with a 10,000-character target_concept must fail validation."""
    body = {
        "source_concept": "source",
        "source_value": "val",
        "target_concept": "a" * 10000,
    }
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/analogy", json=body, headers=auth)
        assert resp.status_code == 422
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_store_rate_limiter_within_limit():
    """POST /store succeeds when the mocked pipeline reports [1, True]."""
    mock_pipeline.execute.return_value = [1, True]

    # The engine stores the memory and can read it back afterwards.
    created_at = MagicMock(isoformat=MagicMock(return_value="2024-01-01T00:00:00"))
    stored = MagicMock(
        id="mem_1",
        content="test",
        metadata={},
        ltp_strength=0.5,
        created_at=created_at,
    )
    mock_engine_instance.get_memory.return_value = stored
    mock_engine_instance.store.return_value = "mem_1"

    with TestClient(app) as http:
        resp = http.post(
            "/store",
            json={"content": "test"},
            headers={"X-API-Key": API_KEY},
        )
        assert resp.status_code == 200
        assert resp.json()["ok"] is True
|
|
|
|
|
def test_store_rate_limiter_exceeded():
    """POST /store answers 429 with Retry-After when the pipeline reports [101, True]."""
    mock_pipeline.execute.return_value = [101, True]
    body = {"content": "test"}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/store", json=body, headers=auth)
        assert resp.status_code == 429
        assert "Rate limit exceeded" in resp.json()["detail"]
        assert "Retry-After" in resp.headers
|
|
|
|
|
def test_store_rate_limiter_retry_after_value():
    """A rate-limited /store response carries Retry-After between 1 and 60 seconds."""
    mock_pipeline.execute.return_value = [101, True]
    body = {"content": "test"}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/store", json=body, headers=auth)
        assert resp.status_code == 429

        retry_after = resp.headers.get("Retry-After")
        assert retry_after is not None
        # The header must parse as an integer in the range (0, 60].
        assert 0 < int(retry_after) <= 60
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_query_rate_limiter_within_limit():
    """POST /query is not rate limited when the mocked pipeline reports [100, True]."""
    mock_pipeline.execute.return_value = [100, True]

    # The engine returns one hit and can resolve it to a memory object.
    created_at = MagicMock(isoformat=MagicMock(return_value="2024-01-01T00:00:00"))
    hit = MagicMock(
        id="mem_1",
        content="test result",
        metadata={},
        ltp_strength=0.5,
        created_at=created_at,
        tier="hot",
    )
    mock_engine_instance.get_memory.return_value = hit
    mock_engine_instance.query.return_value = [("mem_1", 0.9)]

    with TestClient(app) as http:
        resp = http.post(
            "/query",
            json={"query": "test"},
            headers={"X-API-Key": API_KEY},
        )
        # Only the limiter is under test, so any non-429 status passes.
        assert resp.status_code != 429
|
|
|
|
|
def test_query_rate_limiter_exceeded():
    """POST /query answers 429 with Retry-After when the pipeline reports [501, True]."""
    mock_pipeline.execute.return_value = [501, True]
    body = {"query": "test"}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/query", json=body, headers=auth)
        assert resp.status_code == 429
        assert "Rate limit exceeded" in resp.json()["detail"]
        assert "Retry-After" in resp.headers
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_concept_rate_limiter_within_limit():
    """POST /concept is not rate limited when the mocked pipeline reports [50, True]."""
    mock_pipeline.execute.return_value = [50, True]
    body = {"name": "test", "attributes": {"key": "value"}}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/concept", json=body, headers=auth)
        assert resp.status_code != 429
|
|
|
|
|
def test_concept_rate_limiter_exceeded():
    """POST /concept answers 429 when the mocked pipeline reports [101, True]."""
    mock_pipeline.execute.return_value = [101, True]
    body = {"name": "test", "attributes": {"key": "value"}}
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/concept", json=body, headers=auth)
        assert resp.status_code == 429
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_analogy_rate_limiter_within_limit():
    """POST /analogy is not rate limited when the mocked pipeline reports [50, True]."""
    mock_pipeline.execute.return_value = [50, True]
    body = {
        "source_concept": "source",
        "source_value": "val",
        "target_concept": "target",
    }
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/analogy", json=body, headers=auth)
        assert resp.status_code != 429
|
|
|
|
|
def test_analogy_rate_limiter_exceeded():
    """POST /analogy answers 429 when the mocked pipeline reports [101, True]."""
    mock_pipeline.execute.return_value = [101, True]
    body = {
        "source_concept": "source",
        "source_value": "val",
        "target_concept": "target",
    }
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.post("/analogy", json=body, headers=auth)
        assert resp.status_code == 429
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_rate_limit_different_categories():
    """GET /rate-limits reports the expected request budget per category.

    The request is sent without an API key header.  Every category is
    expected to use a 60-second window; "query" allows 500 requests and the
    other three allow 100.
    """
    expected_requests = {
        "store": 100,
        "query": 500,
        "concept": 100,
        "analogy": 100,
    }

    with TestClient(app) as http:
        resp = http.get("/rate-limits")
        assert resp.status_code == 200

        reported = resp.json()["limits"]
        for category, max_requests in expected_requests.items():
            assert reported[category]["requests"] == max_requests
            assert reported[category]["window_seconds"] == 60
|
|
|
|
|
def test_rate_limit_x_forwarded_for():
    """POST /store with an X-Forwarded-For header carrying two addresses returns 200."""
    mock_pipeline.execute.return_value = [1, True]

    # The engine stores the memory and can read it back afterwards.
    created_at = MagicMock(isoformat=MagicMock(return_value="2024-01-01T00:00:00"))
    stored = MagicMock(
        id="mem_1",
        content="test",
        metadata={},
        ltp_strength=0.5,
        created_at=created_at,
    )
    mock_engine_instance.get_memory.return_value = stored
    mock_engine_instance.store.return_value = "mem_1"

    proxied_headers = {
        "X-API-Key": API_KEY,
        "X-Forwarded-For": "10.0.0.1, 192.168.1.1",
    }

    with TestClient(app) as http:
        resp = http.post("/store", json={"content": "test"}, headers=proxied_headers)
        assert resp.status_code == 200
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_memory_invalid_id_empty():
    """GET /memory/ with no id segment must answer 404 or 405."""
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.get("/memory/", headers=auth)
        assert resp.status_code in (404, 405)
|
|
|
|
|
def test_get_memory_invalid_id_too_long():
    """GET /memory/<id> with a 300-character id must answer 400."""
    overlong_id = "a" * 300
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.get(f"/memory/{overlong_id}", headers=auth)
        assert resp.status_code == 400
|
|
|
|
|
def test_delete_memory_invalid_id_too_long():
    """DELETE /memory/<id> with a 300-character id must answer 400."""
    overlong_id = "a" * 300
    auth = {"X-API-Key": API_KEY}

    with TestClient(app) as http:
        resp = http.delete(f"/memory/{overlong_id}", headers=auth)
        assert resp.status_code == 400
|
|
|