AI-Trend-Scout/tests/storage/test_chroma_store.py
Artur Mukhamadiev e1c7f47f8f Feature: Add /get_hottest command for exporting top trends
:Release Notes:
- Added a new Telegram command `/get_hottest <number> [format]` to export the top `N` trends as a CSV or Markdown file.

:Detailed Notes:
- Created `ITrendExporter` interface and concrete `CsvTrendExporter` and `MarkdownTrendExporter` implementations for formatting DTOs.
- Updated `src/bot/handlers.py` to include `command_get_hottest_handler` mapping to `/get_hottest`.
- Used `BufferedInputFile` to stream generated files asynchronously directly to Telegram without disk I/O.
- Fixed unrelated pipeline test failures regarding `EphemeralClient` usage with ChromaDB.

:Testing Performed:
- Implemented TDD with `pytest` for parsing parameters, exporting logic, and handling empty DB scenarios.
- Ran the full test suite (90 tests) which completed successfully.

:QA Notes:
- Fully covered the new handler using `pytest-asyncio` and `aiogram` mocked objects.

:Issues Addressed:
- Resolves request to export high-relevance parsed entries.

Change-Id: I25dd90f1e4491ba298682518d835259bffab4190
2026-03-19 14:53:20 +03:00

283 lines
8.7 KiB
Python

import pytest
import asyncio
import uuid
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
from typing import Dict, Any
from src.processor.dto import EnrichedNewsItemDTO
from src.storage.chroma_store import ChromaStore
@pytest.fixture
def mock_client():
    """Provide a bare ``MagicMock`` used as the client handed to ``ChromaStore``."""
    client_double = MagicMock()
    return client_double
@pytest.fixture
def mock_collection():
    """Provide a bare ``MagicMock`` that stands in for the store's collection."""
    collection_double = MagicMock()
    return collection_double
@pytest.fixture
def chroma_store(mock_client, mock_collection):
    """Build a ``ChromaStore`` around the mocked client and collection.

    The client's ``get_or_create_collection`` is stubbed to return
    ``mock_collection`` so that tests can inspect calls made on the collection.
    """
    collection_factory = mock_client.get_or_create_collection
    collection_factory.return_value = mock_collection
    store = ChromaStore(client=mock_client, collection_name="test_collection")
    return store
@pytest.mark.asyncio
async def test_store(chroma_store, mock_collection):
    """``store`` upserts the item exactly once under a UUIDv5 id derived from its URL."""
    # Arrange
    news_item = EnrichedNewsItemDTO(
        title="Test Title",
        url="https://example.com/test",
        content_text="Test Content",
        source="Test Source",
        timestamp=datetime(2023, 1, 1, tzinfo=timezone.utc),
        relevance_score=8,
        summary_ru="Тест",
        category="Tech",
        anomalies_detected=["A1", "A2"],
    )
    expected_id = str(uuid.uuid5(uuid.NAMESPACE_URL, news_item.url))

    # Act
    await chroma_store.store(news_item)

    # Assert
    mock_collection.upsert.assert_called_once()
    # Only the keyword arguments of the upsert call are inspected.
    _, upsert_kwargs = mock_collection.upsert.call_args
    assert upsert_kwargs['ids'] == [expected_id]
    assert upsert_kwargs['documents'] == ["Test Content"]
    stored_metadata = upsert_kwargs['metadatas'][0]
    assert stored_metadata['title'] == "Test Title"
    assert stored_metadata['category'] == "Tech"
    # The anomaly list is expected to be flattened into a comma-joined string.
    assert stored_metadata['anomalies_detected'] == "A1,A2"
@pytest.mark.asyncio
async def test_get_by_id_found(chroma_store, mock_collection):
    """``get_by_id`` rebuilds an item from the metadata/document the collection returns."""
    # Arrange
    item_id = "some-id"
    stored_metadata = {
        "title": "Title",
        "url": "https://url.com",
        "source": "Source",
        "timestamp": "2023-01-01T00:00:00",
        "relevance_score": 5.0,
        "summary_ru": "Сводка",
        "category": "Cat",
        "anomalies_detected": "A1",
    }
    mock_collection.get.return_value = {
        "metadatas": [stored_metadata],
        "documents": ["Content"],
    }

    # Act
    found = await chroma_store.get_by_id(item_id)

    # Assert
    assert found is not None
    assert found.title == "Title"
    assert found.content_text == "Content"
    # The stored anomaly string is expected to come back as a list.
    assert found.anomalies_detected == ["A1"]
    mock_collection.get.assert_called_once_with(ids=[item_id])
@pytest.mark.asyncio
async def test_get_by_id_not_found(chroma_store, mock_collection):
    """``get_by_id`` yields ``None`` when the collection returns empty lists."""
    # Arrange
    empty_response = {"metadatas": [], "documents": []}
    mock_collection.get.return_value = empty_response

    # Act
    missing = await chroma_store.get_by_id("none")

    # Assert
    assert missing is None
@pytest.mark.asyncio
async def test_exists(chroma_store, mock_collection):
    """``exists`` looks the URL up by its UUIDv5 id and reports ``True`` on a hit."""
    # Arrange
    url = "https://example.com"
    expected_id = str(uuid.uuid5(uuid.NAMESPACE_URL, url))
    mock_collection.get.return_value = {"ids": [expected_id]}

    # Act
    found = await chroma_store.exists(url)

    # Assert
    assert found is True
    mock_collection.get.assert_called_once_with(ids=[expected_id])
@pytest.mark.asyncio
async def test_get_stats(chroma_store, mock_collection):
    """``get_stats`` reports a total plus per-category counts.

    The fixture data mixes regular entries with a ``None`` entry and a dict
    that has no ``category`` key; the expected numbers below pin how those
    two irregular rows are counted.
    """
    # Arrange
    metadata_rows = [
        {"category": "Tech"},
        {"category": "Tech"},
        {"category": "Science"},
        None,
        {"other": "data"},
    ]
    mock_collection.get.return_value = {"metadatas": metadata_rows}

    # Act
    stats = await chroma_store.get_stats()

    # Assert
    expected_counts = {
        "total_count": 5,
        "category_Tech": 2,
        "category_Science": 1,
        "category_Uncategorized": 1,  # for the dict without category
    }
    for stat_key, expected_value in expected_counts.items():
        assert stats[stat_key] == expected_value
@pytest.mark.asyncio
async def test_get_latest(chroma_store, mock_collection):
    """``get_latest`` returns items newest-first and forwards the category filter."""
    # Arrange
    older_entry = {"title": "Old", "timestamp": "2023-01-01T00:00:00", "url": "u1", "relevance_score": 1}
    newer_entry = {"title": "New", "timestamp": "2023-01-02T00:00:00", "url": "u2", "relevance_score": 1}
    mock_collection.get.return_value = {
        "metadatas": [older_entry, newer_entry],
        "documents": ["doc1", "doc2"],
    }

    # Act
    latest = await chroma_store.get_latest(limit=10, category="Tech")

    # Assert
    assert len(latest) == 2
    first, second = latest[0], latest[1]
    assert first.title == "New"
    assert second.title == "Old"
    mock_collection.get.assert_called_once_with(
        include=["metadatas", "documents"],
        where={"category": "Tech"},
    )
@pytest.mark.asyncio
async def test_get_top_ranked(chroma_store, mock_collection):
    """``get_top_ranked`` with ``limit=1`` keeps only the highest-scored item."""
    # Arrange
    low_scored = {"title": "Low", "timestamp": "2023-01-01T00:00:00", "url": "u1", "relevance_score": 2}
    high_scored = {"title": "High", "timestamp": "2023-01-01T00:00:00", "url": "u2", "relevance_score": 10}
    mock_collection.get.return_value = {
        "metadatas": [low_scored, high_scored],
        "documents": ["doc1", "doc2"],
    }

    # Act
    ranked = await chroma_store.get_top_ranked(limit=1, category="Tech")

    # Assert
    assert len(ranked) == 1
    winner = ranked[0]
    assert winner.title == "High"
    mock_collection.get.assert_called_once_with(
        include=["metadatas", "documents"],
        where={"category": "Tech"},
    )
@pytest.mark.asyncio
async def test_search_hybrid_exact_match_fills_limit(chroma_store, mock_collection):
    """When ``get`` already yields ``limit`` items, ``search`` must not call ``query``."""
    # Arrange
    search_text = "Apple"
    exact_hits = [
        {"title": "Apple M4", "url": "u1", "timestamp": "2023-01-01T00:00:00", "relevance_score": 10},
        {"title": "Apple Vision", "url": "u2", "timestamp": "2023-01-01T00:00:00", "relevance_score": 9},
    ]
    mock_collection.get.return_value = {
        "metadatas": exact_hits,
        "documents": ["doc1", "doc2"],
    }

    # Act
    found = await chroma_store.search(search_text, limit=2)

    # Assert
    assert len(found) == 2
    first, second = found[0], found[1]
    assert first.title == "Apple M4"
    assert second.title == "Apple Vision"
    mock_collection.get.assert_called_once()
    mock_collection.query.assert_not_called()
@pytest.mark.asyncio
async def test_search_hybrid_falls_back_to_semantic(chroma_store, mock_collection):
    """When ``get`` yields fewer than ``limit`` items, ``query`` results fill the rest.

    The item returned by both calls ("Apple M4", url ``u1``) must appear only
    once in the combined result.
    """
    # Arrange
    search_text = "Apple"
    shared_hit = {"title": "Apple M4", "url": "u1", "timestamp": "2023-01-01T00:00:00", "relevance_score": 10}
    semantic_only_hit = {"title": "M3 Chip", "url": "u2", "timestamp": "2023-01-01T00:00:00", "relevance_score": 8}
    # Exact match finds 1 item
    mock_collection.get.return_value = {
        "metadatas": [dict(shared_hit)],
        "documents": ["doc1"],
    }
    # Semantic match finds more items, including the same one
    mock_collection.query.return_value = {
        "metadatas": [[dict(shared_hit), semantic_only_hit]],
        "documents": [["doc1", "doc2"]],
        "distances": [[0.1, 0.5]],
    }

    # Act
    found = await chroma_store.search(search_text, limit=2)

    # Assert
    assert len(found) == 2
    first, second = found[0], found[1]
    assert first.title == "Apple M4"
    assert second.title == "M3 Chip"
    mock_collection.get.assert_called()
    mock_collection.query.assert_called()
@pytest.mark.asyncio
async def test_search_with_category_and_threshold(chroma_store, mock_collection):
    """``search`` forwards the category filter and drops hits beyond the threshold.

    With ``threshold=0.5`` only the hit at distance 0.2 is expected back; the
    one at distance 0.8 must be filtered out.
    """
    # Arrange
    search_text = "AI"
    mock_collection.get.return_value = {"metadatas": [], "documents": []}
    semantic_hits = [
        {"title": "Good match", "url": "u1", "timestamp": "2023-01-01T00:00:00", "relevance_score": 10},
        {"title": "Bad match", "url": "u2", "timestamp": "2023-01-01T00:00:00", "relevance_score": 5},
    ]
    mock_collection.query.return_value = {
        "metadatas": [semantic_hits],
        "documents": [["doc1", "doc2"]],
        "distances": [[0.2, 0.8]],
    }

    # Act
    found = await chroma_store.search(search_text, limit=5, category="Tech", threshold=0.5)

    # Assert
    assert len(found) == 1
    only_hit = found[0]
    assert only_hit.title == "Good match"
    mock_collection.get.assert_called_with(
        where_document={"$contains": "AI"},
        where={"category": "Tech"},
        limit=5,
        include=["metadatas", "documents"],
    )
    mock_collection.query.assert_called_with(
        query_texts=["AI"],
        n_results=5,
        where={"category": "Tech"},
    )
@pytest.mark.asyncio
async def test_search_exception_handling(chroma_store, mock_collection):
    """``search`` returns an empty list when both ``get`` and ``query`` raise."""
    # Arrange
    failures = (
        (mock_collection.get, "Get failed"),
        (mock_collection.query, "Query failed"),
    )
    for collection_call, message in failures:
        collection_call.side_effect = Exception(message)

    # Act
    found = await chroma_store.search("query")

    # Assert
    assert found == []  # Should not crash
@pytest.mark.asyncio
async def test_search_empty_query(chroma_store, mock_collection):
    """An empty query skips ``get`` and calls ``query`` with ``"*"`` as the text.

    The expected ``query`` call also pins ``n_results=5`` and ``where=None``
    when ``search`` is invoked with nothing but the empty string.
    """
    # Arrange
    empty_get_response = {"metadatas": [], "documents": []}
    empty_query_response = {"metadatas": [[]], "documents": [[]], "distances": [[]]}
    mock_collection.get.return_value = empty_get_response
    mock_collection.query.return_value = empty_query_response

    # Act
    await chroma_store.search("")

    # Assert
    mock_collection.get.assert_not_called()
    mock_collection.query.assert_called_with(
        query_texts=["*"],
        n_results=5,
        where=None,
    )