AI-Trend-Scout/tests/storage/test_chroma_store.py
Artur Mukhamadiev 65fccbc614 feat(storage): implement hybrid search and fix async chroma i/o
- Add ADR 001 for Hybrid Search Architecture
- Implement Phase 1 (Exact Match) and Phase 2 (Semantic Fallback) in ChromaStore
- Wrap blocking ChromaDB calls in asyncio.to_thread
- Update IVectorStore interface to support category filtering and thresholds
- Add comprehensive tests for hybrid search logic
2026-03-16 00:11:07 +03:00

286 lines
8.7 KiB
Python

import pytest
import asyncio
import uuid
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
from typing import Dict, Any
from src.processor.dto import EnrichedNewsItemDTO
from src.storage.chroma_store import ChromaStore
@pytest.fixture
def mock_client():
    """Provide a fresh ``MagicMock`` that stands in for the store's client."""
    client_double = MagicMock()
    return client_double
@pytest.fixture
def mock_collection():
    """Provide a fresh ``MagicMock`` that stands in for the collection."""
    collection_double = MagicMock()
    return collection_double
@pytest.fixture
def chroma_store(mock_client, mock_collection):
    """Build a ``ChromaStore`` on top of the mocked client.

    The client mock is set up so that any ``get_or_create_collection`` call
    returns ``mock_collection``, letting tests inspect collection traffic.
    """
    # Dotted-key form of configure_mock sets the child mock's return value.
    mock_client.configure_mock(
        **{"get_or_create_collection.return_value": mock_collection}
    )
    store = ChromaStore(client=mock_client, collection_name="test_collection")
    return store
@pytest.mark.asyncio
async def test_store(chroma_store, mock_collection):
    """``store`` upserts one record keyed by the UUIDv5 of the item's URL.

    Inspects the keyword arguments passed to ``collection.upsert``: the id,
    the document text, and selected metadata fields. The anomaly list is
    expected to arrive as a single comma-joined string.
    """
    # Arrange
    item = EnrichedNewsItemDTO(
        title="Test Title",
        url="https://example.com/test",
        content_text="Test Content",
        source="Test Source",
        timestamp=datetime(2023, 1, 1, tzinfo=timezone.utc),
        relevance_score=8,
        summary_ru="Тест",
        category="Tech",
        anomalies_detected=["A1", "A2"],
    )
    expected_id = str(uuid.uuid5(uuid.NAMESPACE_URL, item.url))

    # Act
    await chroma_store.store(item)

    # Assert
    mock_collection.upsert.assert_called_once()
    # Only keyword arguments are checked, so read them directly rather than
    # unpacking a positional tuple that is never used.
    upsert_kwargs = mock_collection.upsert.call_args.kwargs
    assert upsert_kwargs["ids"] == [expected_id]
    assert upsert_kwargs["documents"] == ["Test Content"]

    stored_metadata = upsert_kwargs["metadatas"][0]
    assert stored_metadata["title"] == "Test Title"
    assert stored_metadata["category"] == "Tech"
    assert stored_metadata["anomalies_detected"] == "A1,A2"
@pytest.mark.asyncio
async def test_get_by_id_found(chroma_store, mock_collection):
    """``get_by_id`` returns a populated item when the collection has a hit.

    The single stored anomaly string ``"A1"`` is expected back as a
    one-element list.
    """
    # Arrange
    lookup_id = "some-id"
    stored_metadata = {
        "title": "Title",
        "url": "https://url.com",
        "source": "Source",
        "timestamp": "2023-01-01T00:00:00",
        "relevance_score": 5.0,
        "summary_ru": "Сводка",
        "category": "Cat",
        "anomalies_detected": "A1",
    }
    mock_collection.get.return_value = {
        "metadatas": [stored_metadata],
        "documents": ["Content"],
    }

    # Act
    found = await chroma_store.get_by_id(lookup_id)

    # Assert
    assert found is not None
    assert found.title == "Title"
    assert found.content_text == "Content"
    assert found.anomalies_detected == ["A1"]
    mock_collection.get.assert_called_once_with(ids=[lookup_id])
@pytest.mark.asyncio
async def test_get_by_id_not_found(chroma_store, mock_collection):
    """``get_by_id`` yields ``None`` when the collection returns no rows."""
    # Arrange
    empty_payload = {"metadatas": [], "documents": []}
    mock_collection.get.return_value = empty_payload

    # Act
    missing = await chroma_store.get_by_id("none")

    # Assert
    assert missing is None
@pytest.mark.asyncio
async def test_exists(chroma_store, mock_collection):
    """``exists`` reports ``True`` and looks the URL up by its UUIDv5 id."""
    # Arrange
    url = "https://example.com"
    expected_id = str(uuid.uuid5(uuid.NAMESPACE_URL, url))
    mock_collection.get.return_value = {"ids": [expected_id]}

    # Act
    was_found = await chroma_store.exists(url)

    # Assert
    assert was_found is True
    mock_collection.get.assert_called_once_with(ids=[expected_id])
@pytest.mark.asyncio
async def test_get_stats(chroma_store, mock_collection):
    """``get_stats`` reports the total and per-category counts.

    The fixture data has five metadata entries, one of them ``None`` and one
    a dict with no ``category`` key. The test expects all five in
    ``total_count`` and exactly one entry under ``category_Uncategorized``.
    """
    # Arrange
    metadata_rows = [
        {"category": "Tech"},
        {"category": "Tech"},
        {"category": "Science"},
        None,
        {"other": "data"},
    ]
    mock_collection.get.return_value = {"metadatas": metadata_rows}

    # Act
    stats = await chroma_store.get_stats()

    # Assert
    expected_counts = {
        "total_count": 5,
        "category_Tech": 2,
        "category_Science": 1,
        "category_Uncategorized": 1,  # for the dict without category
    }
    for stat_key, expected_value in expected_counts.items():
        assert stats[stat_key] == expected_value
@pytest.mark.asyncio
async def test_get_latest(chroma_store, mock_collection):
    """``get_latest`` returns items newest-first and filters by category."""
    # Arrange
    older_row = {"title": "Old", "timestamp": "2023-01-01T00:00:00", "url": "u1", "relevance_score": 1}
    newer_row = {"title": "New", "timestamp": "2023-01-02T00:00:00", "url": "u2", "relevance_score": 1}
    mock_collection.get.return_value = {
        "metadatas": [older_row, newer_row],
        "documents": ["doc1", "doc2"],
    }

    # Act
    results = await chroma_store.get_latest(limit=10, category="Tech")

    # Assert
    assert len(results) == 2
    first, second = results[0], results[1]
    assert first.title == "New"
    assert second.title == "Old"

    expected_get_kwargs = {
        "include": ["metadatas", "documents"],
        "where": {"category": "Tech"},
    }
    mock_collection.get.assert_called_once_with(**expected_get_kwargs)
@pytest.mark.asyncio
async def test_get_top_ranked(chroma_store, mock_collection):
    """``get_top_ranked`` keeps only the highest-scoring item for ``limit=1``."""
    # Arrange
    low_row = {"title": "Low", "timestamp": "2023-01-01T00:00:00", "url": "u1", "relevance_score": 2}
    high_row = {"title": "High", "timestamp": "2023-01-01T00:00:00", "url": "u2", "relevance_score": 10}
    mock_collection.get.return_value = {
        "metadatas": [low_row, high_row],
        "documents": ["doc1", "doc2"],
    }

    # Act
    results = await chroma_store.get_top_ranked(limit=1, category="Tech")

    # Assert
    assert len(results) == 1
    best = results[0]
    assert best.title == "High"

    expected_get_kwargs = {
        "include": ["metadatas", "documents"],
        "where": {"category": "Tech"},
    }
    mock_collection.get.assert_called_once_with(**expected_get_kwargs)
@pytest.mark.asyncio
async def test_search_hybrid_exact_match_fills_limit(chroma_store, mock_collection):
    """When ``get`` already returns ``limit`` hits, ``query`` is never called."""
    # Arrange
    query = "Apple"
    exact_rows = [
        {"title": "Apple M4", "url": "u1", "timestamp": "2023-01-01T00:00:00", "relevance_score": 10},
        {"title": "Apple Vision", "url": "u2", "timestamp": "2023-01-01T00:00:00", "relevance_score": 9},
    ]
    mock_collection.get.return_value = {
        "metadatas": exact_rows,
        "documents": ["doc1", "doc2"],
    }

    # Act
    results = await chroma_store.search(query, limit=2)

    # Assert
    assert len(results) == 2
    top_hit, second_hit = results[0], results[1]
    assert top_hit.title == "Apple M4"
    assert second_hit.title == "Apple Vision"
    # One exact-match lookup, and no semantic query at all.
    mock_collection.get.assert_called_once()
    mock_collection.query.assert_not_called()
@pytest.mark.asyncio
async def test_search_hybrid_falls_back_to_semantic(chroma_store, mock_collection):
    """A short exact-match result is topped up from the semantic query.

    ``get`` yields one hit while ``limit`` is 2, and ``query`` returns that
    same hit plus a new one. The test expects the duplicate to appear once,
    followed by the new semantic hit.
    """
    # Arrange
    query = "Apple"
    # Exact match finds 1 item
    exact_row = {"title": "Apple M4", "url": "u1", "timestamp": "2023-01-01T00:00:00", "relevance_score": 10}
    mock_collection.get.return_value = {
        "metadatas": [exact_row],
        "documents": ["doc1"],
    }
    # Semantic match finds more items, including the same one
    semantic_rows = [
        {"title": "Apple M4", "url": "u1", "timestamp": "2023-01-01T00:00:00", "relevance_score": 10},
        {"title": "M3 Chip", "url": "u2", "timestamp": "2023-01-01T00:00:00", "relevance_score": 8},
    ]
    mock_collection.query.return_value = {
        "metadatas": [semantic_rows],
        "documents": [["doc1", "doc2"]],
        "distances": [[0.1, 0.5]],
    }

    # Act
    results = await chroma_store.search(query, limit=2)

    # Assert
    assert len(results) == 2
    exact_hit, semantic_hit = results[0], results[1]
    assert exact_hit.title == "Apple M4"
    assert semantic_hit.title == "M3 Chip"
    mock_collection.get.assert_called()
    mock_collection.query.assert_called()
@pytest.mark.asyncio
async def test_search_with_category_and_threshold(chroma_store, mock_collection):
    """``search`` forwards the category filter and applies the threshold.

    With no exact matches and semantic distances of 0.2 and 0.8, a threshold
    of 0.5 is expected to leave only the 0.2 hit. The test also pins the
    keyword arguments of the most recent ``get`` and ``query`` calls.
    """
    # Arrange
    query = "AI"
    mock_collection.get.return_value = {"metadatas": [], "documents": []}
    semantic_rows = [
        {"title": "Good match", "url": "u1", "timestamp": "2023-01-01T00:00:00", "relevance_score": 10},
        {"title": "Bad match", "url": "u2", "timestamp": "2023-01-01T00:00:00", "relevance_score": 5},
    ]
    mock_collection.query.return_value = {
        "metadatas": [semantic_rows],
        "documents": [["doc1", "doc2"]],
        "distances": [[0.2, 0.8]],
    }

    # Act
    results = await chroma_store.search(query, limit=5, category="Tech", threshold=0.5)

    # Assert
    assert len(results) == 1
    only_hit = results[0]
    assert only_hit.title == "Good match"

    category_filter = {"category": "Tech"}
    mock_collection.get.assert_called_with(
        where_document={"$contains": "AI"},
        where=category_filter,
        include=["metadatas", "documents"],
    )
    mock_collection.query.assert_called_with(
        query_texts=["AI"],
        n_results=5,
        where=category_filter,
    )
@pytest.mark.asyncio
async def test_search_exception_handling(chroma_store, mock_collection):
    """``search`` returns an empty list when both ``get`` and ``query`` raise."""
    # Arrange
    failures = {
        "get.side_effect": Exception("Get failed"),
        "query.side_effect": Exception("Query failed"),
    }
    mock_collection.configure_mock(**failures)

    # Act
    results = await chroma_store.search("query")

    # Assert
    assert results == []  # Should not crash
@pytest.mark.asyncio
async def test_search_empty_query(chroma_store, mock_collection):
    """An empty query string still reaches both ``get`` and ``query``.

    The test expects ``get`` to be called with no document or metadata
    filter, and ``query`` to be called with ``"*"`` as the query text and
    ``n_results=5`` (presumably the default limit; confirm against
    ``ChromaStore.search``).
    """
    # Arrange
    mock_collection.get.return_value = {"metadatas": [], "documents": []}
    mock_collection.query.return_value = {
        "metadatas": [[]],
        "documents": [[]],
        "distances": [[]],
    }

    # Act
    await chroma_store.search("")

    # Assert
    expected_get_kwargs = {
        "where_document": None,
        "where": None,
        "include": ["metadatas", "documents"],
    }
    mock_collection.get.assert_called_with(**expected_get_kwargs)

    expected_query_kwargs = {
        "query_texts": ["*"],
        "n_results": 5,
        "where": None,
    }
    mock_collection.query.assert_called_with(**expected_query_kwargs)