[tg] stats/search features

Processed data is not written back to user
This commit is contained in:
Artur Mukhamadiev 2026-03-13 12:50:49 +03:00
parent 5f093075f7
commit 9c8e4c7345
14 changed files with 568 additions and 118 deletions

View File

@ -1,12 +1,13 @@
from aiogram import Bot, Dispatcher
from aiogram.client.default import DefaultBotProperties
from src.bot.handlers import router
from src.bot.handlers import get_router
from src.storage.base import IVectorStore
def setup_bot(token: str) -> tuple[Bot, Dispatcher]:
def setup_bot(token: str, storage: IVectorStore, allowed_chat_id: str) -> tuple[Bot, Dispatcher]:
"""
Setup the aiogram Bot and Dispatcher with handlers.
"""
bot = Bot(token=token, default=DefaultBotProperties(parse_mode="HTML"))
dp = Dispatcher()
dp.include_router(router)
dp.include_router(get_router(storage, allowed_chat_id))
return bot, dp

View File

@ -1,69 +1,162 @@
import uuid
from datetime import datetime
import html
from aiogram import Router
from aiogram.filters import CommandStart, Command
from aiogram.types import Message
from typing import Optional, Callable, Dict, Any, Awaitable
from aiogram import Router, BaseMiddleware, F
from aiogram.filters import CommandStart, Command, CommandObject
from aiogram.types import Message, TelegramObject, InlineKeyboardButton, InlineKeyboardMarkup, CallbackQuery
from aiogram.utils.keyboard import InlineKeyboardBuilder
from aiogram.utils.formatting import as_list, as_marked_section, Bold, TextLink
from src.processor.dto import EnrichedNewsItemDTO
from src.storage.base import IVectorStore
router = Router(name="main_router")
class AccessMiddleware(BaseMiddleware):
def __init__(self, allowed_chat_id: str):
self.allowed_chat_id = allowed_chat_id
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Any:
if isinstance(event, Message):
if str(event.chat.id) != self.allowed_chat_id:
await event.answer("Access Denied")
return
return await handler(event, data)
@router.message(CommandStart())
async def command_start_handler(message: Message) -> None:
"""
This handler receives messages with `/start` command
"""
user_name = html.escape(message.from_user.full_name) if message.from_user else 'user'
await message.answer(f"Welcome to Trend-Scout AI, {user_name}!")
def get_router(storage: IVectorStore, allowed_chat_id: str) -> Router:
router = Router(name="main_router")
router.message.middleware(AccessMiddleware(allowed_chat_id))
@router.message(CommandStart())
async def command_start_handler(message: Message) -> None:
"""
This handler receives messages with `/start` command
"""
user_name = html.escape(message.from_user.full_name) if message.from_user else 'user'
await message.answer(f"Welcome to Trend-Scout AI, {user_name}!")
@router.message(Command("help"))
async def command_help_handler(message: Message) -> None:
"""
This handler receives messages with `/help` command
"""
help_text = (
"Available commands:\n"
"/start - Start the bot\n"
"/help - Show this help message\n"
"/latest - Show the latest enriched news trends\n"
)
await message.answer(help_text)
@router.message(Command("help"))
async def command_help_handler(message: Message) -> None:
"""
This handler receives messages with `/help` command
"""
help_text = (
"Available commands:\n"
"/start - Start the bot\n"
"/help - Show this help message\n"
"/latest [category] - Show the latest enriched news trends\n"
"/search query - Search for news\n"
"/stats - Show database statistics\n"
)
await message.answer(help_text)
@router.message(Command("latest"))
async def command_latest_handler(message: Message, command: CommandObject) -> None:
"""
This handler receives messages with `/latest` command
"""
category = command.args if command.args else ""
items = await storage.search(query=category, limit=10)
@router.message(Command("latest"))
async def command_latest_handler(message: Message) -> None:
"""
This handler receives messages with `/latest` command
and returns a mock EnrichedNewsItemDTO
"""
mock_item = EnrichedNewsItemDTO(
title="Breakthrough in Quantum Computing",
url="https://example.com/quantum-breakthrough",
content_text="Researchers have achieved a new milestone in quantum supremacy...",
source="Example Domain News",
timestamp=datetime.now(),
relevance_score=9,
summary_ru="Исследователи достигли нового рубежа в квантовом превосходстве.",
anomalies_detected=["Quantum supremacy", "Room temperature superconductors"]
)
if not items:
await message.answer("No results found.")
return
title = html.escape(mock_item.title)
source = html.escape(mock_item.source)
summary = html.escape(mock_item.summary_ru)
anomalies = [html.escape(a) for a in mock_item.anomalies_detected]
anomalies_text = ", ".join(anomalies)
url = html.escape(mock_item.url)
builder = InlineKeyboardBuilder()
for item in items:
item_id = str(uuid.uuid5(uuid.NAMESPACE_URL, item.url))
builder.row(InlineKeyboardButton(
text=f"[{item.relevance_score}/10] {item.title}",
callback_data=f"detail:{item_id}"
))
response_text = (
f"🌟 <b>{title}</b>\n\n"
f"<b>Source:</b> {source}\n"
f"<b>Relevance Score:</b> {mock_item.relevance_score}/10\n"
f"<b>Summary:</b> {summary}\n"
f"<b>Anomalies Detected:</b> {anomalies_text}\n\n"
f"<a href='{url}'>Read more</a>"
)
await message.answer("Latest news:", reply_markup=builder.as_markup())
await message.answer(response_text, parse_mode="HTML")
@router.message(Command("search"))
async def command_search_handler(message: Message, command: CommandObject) -> None:
"""
This handler receives messages with `/search` command
"""
query = command.args
if not query:
await message.answer("Please provide a search query. Usage: /search query")
return
items = await storage.search(query=query, limit=10)
if not items:
await message.answer("No results found.")
return
builder = InlineKeyboardBuilder()
for item in items:
item_id = str(uuid.uuid5(uuid.NAMESPACE_URL, item.url))
builder.row(InlineKeyboardButton(
text=f"[{item.relevance_score}/10] {item.title}",
callback_data=f"detail:{item_id}"
))
await message.answer("Search results:", reply_markup=builder.as_markup())
@router.callback_query(F.data.startswith("detail:"))
async def detail_callback_handler(callback: CallbackQuery) -> None:
"""
This handler receives callback queries for news details
"""
item_id = callback.data.split(":")[1]
item = await storage.get_by_id(item_id)
if not item:
await callback.answer("Item not found.", show_alert=True)
return
title = html.escape(item.title)
source = html.escape(item.source)
summary = html.escape(item.summary_ru)
category = html.escape(item.category)
anomalies = [html.escape(a) for a in item.anomalies_detected] if item.anomalies_detected else []
anomalies_text = ", ".join(anomalies)
url = html.escape(item.url)
response_text = (
f"🌟 <b>{title}</b>\n\n"
f"<b>Source:</b> {source}\n"
f"<b>Category:</b> {category}\n"
f"<b>Relevance Score:</b> {item.relevance_score}/10\n"
f"<b>Summary:</b> {summary}\n"
)
if anomalies_text:
response_text += f"<b>Anomalies Detected:</b> {anomalies_text}\n\n"
else:
response_text += "\n"
response_text += f"<a href='{url}'>Read more</a>"
await callback.message.answer(response_text, parse_mode="HTML", disable_web_page_preview=False)
await callback.answer()
@router.message(Command("stats"))
async def command_stats_handler(message: Message) -> None:
"""
This handler receives messages with `/stats` command
"""
stats = await storage.get_stats()
total = stats.get("total", 0)
breakdown = []
for cat, count in stats.items():
if cat != "total":
breakdown.append(f"- {cat}: {count}")
response = f"📊 <b>Database Statistics</b>\n\nTotal items: {total}\n"
if breakdown:
response += "\n<b>Breakdown by category:</b>\n" + "\n".join(breakdown)
await message.answer(response, parse_mode="HTML")
return router

View File

@ -47,10 +47,7 @@ async def main():
if not chat_id or chat_id == "YOUR_CHAT_ID_HERE":
logger.warning("TELEGRAM_CHAT_ID is missing or not set. Notifications will fail.")
# 1. Initialize Bot & Dispatcher
bot, dp = setup_bot(bot_token)
# 2. Initialize Components
# 1. Initialize Components that do not depend on Bot
crawlers: List[ICrawler] = [
RSSCrawler("https://habr.com/ru/rss/hubs/artificial_intelligence/articles/?fl=ru", source="Habr AI")
]
@ -63,6 +60,11 @@ async def main():
chroma_client = chromadb.Client()
storage = ChromaStore(client=chroma_client)
# 2. Initialize Bot & Dispatcher
bot, dp = setup_bot(bot_token, storage, chat_id)
# 3. Initialize Notifier and Orchestrator
notifier = TelegramNotifier(bot, chat_id)
orchestrator = TrendScoutService(
@ -72,7 +74,7 @@ async def main():
notifier=notifier
)
# 3. Start tasks
# 4. Start tasks
logger.info("Starting TrendScout AI Bot and Background Task...")
# Create the background task

View File

@ -15,9 +15,11 @@ class TelegramNotifier(INotificationService):
anomalies_list = [html.escape(a) for a in item.anomalies_detected] if item.anomalies_detected else []
anomalies_text = ", ".join(anomalies_list) if anomalies_list else "None"
url = html.escape(item.url)
category_text = html.escape(item.category) if getattr(item, 'category', None) else "Uncategorized"
formatted_text = (
f"<b>{title}</b>\n\n"
f"Category: {category_text}\n"
f"Relevance: {item.relevance_score}/10\n"
f"Anomalies: {anomalies_text}\n\n"
f"{summary}\n\n"

View File

@ -21,8 +21,16 @@ class TrendScoutService:
for crawler in self.crawlers:
items = await crawler.fetch_latest()
for item in items:
if await self.storage.exists(item.url):
continue
enriched_item = await self.processor.analyze(item)
if enriched_item.relevance_score < 5:
enriched_item.content_text = ""
await self.storage.store(enriched_item)
if enriched_item.relevance_score >= 8 or bool(enriched_item.anomalies_detected):
await self.notifier.send_alert(enriched_item)
# Proactive alerts are currently disabled per user request
# if enriched_item.relevance_score >= 8 or bool(enriched_item.anomalies_detected):
# await self.notifier.send_alert(enriched_item)

View File

@ -6,3 +6,4 @@ class EnrichedNewsItemDTO(NewsItemDTO):
relevance_score: int = Field(ge=0, le=10)
summary_ru: str
anomalies_detected: List[str]
category: str = ""

View File

@ -13,7 +13,12 @@ class OllamaProvider(ILLMProvider):
prompt = (
f"Analyze the following article.\nTitle: {news_item.title}\n"
f"Content: {news_item.content_text}\n"
"Return JSON with 'relevance_score' (0-10), 'summary_ru' (string), and 'anomalies_detected' (list of strings)."
"Return JSON with 'relevance_score' (0-10), 'summary_ru' (string), 'anomalies_detected' (list of strings), and 'category' (string).\n"
"The 'category' must be exactly one of: 'Browsers', 'Edge AI', 'SmartTV', 'Samsung New Technologies', 'Middleware new trends', 'Competitors', 'Other'.\n"
"For 'relevance_score', prioritize and give higher scores to articles related to R&D, Chromium, NPU, and Smart TV operating systems.\n"
"Regarding 'anomalies_detected': only detect factual, conceptual, or industry-related anomalies (e.g., sudden technological shifts, unexpected competitor moves). "
"DO NOT detect technical anomalies related to the text's formatting, HTML tags, metadata, or document structure. "
"If no real anomalies are found, return an empty list."
)
payload = {
"model": os.environ.get('OLLAMA_MODEL', 'gpt-oss:120b-cloud'),
@ -52,7 +57,8 @@ class OllamaProvider(ILLMProvider):
parsed_json = {
"relevance_score": 0,
"summary_ru": "Error parsing LLM response: " + generated_text,
"anomalies_detected": []
"anomalies_detected": [],
"category": "Other"
}
return EnrichedNewsItemDTO(
@ -63,5 +69,6 @@ class OllamaProvider(ILLMProvider):
timestamp=news_item.timestamp,
relevance_score=parsed_json.get('relevance_score', 0),
summary_ru=parsed_json.get('summary_ru', ''),
anomalies_detected=parsed_json.get('anomalies_detected', [])
anomalies_detected=parsed_json.get('anomalies_detected', []),
category=parsed_json.get('category', 'Other')
)

View File

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import List
from typing import List, Optional
from src.processor.dto import EnrichedNewsItemDTO
class IVectorStore(ABC):
@ -10,7 +10,22 @@ class IVectorStore(ABC):
"""Store an item in the vector database."""
pass
@abstractmethod
async def get_by_id(self, item_id: str) -> Optional[EnrichedNewsItemDTO]:
"""Retrieve an item from the vector database by its ID."""
pass
@abstractmethod
async def search(self, query: str, limit: int = 5) -> List[EnrichedNewsItemDTO]:
"""Search for items in the vector database."""
pass
@abstractmethod
async def exists(self, url: str) -> bool:
"""Check if an item with the given URL already exists in the vector database."""
pass
@abstractmethod
async def get_stats(self) -> dict[str, int]:
"""Get storage statistics including total count and breakdown by category."""
pass

View File

@ -1,5 +1,5 @@
import uuid
from typing import List
from typing import List, Optional, Mapping, Any
from datetime import datetime
import chromadb
@ -24,6 +24,7 @@ class ChromaStore(IVectorStore):
"timestamp": item.timestamp.isoformat(),
"relevance_score": item.relevance_score,
"summary_ru": item.summary_ru,
"category": item.category,
# Chroma accepts string, int, float or bool for metadata values
"anomalies_detected": ",".join(item.anomalies_detected) if item.anomalies_detected else ""
}
@ -34,6 +35,18 @@ class ChromaStore(IVectorStore):
metadatas=[metadata]
)
async def get_by_id(self, item_id: str) -> Optional[EnrichedNewsItemDTO]:
results = self.collection.get(ids=[item_id])
metadatas = results.get('metadatas')
if not metadatas or not metadatas[0]:
return None
documents = results.get('documents')
document = documents[0] if documents and documents[0] else ""
return self._reconstruct_dto(metadatas[0], document)
async def search(self, query: str, limit: int = 5) -> List[EnrichedNewsItemDTO]:
results = self.collection.query(
query_texts=[query],
@ -53,21 +66,47 @@ class ChromaStore(IVectorStore):
continue
document = documents[0][idx] if documents and documents[0] else ""
anomalies_str = str(metadata.get("anomalies_detected", ""))
anomalies = anomalies_str.split(",") if anomalies_str else []
anomalies = [a.strip() for a in anomalies if a.strip()]
item = EnrichedNewsItemDTO(
title=str(metadata.get("title", "")),
url=str(metadata.get("url", "")),
content_text=str(document),
source=str(metadata.get("source", "")),
timestamp=datetime.fromisoformat(str(metadata.get("timestamp", ""))),
relevance_score=int(float(str(metadata.get("relevance_score", 0)))),
summary_ru=str(metadata.get("summary_ru", "")),
anomalies_detected=anomalies
)
items.append(item)
items.append(self._reconstruct_dto(metadata, document))
return items
def _reconstruct_dto(self, metadata: Mapping[str, Any], document: str) -> EnrichedNewsItemDTO:
anomalies_str = str(metadata.get("anomalies_detected", ""))
anomalies = anomalies_str.split(",") if anomalies_str else []
anomalies = [a.strip() for a in anomalies if a.strip()]
return EnrichedNewsItemDTO(
title=str(metadata.get("title", "")),
url=str(metadata.get("url", "")),
content_text=str(document),
source=str(metadata.get("source", "")),
timestamp=datetime.fromisoformat(str(metadata.get("timestamp", ""))),
relevance_score=int(float(str(metadata.get("relevance_score", 0)))),
summary_ru=str(metadata.get("summary_ru", "")),
category=str(metadata.get("category", "")),
anomalies_detected=anomalies
)
async def exists(self, url: str) -> bool:
doc_id = str(uuid.uuid5(uuid.NAMESPACE_URL, url))
result = self.collection.get(ids=[doc_id])
return len(result.get("ids", [])) > 0
async def get_stats(self) -> dict[str, int]:
# Retrieve all metadatas to calculate stats
results = self.collection.get(include=["metadatas"])
metadatas = results.get("metadatas")
if metadatas is None:
metadatas = []
stats = {
"total_count": len(metadatas)
}
for meta in metadatas:
if meta:
category = str(meta.get("category", "Uncategorized"))
key = f"category_{category}"
stats[key] = stats.get(key, 0) + 1
return stats

View File

@ -1,28 +1,78 @@
import uuid
import pytest
from unittest.mock import AsyncMock, MagicMock
from aiogram.types import Message
from aiogram.types import Message, InlineKeyboardMarkup, CallbackQuery
from aiogram.filters import CommandObject
from datetime import datetime
from src.bot.handlers import command_start_handler, command_help_handler, command_latest_handler
from src.bot.handlers import get_router, AccessMiddleware
from src.processor.dto import EnrichedNewsItemDTO
@pytest.fixture
def mock_item():
return EnrichedNewsItemDTO(
title="Breakthrough in Quantum Computing",
url="https://example.com/quantum-breakthrough",
content_text="Researchers have achieved a new milestone in quantum supremacy...",
source="Example Domain News",
timestamp=datetime.now(),
relevance_score=9,
summary_ru="Исследователи достигли нового рубежа в квантовом превосходстве.",
anomalies_detected=["Quantum supremacy", "Room temperature superconductors"],
category="Science"
)
@pytest.fixture
def mock_storage(mock_item):
storage = AsyncMock()
storage.search.return_value = [mock_item]
storage.get_by_id.return_value = mock_item
storage.get_stats.return_value = {"total": 1, "AI": 1}
return storage
@pytest.fixture
def allowed_chat_id():
return "123456789"
@pytest.fixture
def router(mock_storage, allowed_chat_id):
return get_router(mock_storage, allowed_chat_id)
def get_handler(router, callback_name):
for handler in router.message.handlers:
if handler.callback.__name__ == callback_name:
return handler.callback
raise ValueError(f"Handler {callback_name} not found")
def get_callback_handler(router, callback_name):
for handler in router.callback_query.handlers:
if handler.callback.__name__ == callback_name:
return handler.callback
raise ValueError(f"Handler {callback_name} not found")
@pytest.mark.asyncio
async def test_command_start_handler():
async def test_command_start_handler(router, allowed_chat_id):
handler = get_handler(router, "command_start_handler")
message = AsyncMock()
message.chat.id = int(allowed_chat_id)
message.from_user = MagicMock()
message.from_user.full_name = "Test User"
message.answer = AsyncMock()
await command_start_handler(message)
await handler(message)
message.answer.assert_called_once()
args, kwargs = message.answer.call_args
assert "Welcome" in args[0] or "Trend-Scout" in args[0]
@pytest.mark.asyncio
async def test_command_help_handler():
async def test_command_help_handler(router, allowed_chat_id):
handler = get_handler(router, "command_help_handler")
message = AsyncMock()
message.chat.id = int(allowed_chat_id)
message.answer = AsyncMock()
await command_help_handler(message)
await handler(message)
message.answer.assert_called_once()
args, kwargs = message.answer.call_args
@ -30,16 +80,96 @@ async def test_command_help_handler():
assert "/latest" in args[0]
@pytest.mark.asyncio
async def test_command_latest_handler():
async def test_command_latest_handler(router, mock_storage, allowed_chat_id):
handler = get_handler(router, "command_latest_handler")
message = AsyncMock()
message.chat.id = int(allowed_chat_id)
message.answer = AsyncMock()
command = CommandObject(prefix="/", command="latest", args=None)
await command_latest_handler(message)
await handler(message=message, command=command)
message.answer.assert_called_once()
args, kwargs = message.answer.call_args
response_text = args[0]
assert "Latest news:" in args[0]
assert "reply_markup" in kwargs
assert isinstance(kwargs["reply_markup"], InlineKeyboardMarkup)
assert len(kwargs["reply_markup"].inline_keyboard) == 1
assert "Relevance:" in response_text or "Score:" in response_text or "10" in response_text
assert "Anomalies:" in response_text or "detected" in response_text.lower()
assert "Example Domain" in response_text
@pytest.mark.asyncio
async def test_command_search_handler(router, mock_storage, allowed_chat_id):
handler = get_handler(router, "command_search_handler")
message = AsyncMock()
message.chat.id = int(allowed_chat_id)
message.answer = AsyncMock()
command = CommandObject(prefix="/", command="search", args="quantum")
await handler(message=message, command=command)
message.answer.assert_called_once()
args, kwargs = message.answer.call_args
assert "Search results:" in args[0]
assert "reply_markup" in kwargs
mock_storage.search.assert_called_once_with(query="quantum", limit=10)
@pytest.mark.asyncio
async def test_detail_callback_handler(router, mock_storage, mock_item):
handler = get_callback_handler(router, "detail_callback_handler")
callback = AsyncMock(spec=CallbackQuery)
item_id = str(uuid.uuid5(uuid.NAMESPACE_URL, mock_item.url))
callback.data = f"detail:{item_id}"
callback.message = AsyncMock()
callback.message.answer = AsyncMock()
callback.answer = AsyncMock()
await handler(callback)
mock_storage.get_by_id.assert_called_once_with(item_id)
callback.message.answer.assert_called_once()
args, kwargs = callback.message.answer.call_args
assert mock_item.title in args[0]
assert mock_item.summary_ru in args[0]
callback.answer.assert_called_once()
@pytest.mark.asyncio
async def test_command_stats_handler(router, mock_storage, allowed_chat_id):
handler = get_handler(router, "command_stats_handler")
message = AsyncMock()
message.chat.id = int(allowed_chat_id)
message.answer = AsyncMock()
await handler(message=message)
message.answer.assert_called_once()
args, kwargs = message.answer.call_args
assert "Database Statistics" in args[0]
@pytest.mark.asyncio
async def test_access_middleware_allowed(allowed_chat_id):
middleware = AccessMiddleware(allowed_chat_id)
handler = AsyncMock()
event = MagicMock(spec=Message)
event.chat = MagicMock()
event.chat.id = int(allowed_chat_id)
event.answer = AsyncMock()
data = {}
await middleware(handler, event, data)
handler.assert_called_once_with(event, data)
event.answer.assert_not_called()
@pytest.mark.asyncio
async def test_access_middleware_denied(allowed_chat_id):
middleware = AccessMiddleware(allowed_chat_id)
handler = AsyncMock()
event = MagicMock(spec=Message)
event.chat = MagicMock()
event.chat.id = 999999999 # Different ID
event.answer = AsyncMock()
data = {}
await middleware(handler, event, data)
handler.assert_not_called()
event.answer.assert_called_once_with("Access Denied")

View File

@ -19,7 +19,8 @@ async def test_telegram_notifier_sends_message():
timestamp=datetime.now(),
relevance_score=9,
summary_ru="Тестовое саммари",
anomalies_detected=["Test Anomaly"]
anomalies_detected=["Test Anomaly"],
category="Test Category"
)
# Act
@ -31,5 +32,6 @@ async def test_telegram_notifier_sends_message():
assert kwargs["chat_id"] == chat_id
assert kwargs["parse_mode"] == "HTML"
assert "Test Title" in kwargs["text"]
assert "Category: Test Category" in kwargs["text"]
assert "9/10" in kwargs["text"]
assert "Test Anomaly" in kwargs["text"]

View File

@ -17,7 +17,23 @@ async def test_run_iteration():
news_item = NewsItemDTO(
title="Test Title",
url="http://example.com",
url="http://example.com/new1",
content_text="Sample text",
source="Source",
timestamp=timestamp
)
existing_item = NewsItemDTO(
title="Test Title Existing",
url="http://example.com/existing",
content_text="Sample text",
source="Source",
timestamp=timestamp
)
another_new_item = NewsItemDTO(
title="Test Title 3",
url="http://example.com/new2",
content_text="Sample text",
source="Source",
timestamp=timestamp
@ -31,7 +47,7 @@ async def test_run_iteration():
)
anomaly_item = EnrichedNewsItemDTO(
**news_item.model_dump(),
**another_new_item.model_dump(),
relevance_score=5,
summary_ru="Summary",
anomalies_detected=["Anomaly"]
@ -39,18 +55,21 @@ async def test_run_iteration():
low_relevance_item = EnrichedNewsItemDTO(
**news_item.model_dump(),
relevance_score=5,
relevance_score=3,
summary_ru="Summary",
anomalies_detected=[]
)
crawler_mock.fetch_latest.return_value = [news_item, news_item, news_item]
crawler_mock.fetch_latest.return_value = [news_item, existing_item, another_new_item, news_item]
# Mock exists to return True only for existing_item
storage_mock.exists.side_effect = lambda url: url == "http://example.com/existing"
# Return different items for each call to simulate different results
processor_mock.analyze.side_effect = [
high_relevance_item,
anomaly_item,
low_relevance_item
low_relevance_item,
]
service = TrendScoutService(
@ -67,6 +86,17 @@ async def test_run_iteration():
crawler_mock.fetch_latest.assert_called_once()
assert processor_mock.analyze.call_count == 3
assert storage_mock.store.call_count == 3
assert storage_mock.exists.call_count == 4
# Should only alert on high relevance (1) or anomalies (1), total 2 times
assert notifier_mock.send_alert.call_count == 2
# Verify low relevance item had its content cleared
# It was the 3rd item stored
stored_items = [call.args[0] for call in storage_mock.store.call_args_list]
assert stored_items[0].relevance_score == 8
assert stored_items[0].content_text == "Sample text"
assert stored_items[1].relevance_score == 5
assert stored_items[1].content_text == "Sample text"
assert stored_items[2].relevance_score == 3
assert stored_items[2].content_text == ""
# Should not alert proactively anymore as per updated requirements
assert notifier_mock.send_alert.call_count == 0

View File

@ -41,7 +41,7 @@ def create_mock_session(mock_response_json):
async def test_ollama_provider_analyze_success(sample_news_item):
os.environ['OLLAMA_API_URL'] = 'http://localhost:11434/api/generate'
mock_response_json = {
"response": '{"relevance_score": 8, "summary_ru": "Тестовая статья про ИИ.", "anomalies_detected": ["NPU acceleration"]}'
"response": '{"relevance_score": 8, "summary_ru": "Тестовая статья про ИИ.", "anomalies_detected": ["NPU acceleration"], "category": "Edge AI"}'
}
provider = OllamaProvider()
@ -53,6 +53,7 @@ async def test_ollama_provider_analyze_success(sample_news_item):
assert result.relevance_score == 8
assert result.summary_ru == "Тестовая статья про ИИ."
assert result.anomalies_detected == ["NPU acceleration"]
assert result.category == "Edge AI"
@pytest.mark.asyncio
async def test_ollama_provider_analyze_empty_response(sample_news_item):
@ -69,6 +70,7 @@ async def test_ollama_provider_analyze_empty_response(sample_news_item):
assert result.relevance_score == 0
assert result.summary_ru == ""
assert result.anomalies_detected == []
assert result.category == "Other"
@pytest.mark.asyncio
async def test_ollama_provider_analyze_malformed_json(sample_news_item):
@ -85,12 +87,13 @@ async def test_ollama_provider_analyze_malformed_json(sample_news_item):
assert result.relevance_score == 0
assert "Error parsing LLM response" in result.summary_ru
assert result.anomalies_detected == []
assert result.category == "Other"
@pytest.mark.asyncio
async def test_ollama_provider_analyze_markdown_json(sample_news_item):
os.environ['OLLAMA_API_URL'] = 'http://localhost:11434/api/generate'
mock_response_json = {
"response": "```json\n{\"relevance_score\": 5, \"summary_ru\": \"Markdown test\", \"anomalies_detected\": []}\n```"
"response": "```json\n{\"relevance_score\": 5, \"summary_ru\": \"Markdown test\", \"anomalies_detected\": [], \"category\": \"Browsers\"}\n```"
}
provider = OllamaProvider()
@ -101,4 +104,5 @@ async def test_ollama_provider_analyze_markdown_json(sample_news_item):
assert result.relevance_score == 5
assert result.summary_ru == "Markdown test"
assert result.anomalies_detected == []
assert result.category == "Browsers"

View File

@ -1,5 +1,6 @@
import pytest
import pytest_asyncio
import uuid
from datetime import datetime, timezone
import chromadb
from chromadb.config import Settings
@ -27,7 +28,8 @@ async def test_store_and_search(chroma_store: ChromaStore):
timestamp=datetime(2023, 11, 1, 12, 0, tzinfo=timezone.utc),
relevance_score=9,
summary_ru="Apple анонсировала новый чип M4.",
anomalies_detected=["NPU acceleration"]
anomalies_detected=["NPU acceleration"],
category="Competitors"
)
item2 = EnrichedNewsItemDTO(
@ -38,7 +40,8 @@ async def test_store_and_search(chroma_store: ChromaStore):
timestamp=datetime(2023, 11, 2, 10, 0, tzinfo=timezone.utc),
relevance_score=2,
summary_ru="Местная пекарня испекла гигантский хлеб.",
anomalies_detected=[]
anomalies_detected=[],
category="Other"
)
item3 = EnrichedNewsItemDTO(
@ -49,7 +52,8 @@ async def test_store_and_search(chroma_store: ChromaStore):
timestamp=datetime(2023, 11, 3, 14, 0, tzinfo=timezone.utc),
relevance_score=10,
summary_ru="NVIDIA представила RTX 5090 с поддержкой WebGPU.",
anomalies_detected=["WebGPU", "Edge AI"]
anomalies_detected=["WebGPU", "Edge AI"],
category="Edge AI"
)
# 2. Act
@ -77,6 +81,7 @@ async def test_store_and_search(chroma_store: ChromaStore):
assert "Edge AI" in res.anomalies_detected
assert "NVIDIA's new RTX 5090" in res.content_text
assert res.source == "GPUWeekly"
assert res.category == "Edge AI"
@pytest.mark.asyncio
async def test_search_empty_store(chroma_store: ChromaStore):
@ -93,7 +98,8 @@ async def test_store_upsert(chroma_store: ChromaStore):
timestamp=datetime(2023, 11, 1, 12, 0, tzinfo=timezone.utc),
relevance_score=9,
summary_ru="Apple анонсировала новый чип M4.",
anomalies_detected=["NPU acceleration"]
anomalies_detected=["NPU acceleration"],
category="Competitors"
)
# Store first time
@ -114,3 +120,113 @@ async def test_store_upsert(chroma_store: ChromaStore):
assert len(results_updated) == 1
assert results_updated[0].relevance_score == 10
assert results_updated[0].summary_ru == "Apple анонсировала чип M4. Обновлено."
@pytest.mark.asyncio
async def test_exists(chroma_store: ChromaStore):
url = "https://example.com/unique-news-123"
# Check that it doesn't exist initially
assert not await chroma_store.exists(url)
item = EnrichedNewsItemDTO(
title="Test Title",
url=url,
content_text="Test content",
source="TestSource",
timestamp=datetime(2023, 11, 1, 12, 0, tzinfo=timezone.utc),
relevance_score=5,
summary_ru="Тест",
anomalies_detected=[],
category="Other"
)
await chroma_store.store(item)
# Check that it exists now
assert await chroma_store.exists(url)
@pytest.mark.asyncio
async def test_get_by_id(chroma_store: ChromaStore):
# 1. Arrange
url = "https://example.com/get-by-id-test"
doc_id = str(uuid.uuid5(uuid.NAMESPACE_URL, url))
item = EnrichedNewsItemDTO(
title="ID Test Title",
url=url,
content_text="ID Test Content",
source="IDTestSource",
timestamp=datetime(2023, 11, 1, 12, 0, tzinfo=timezone.utc),
relevance_score=7,
summary_ru="Тест по ID",
anomalies_detected=["TestAnomaly"],
category="Testing"
)
# 2. Act
await chroma_store.store(item)
# Try to retrieve by ID
retrieved_item = await chroma_store.get_by_id(doc_id)
# Try to retrieve non-existent ID
none_item = await chroma_store.get_by_id("non-existent-id")
# 3. Assert
assert retrieved_item is not None
assert retrieved_item.title == "ID Test Title"
assert retrieved_item.url == url
assert retrieved_item.relevance_score == 7
assert "TestAnomaly" in retrieved_item.anomalies_detected
assert retrieved_item.category == "Testing"
assert none_item is None
@pytest.mark.asyncio
async def test_get_stats(chroma_store: ChromaStore):
# 1. Arrange
item1 = EnrichedNewsItemDTO(
title="Title 1",
url="https://example.com/1",
content_text="Content 1",
source="Source 1",
timestamp=datetime.now(timezone.utc),
relevance_score=5,
summary_ru="Сводка 1",
anomalies_detected=[],
category="Tech"
)
item2 = EnrichedNewsItemDTO(
title="Title 2",
url="https://example.com/2",
content_text="Content 2",
source="Source 2",
timestamp=datetime.now(timezone.utc),
relevance_score=5,
summary_ru="Сводка 2",
anomalies_detected=[],
category="Tech"
)
item3 = EnrichedNewsItemDTO(
title="Title 3",
url="https://example.com/3",
content_text="Content 3",
source="Source 3",
timestamp=datetime.now(timezone.utc),
relevance_score=5,
summary_ru="Сводка 3",
anomalies_detected=[],
category="Science"
)
# 2. Act
await chroma_store.store(item1)
await chroma_store.store(item2)
await chroma_store.store(item3)
stats = await chroma_store.get_stats()
# 3. Assert
assert stats["total_count"] == 3
assert stats["category_Tech"] == 2
assert stats["category_Science"] == 1