[ai] mvp generated by gemini

This commit is contained in:
Artur Mukhamadiev 2026-03-13 11:48:37 +03:00
commit 5f093075f7
30 changed files with 1186 additions and 0 deletions

216
.gitignore vendored Normal file
View File

@ -0,0 +1,216 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
# Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
# poetry.lock
# poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
# pdm.lock
# pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
# pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# Redis
*.rdb
*.aof
*.pid
# RabbitMQ
mnesia/
rabbitmq/
rabbitmq-data/
# ActiveMQ
activemq-data/
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
# .idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# Streamlit
.streamlit/secrets.toml

61
AGENTS.md Normal file
View File

@ -0,0 +1,61 @@
# Agents Architecture and Tasks
This document outlines the architecture, responsibilities, and tasks for the subagents working on the **"Trend-Scout AI"** Telegram bot.
## Core Principles
- **Test-Driven Development (TDD):** Write tests before implementing features. Use `pytest`.
- **SOLID Principles:** Ensure code is modular, maintainable, and follows Single Responsibility, Open/Closed, Liskov Substitution, Interface Segregation, and Dependency Inversion principles.
- **Asynchronous I/O:** Use `asyncio` for network requests, database operations, and bot handling.
## 1. Crawler Agent (Data Collection)
**Responsibility:** Collect data from various sources (RSS feeds, HTML parsing for protected sites like Samsung/Sony Newsroom).
**Inputs:**
- Target URLs and source types (RSS, HTML).
- Configuration for Scrapy/Playwright.
**Outputs:**
- Standardized DTOs (Data Transfer Objects) containing: `title`, `url`, `content_text`, `source`, `timestamp`.
**Tasks:**
1. Setup TDD environment for crawlers (mocking HTTP responses).
2. Implement RSS parser for standard sources (e.g., Nature, Habr).
3. Implement HTML parser (Playwright/Scrapy) for complex/protected sources.
4. Ensure SRP: Crawlers only fetch and parse, returning raw text data.
## 2. AI Processor Agent (NLP & LLM)
**Responsibility:** Analyze collected data using the Ollama API (`gpt-oss:120b-cloud` model).
**Inputs:**
- Standardized DTOs from Crawler Agent.
- Prompts for relevance scoring (0-10) and summarization in Russian.
- Keywords for anomaly detection (e.g., "WebGPU", "NPU acceleration", "Edge AI").
**Outputs:**
- Enriched DTOs containing: `relevance_score`, `summary_ru`, `anomalies_detected`.
**Tasks:**
1. Setup TDD with mocked Ollama API responses.
2. Implement an `ILLMProvider` interface (Dependency Inversion).
3. Implement the concrete Ollama provider.
4. Create prompt templates for relevance, summarization, and anomaly detection.
## 3. Vector Storage Agent (Database)
**Responsibility:** Store and retrieve processed data using a Vector Database (ChromaDB).
**Inputs:**
- Enriched DTOs from AI Processor Agent.
- Search queries.
**Outputs:**
- Stored records.
- Search results (similar news/trends).
**Tasks:**
1. Setup TDD for in-memory ChromaDB operations.
2. Define interfaces for `IVectorStore` (Interface Segregation).
3. Implement embedding generation (using a lightweight local model or Ollama).
4. Implement store and semantic search functionalities.
## 4. Telegram Bot Agent (Frontend)
**Responsibility:** Handle user interactions, display summaries, and send alerts for anomalies.
**Inputs:**
- Telegram updates.
- Enriched DTOs (for alerts).
**Outputs:**
- Formatted Telegram messages.
**Tasks:**
1. Setup TDD for `aiogram` handlers (mocking Telegram API).
2. Implement `/start`, `/help`, and `/latest` commands.
3. Implement a notification service for high-relevance news and anomalies.
4. Ensure OCP: Routers should easily accept new commands without modifying core logic.

0
src/__init__.py Normal file
View File

0
src/bot/__init__.py Normal file
View File

12
src/bot/bot.py Normal file
View File

@ -0,0 +1,12 @@
from aiogram import Bot, Dispatcher
from aiogram.client.default import DefaultBotProperties
from src.bot.handlers import router
def setup_bot(token: str) -> tuple[Bot, Dispatcher]:
    """Build the Bot (HTML parse mode by default) and a Dispatcher with the
    main router already attached.

    Args:
        token: Telegram bot API token.

    Returns:
        A ``(bot, dispatcher)`` pair ready for polling.
    """
    dispatcher = Dispatcher()
    dispatcher.include_router(router)
    bot = Bot(token=token, default=DefaultBotProperties(parse_mode="HTML"))
    return bot, dispatcher

69
src/bot/handlers.py Normal file
View File

@ -0,0 +1,69 @@
from datetime import datetime
import html
from aiogram import Router
from aiogram.filters import CommandStart, Command
from aiogram.types import Message
from aiogram.utils.formatting import as_list, as_marked_section, Bold, TextLink
from src.processor.dto import EnrichedNewsItemDTO
router = Router(name="main_router")


@router.message(CommandStart())
async def command_start_handler(message: Message) -> None:
    """Greet the user in response to the `/start` command."""
    # Telegram can (rarely) omit from_user; fall back to a generic name.
    if message.from_user:
        user_name = html.escape(message.from_user.full_name)
    else:
        user_name = 'user'
    await message.answer(f"Welcome to Trend-Scout AI, {user_name}!")
@router.message(Command("help"))
async def command_help_handler(message: Message) -> None:
    """Reply to `/help` with the list of supported commands."""
    await message.answer(
        "Available commands:\n"
        "/start - Start the bot\n"
        "/help - Show this help message\n"
        "/latest - Show the latest enriched news trends\n"
    )
@router.message(Command("latest"))
async def command_latest_handler(message: Message) -> None:
    """Reply to `/latest` with a hard-coded sample enriched news item.

    Placeholder until real storage lookups are wired in; every dynamic
    field is HTML-escaped before interpolation into the HTML reply.
    """
    mock_item = EnrichedNewsItemDTO(
        title="Breakthrough in Quantum Computing",
        url="https://example.com/quantum-breakthrough",
        content_text="Researchers have achieved a new milestone in quantum supremacy...",
        source="Example Domain News",
        timestamp=datetime.now(),
        relevance_score=9,
        summary_ru="Исследователи достигли нового рубежа в квантовом превосходстве.",
        anomalies_detected=["Quantum supremacy", "Room temperature superconductors"]
    )
    anomalies_text = ", ".join(html.escape(anomaly) for anomaly in mock_item.anomalies_detected)
    response_text = (
        f"🌟 <b>{html.escape(mock_item.title)}</b>\n\n"
        f"<b>Source:</b> {html.escape(mock_item.source)}\n"
        f"<b>Relevance Score:</b> {mock_item.relevance_score}/10\n"
        f"<b>Summary:</b> {html.escape(mock_item.summary_ru)}\n"
        f"<b>Anomalies Detected:</b> {anomalies_text}\n\n"
        f"<a href='{html.escape(mock_item.url)}'>Read more</a>"
    )
    await message.answer(response_text, parse_mode="HTML")

8
src/crawlers/base.py Normal file
View File

@ -0,0 +1,8 @@
from abc import ABC, abstractmethod
from typing import List
from .dto import NewsItemDTO
class ICrawler(ABC):
    """Interface for data-collection crawlers (RSS, HTML, ...)."""

    @abstractmethod
    async def fetch_latest(self) -> List[NewsItemDTO]:
        """Fetch the newest items from the source as standardized DTOs."""
        pass

9
src/crawlers/dto.py Normal file
View File

@ -0,0 +1,9 @@
from datetime import datetime
from pydantic import BaseModel
class NewsItemDTO(BaseModel):
    """Standardized raw news item produced by crawlers."""

    title: str          # article headline
    url: str            # link to the article
    content_text: str   # raw body/description text as fetched
    source: str         # human-readable source label (e.g. "Habr AI")
    timestamp: datetime  # publication time; may be naive or tz-aware depending on the crawler

View File

@ -0,0 +1,48 @@
import aiohttp
import xml.etree.ElementTree as ET
from datetime import datetime
from email.utils import parsedate_to_datetime
from typing import List
from src.crawlers.base import ICrawler
from src.crawlers.dto import NewsItemDTO
class RSSCrawler(ICrawler):
    """Crawler that fetches an RSS 2.0 feed and maps each <item> to a DTO."""

    def __init__(self, url: str, source: str):
        self.url = url          # feed URL to poll
        self.source = source    # label stamped onto every produced item

    async def fetch_latest(self) -> List[NewsItemDTO]:
        """Download the feed and return one DTO per <item> element.

        Raises:
            aiohttp.ClientResponseError: on a non-2xx HTTP status.
            xml.etree.ElementTree.ParseError: if the payload is not valid XML.
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(self.url) as response:
                response.raise_for_status()
                xml_data = await response.text()
        return self._parse_xml(xml_data)

    def _parse_xml(self, xml_data: str) -> List[NewsItemDTO]:
        """Parse feed XML into DTOs; missing text fields default to ""."""
        root = ET.fromstring(xml_data)
        items = []
        for item in root.findall('.//item'):
            items.append(
                NewsItemDTO(
                    title=item.findtext('title') or "",
                    url=item.findtext('link') or "",
                    content_text=item.findtext('description') or "",
                    source=self.source,
                    timestamp=self._parse_pub_date(item.findtext('pubDate')),
                )
            )
        return items

    @staticmethod
    def _parse_pub_date(pub_date_str) -> datetime:
        """Parse an RFC 2822 <pubDate>; fall back to tz-aware "now".

        The fallback is timezone-aware (local zone via ``astimezone()``) so
        that stored timestamps do not mix naive and aware datetimes — most
        feeds carry a zone ("GMT") and parse to aware values.
        """
        if pub_date_str:
            try:
                return parsedate_to_datetime(pub_date_str)
            except (TypeError, ValueError):
                # Malformed date string — ignore and use the fallback.
                pass
        return datetime.now().astimezone()

89
src/main.py Normal file
View File

@ -0,0 +1,89 @@
import asyncio
import os
import logging
from typing import List
from dotenv import load_dotenv
import chromadb
from aiogram import Bot, Dispatcher
from src.crawlers.base import ICrawler
from src.crawlers.rss_crawler import RSSCrawler
from src.processor.ollama_provider import OllamaProvider
from src.storage.chroma_store import ChromaStore
from src.notifications.telegram import TelegramNotifier
from src.orchestrator.service import TrendScoutService
from src.bot.bot import setup_bot
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
async def background_task(orchestrator: TrendScoutService, interval: int = 3600):
    """Drive the orchestrator forever, pausing ``interval`` seconds per lap.

    A failing iteration is logged and swallowed so a single bad run does
    not terminate the loop.
    """
    while True:
        logger.info("Starting new TrendScout iteration...")
        try:
            await orchestrator.run_iteration()
        except Exception as e:
            logger.error(f"Error during iteration: {e}", exc_info=True)
        else:
            logger.info("Iteration completed successfully.")
        logger.info(f"Sleeping for {interval} seconds before next iteration.")
        await asyncio.sleep(interval)
async def main():
    """Compose components from environment config and run the Telegram bot
    alongside the periodic crawl/analyze background loop.

    Required env: TELEGRAM_BOT_TOKEN. Optional: TELEGRAM_CHAT_ID (alerts),
    OLLAMA_API_URL (read directly by OllamaProvider), CHROMA_DB_PATH.
    """
    load_dotenv()
    # Load configuration
    bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
    chat_id = os.getenv("TELEGRAM_CHAT_ID", "")
    chroma_db_path = os.getenv("CHROMA_DB_PATH", "./chroma_db")
    if not bot_token:
        logger.error("TELEGRAM_BOT_TOKEN is missing!")
        return
    if not chat_id or chat_id == "YOUR_CHAT_ID_HERE":
        logger.warning("TELEGRAM_CHAT_ID is missing or not set. Notifications will fail.")
    # 1. Initialize Bot & Dispatcher
    bot, dp = setup_bot(bot_token)
    # 2. Initialize Components
    crawlers: List[ICrawler] = [
        RSSCrawler("https://habr.com/ru/rss/hubs/artificial_intelligence/articles/?fl=ru", source="Habr AI")
    ]
    # OllamaProvider resolves OLLAMA_API_URL from the environment itself.
    processor = OllamaProvider()
    if chroma_db_path:
        chroma_client = chromadb.PersistentClient(path=chroma_db_path)
    else:
        chroma_client = chromadb.Client()
    storage = ChromaStore(client=chroma_client)
    notifier = TelegramNotifier(bot, chat_id)
    orchestrator = TrendScoutService(
        crawlers=crawlers,
        processor=processor,
        storage=storage,
        notifier=notifier
    )
    # 3. Start tasks
    logger.info("Starting TrendScout AI Bot and Background Task...")
    # Create the background task
    bg_task = asyncio.create_task(background_task(orchestrator, interval=3600))
    # Start polling the Telegram bot (blocking call)
    try:
        await dp.start_polling(bot)
    finally:
        # Cancel the crawler loop AND await it so its CancelledError is
        # consumed; a cancelled-but-never-awaited task triggers asyncio's
        # "Task was destroyed but it is pending!" warning at shutdown.
        bg_task.cancel()
        try:
            await bg_task
        except asyncio.CancelledError:
            pass
        logger.info("Shutting down...")


if __name__ == "__main__":
    asyncio.run(main())

View File

View File

@ -0,0 +1,8 @@
from abc import ABC, abstractmethod
from src.processor.dto import EnrichedNewsItemDTO
class INotificationService(ABC):
    """Interface for delivering alerts about enriched news items."""

    @abstractmethod
    async def send_alert(self, item: EnrichedNewsItemDTO) -> None:
        """Send an alert about an enriched news item."""
        pass

View File

@ -0,0 +1,26 @@
from typing import Union
import aiogram
import html
from src.notifications.base import INotificationService
from src.processor.dto import EnrichedNewsItemDTO
class TelegramNotifier(INotificationService):
    """Sends enriched-news alerts to a fixed Telegram chat as HTML."""

    def __init__(self, bot: aiogram.Bot, chat_id: Union[str, int]):
        self.bot = bot
        self.chat_id = chat_id

    async def send_alert(self, item: EnrichedNewsItemDTO) -> None:
        """Render the item as an HTML message and send it to the chat.

        All dynamic fields are HTML-escaped before interpolation.
        """
        if item.anomalies_detected:
            anomalies_text = ", ".join(html.escape(anomaly) for anomaly in item.anomalies_detected)
        else:
            anomalies_text = "None"
        formatted_text = (
            f"<b>{html.escape(item.title)}</b>\n\n"
            f"Relevance: {item.relevance_score}/10\n"
            f"Anomalies: {anomalies_text}\n\n"
            f"{html.escape(item.summary_ru)}\n\n"
            f"<a href='{html.escape(item.url)}'>Source</a>"
        )
        await self.bot.send_message(chat_id=self.chat_id, text=formatted_text, parse_mode="HTML")

View File

View File

@ -0,0 +1,28 @@
from typing import List
from src.crawlers.base import ICrawler
from src.processor.base import ILLMProvider
from src.storage.base import IVectorStore
from src.notifications.base import INotificationService
class TrendScoutService:
    """Coordinates the crawl -> analyze -> store -> alert pipeline."""

    def __init__(
        self,
        crawlers: List[ICrawler],
        processor: ILLMProvider,
        storage: IVectorStore,
        notifier: INotificationService,
    ):
        self.crawlers = crawlers
        self.processor = processor
        self.storage = storage
        self.notifier = notifier

    async def run_iteration(self) -> None:
        """Run one full pipeline pass over every configured crawler."""
        for crawler in self.crawlers:
            for raw_item in await crawler.fetch_latest():
                enriched = await self.processor.analyze(raw_item)
                await self.storage.store(enriched)
                # Alert on high relevance or on any detected anomaly.
                should_alert = enriched.relevance_score >= 8 or bool(enriched.anomalies_detected)
                if should_alert:
                    await self.notifier.send_alert(enriched)

View File

8
src/processor/base.py Normal file
View File

@ -0,0 +1,8 @@
from abc import ABC, abstractmethod
from src.crawlers.dto import NewsItemDTO
from src.processor.dto import EnrichedNewsItemDTO
class ILLMProvider(ABC):
    """Interface for LLM backends that enrich raw news items."""

    @abstractmethod
    async def analyze(self, news_item: NewsItemDTO) -> EnrichedNewsItemDTO:
        """Enrich a raw item with a relevance score, Russian summary and
        detected anomalies (see EnrichedNewsItemDTO fields)."""
        pass

8
src/processor/dto.py Normal file
View File

@ -0,0 +1,8 @@
from typing import List
from pydantic import Field
from src.crawlers.dto import NewsItemDTO
class EnrichedNewsItemDTO(NewsItemDTO):
    """NewsItemDTO extended with LLM-produced analysis fields."""

    relevance_score: int = Field(ge=0, le=10)  # 0..10 inclusive, validated by pydantic
    summary_ru: str                 # Russian-language summary
    anomalies_detected: List[str]   # anomaly keywords/phrases found by the LLM

View File

@ -0,0 +1,67 @@
import os
import json
import aiohttp
from typing import List
from src.crawlers.dto import NewsItemDTO
from src.processor.base import ILLMProvider
from src.processor.dto import EnrichedNewsItemDTO
class OllamaProvider(ILLMProvider):
    """ILLMProvider backed by the Ollama ``/api/generate`` endpoint."""

    async def analyze(self, news_item: NewsItemDTO) -> EnrichedNewsItemDTO:
        """Score, summarize and anomaly-scan a news item via Ollama.

        Missing or unparseable model output degrades to neutral defaults
        rather than raising.
        """
        payload = {
            "model": os.environ.get('OLLAMA_MODEL', 'gpt-oss:120b-cloud'),
            "prompt": self._build_prompt(news_item),
            "stream": False,
            "format": "json"
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(self._endpoint(), json=payload) as response:
                response.raise_for_status()
                data = await response.json()
        # Ollama returns the generated text inside 'response' key; an empty
        # or missing value is treated as an empty JSON object.
        generated_text = data.get('response', '') or "{}"
        fields = self._extract_fields(generated_text)
        return EnrichedNewsItemDTO(
            title=news_item.title,
            url=news_item.url,
            content_text=news_item.content_text,
            source=news_item.source,
            timestamp=news_item.timestamp,
            relevance_score=fields.get('relevance_score', 0),
            summary_ru=fields.get('summary_ru', ''),
            anomalies_detected=fields.get('anomalies_detected', [])
        )

    @staticmethod
    def _endpoint() -> str:
        """Resolve the generate URL from OLLAMA_API_URL (base or full URL)."""
        base_url = os.environ.get('OLLAMA_API_URL', 'http://localhost:11434')
        if base_url.endswith('/api/generate'):
            return base_url
        return f"{base_url.rstrip('/')}/api/generate"

    @staticmethod
    def _build_prompt(news_item: NewsItemDTO) -> str:
        """Compose the analysis prompt for a single article."""
        return (
            f"Analyze the following article.\nTitle: {news_item.title}\n"
            f"Content: {news_item.content_text}\n"
            "Return JSON with 'relevance_score' (0-10), 'summary_ru' (string), and 'anomalies_detected' (list of strings)."
        )

    @staticmethod
    def _extract_fields(generated_text: str) -> dict:
        """Parse model output to a dict, tolerating markdown code fences.

        Malformed JSON yields a placeholder dict carrying the raw text in
        ``summary_ru``; a non-dict JSON value yields an empty dict.
        """
        cleaned_text = generated_text.strip()
        if cleaned_text.startswith("```json"):
            cleaned_text = cleaned_text[len("```json"):]
        elif cleaned_text.startswith("```"):
            cleaned_text = cleaned_text[len("```"):]
        if cleaned_text.endswith("```"):
            cleaned_text = cleaned_text[:-len("```")]
        cleaned_text = cleaned_text.strip()
        try:
            parsed = json.loads(cleaned_text)
        except json.JSONDecodeError:
            return {
                "relevance_score": 0,
                "summary_ru": "Error parsing LLM response: " + generated_text,
                "anomalies_detected": []
            }
        return parsed if isinstance(parsed, dict) else {}

16
src/storage/base.py Normal file
View File

@ -0,0 +1,16 @@
from abc import ABC, abstractmethod
from typing import List
from src.processor.dto import EnrichedNewsItemDTO
class IVectorStore(ABC):
    """Abstract interface for a Vector Store."""

    @abstractmethod
    async def store(self, item: EnrichedNewsItemDTO) -> None:
        """Store an item in the vector database."""
        pass

    @abstractmethod
    async def search(self, query: str, limit: int = 5) -> List[EnrichedNewsItemDTO]:
        """Search for items in the vector database.

        Returns at most ``limit`` matching items.
        """
        pass

View File

@ -0,0 +1,73 @@
import uuid
from typing import List
from datetime import datetime
import chromadb
from chromadb.api import ClientAPI
from src.storage.base import IVectorStore
from src.processor.dto import EnrichedNewsItemDTO
class ChromaStore(IVectorStore):
    """IVectorStore implementation on top of a ChromaDB collection."""

    def __init__(self, client: ClientAPI, collection_name: str = "news_collection"):
        self.client = client
        self.collection = self.client.get_or_create_collection(name=collection_name)

    async def store(self, item: EnrichedNewsItemDTO) -> None:
        """Upsert one item; the same URL always maps to the same id, so
        re-storing a URL replaces the previous record."""
        # Create a deterministic UUID based on the URL
        record_id = str(uuid.uuid5(uuid.NAMESPACE_URL, item.url))
        self.collection.upsert(
            ids=[record_id],
            documents=[item.content_text],
            metadatas=[self._to_metadata(item)]
        )

    @staticmethod
    def _to_metadata(item: EnrichedNewsItemDTO) -> dict:
        """Flatten the DTO into Chroma metadata (str/int/float/bool only)."""
        return {
            "title": item.title,
            "url": item.url,
            "source": item.source,
            "timestamp": item.timestamp.isoformat(),
            "relevance_score": item.relevance_score,
            "summary_ru": item.summary_ru,
            # Chroma accepts string, int, float or bool for metadata values,
            # so the anomaly list is comma-joined (lossy if an anomaly
            # itself contains a comma).
            "anomalies_detected": ",".join(item.anomalies_detected) if item.anomalies_detected else ""
        }

    async def search(self, query: str, limit: int = 5) -> List[EnrichedNewsItemDTO]:
        """Semantic search; DTOs are rebuilt from metadata + documents."""
        results = self.collection.query(
            query_texts=[query],
            n_results=limit
        )
        metadatas = results.get('metadatas')
        if not metadatas or not metadatas[0]:
            return []
        documents = results.get('documents')
        found = []
        for idx, metadata in enumerate(metadatas[0]):
            if metadata is None:
                continue
            document = documents[0][idx] if documents and documents[0] else ""
            found.append(self._from_metadata(metadata, document))
        return found

    @staticmethod
    def _from_metadata(metadata, document) -> EnrichedNewsItemDTO:
        """Rebuild a DTO from one metadata record (inverse of _to_metadata)."""
        raw_anomalies = str(metadata.get("anomalies_detected", ""))
        anomalies = [part.strip() for part in raw_anomalies.split(",") if part.strip()]
        return EnrichedNewsItemDTO(
            title=str(metadata.get("title", "")),
            url=str(metadata.get("url", "")),
            content_text=str(document),
            source=str(metadata.get("source", "")),
            timestamp=datetime.fromisoformat(str(metadata.get("timestamp", ""))),
            relevance_score=int(float(str(metadata.get("relevance_score", 0)))),
            summary_ru=str(metadata.get("summary_ru", "")),
            anomalies_detected=anomalies
        )

0
tests/__init__.py Normal file
View File

0
tests/bot/__init__.py Normal file
View File

View File

@ -0,0 +1,45 @@
import pytest
from unittest.mock import AsyncMock, MagicMock
from aiogram.types import Message
from src.bot.handlers import command_start_handler, command_help_handler, command_latest_handler
@pytest.mark.asyncio
async def test_command_start_handler():
    """/start should answer once with a greeting."""
    message = AsyncMock()
    message.from_user = MagicMock(full_name="Test User")
    message.answer = AsyncMock()
    await command_start_handler(message)
    message.answer.assert_called_once()
    reply_text = message.answer.call_args.args[0]
    assert "Welcome" in reply_text or "Trend-Scout" in reply_text
@pytest.mark.asyncio
async def test_command_help_handler():
    """/help must list the supported commands."""
    message = AsyncMock()
    message.answer = AsyncMock()
    await command_help_handler(message)
    message.answer.assert_called_once()
    help_text = message.answer.call_args.args[0]
    assert "/start" in help_text
    assert "/latest" in help_text
@pytest.mark.asyncio
async def test_command_latest_handler():
    """/latest should render the mock enriched item (score, anomalies, source)."""
    message = AsyncMock()
    message.answer = AsyncMock()
    await command_latest_handler(message)
    message.answer.assert_called_once()
    reply = message.answer.call_args.args[0]
    assert any(marker in reply for marker in ("Relevance:", "Score:", "10"))
    assert "Anomalies:" in reply or "detected" in reply.lower()
    assert "Example Domain" in reply

View File

@ -0,0 +1,68 @@
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from datetime import datetime, timezone
from src.crawlers.rss_crawler import RSSCrawler
from src.crawlers.dto import NewsItemDTO
# Minimal two-item RSS 2.0 feed used as the canned HTTP response below.
MOCK_RSS = """<?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0">
<channel>
<title>Mock Source</title>
<link>http://mock.source.com</link>
<description>Mock Source Description</description>
<item>
<title>Test Title 1</title>
<link>http://mock.source.com/1</link>
<description>Test Content 1</description>
<pubDate>Wed, 02 Oct 2002 08:00:00 GMT</pubDate>
</item>
<item>
<title>Test Title 2</title>
<link>http://mock.source.com/2</link>
<description>Test Content 2</description>
<pubDate>Thu, 03 Oct 2002 10:00:00 GMT</pubDate>
</item>
</channel>
</rss>
"""


@pytest.mark.asyncio
async def test_rss_crawler_fetch_latest():
    """fetch_latest should GET the feed URL once and map every <item>
    (title/link/description/pubDate) onto a NewsItemDTO."""
    url = "http://mock.source.com/rss"
    source = "Mock Source"
    crawler = RSSCrawler(url, source)
    with patch("aiohttp.ClientSession.get") as mock_get:
        # Create an async mock for the response object
        mock_response = AsyncMock()
        mock_response.text.return_value = MOCK_RSS
        # raise_for_status is synchronous on aiohttp responses
        mock_response.raise_for_status = MagicMock()
        # Setup context manager for the 'async with session.get(...)' part
        mock_get.return_value.__aenter__.return_value = mock_response
        # Call the method
        items = await crawler.fetch_latest()
        # Verify the mock was called with the correct URL
        mock_get.assert_called_once_with(url)
        # Verify the parsing results
        assert len(items) == 2
        # Check first item
        assert isinstance(items[0], NewsItemDTO)
        assert items[0].title == "Test Title 1"
        assert items[0].url == "http://mock.source.com/1"
        assert items[0].content_text == "Test Content 1"
        assert items[0].source == source
        # "GMT" pubDates parse to UTC-aware datetimes
        assert items[0].timestamp == datetime(2002, 10, 2, 8, 0, tzinfo=timezone.utc)
        # Check second item
        assert isinstance(items[1], NewsItemDTO)
        assert items[1].title == "Test Title 2"
        assert items[1].url == "http://mock.source.com/2"
        assert items[1].content_text == "Test Content 2"
        assert items[1].source == source
        assert items[1].timestamp == datetime(2002, 10, 3, 10, 0, tzinfo=timezone.utc)

View File

@ -0,0 +1,35 @@
import pytest
from unittest.mock import AsyncMock
from datetime import datetime
from src.processor.dto import EnrichedNewsItemDTO
from src.notifications.telegram import TelegramNotifier
@pytest.mark.asyncio
async def test_telegram_notifier_sends_message():
    """send_alert should send exactly one HTML message carrying the item fields."""
    # Arrange
    bot_mock = AsyncMock()
    notifier = TelegramNotifier(bot=bot_mock, chat_id="12345")
    item = EnrichedNewsItemDTO(
        title="Test Title",
        url="http://example.com",
        content_text="Sample content",
        source="Test Source",
        timestamp=datetime.now(),
        relevance_score=9,
        summary_ru="Тестовое саммари",
        anomalies_detected=["Test Anomaly"]
    )
    # Act
    await notifier.send_alert(item)
    # Assert
    bot_mock.send_message.assert_called_once()
    sent = bot_mock.send_message.call_args.kwargs
    assert sent["chat_id"] == "12345"
    assert sent["parse_mode"] == "HTML"
    for fragment in ("Test Title", "9/10", "Test Anomaly"):
        assert fragment in sent["text"]

View File

@ -0,0 +1,72 @@
import pytest
from unittest.mock import AsyncMock
from datetime import datetime
from src.crawlers.dto import NewsItemDTO
from src.processor.dto import EnrichedNewsItemDTO
from src.orchestrator.service import TrendScoutService
@pytest.mark.asyncio
async def test_run_iteration():
    """One iteration must store every analyzed item but alert only on
    high relevance (>= 8) or anomaly-bearing items."""
    # Arrange
    crawler_mock = AsyncMock()
    processor_mock = AsyncMock()
    storage_mock = AsyncMock()
    notifier_mock = AsyncMock()
    timestamp = datetime.now()
    news_item = NewsItemDTO(
        title="Test Title",
        url="http://example.com",
        content_text="Sample text",
        source="Source",
        timestamp=timestamp
    )
    # Should alert: score 8 reaches the relevance threshold.
    high_relevance_item = EnrichedNewsItemDTO(
        **news_item.model_dump(),
        relevance_score=8,
        summary_ru="Summary",
        anomalies_detected=[]
    )
    # Should alert: low score but an anomaly was detected.
    anomaly_item = EnrichedNewsItemDTO(
        **news_item.model_dump(),
        relevance_score=5,
        summary_ru="Summary",
        anomalies_detected=["Anomaly"]
    )
    # Should NOT alert: low score, no anomalies.
    low_relevance_item = EnrichedNewsItemDTO(
        **news_item.model_dump(),
        relevance_score=5,
        summary_ru="Summary",
        anomalies_detected=[]
    )
    crawler_mock.fetch_latest.return_value = [news_item, news_item, news_item]
    # Return different items for each call to simulate different results
    processor_mock.analyze.side_effect = [
        high_relevance_item,
        anomaly_item,
        low_relevance_item
    ]
    service = TrendScoutService(
        crawlers=[crawler_mock],
        processor=processor_mock,
        storage=storage_mock,
        notifier=notifier_mock
    )
    # Act
    await service.run_iteration()
    # Assert
    crawler_mock.fetch_latest.assert_called_once()
    assert processor_mock.analyze.call_count == 3
    assert storage_mock.store.call_count == 3
    # Should only alert on high relevance (1) or anomalies (1), total 2 times
    assert notifier_mock.send_alert.call_count == 2

View File

View File

@ -0,0 +1,104 @@
import os
import pytest
from datetime import datetime
from unittest.mock import AsyncMock, patch
from src.crawlers.dto import NewsItemDTO
from src.processor.dto import EnrichedNewsItemDTO
from src.processor.ollama_provider import OllamaProvider
@pytest.fixture
def sample_news_item():
    """A minimal NewsItemDTO used as input by the provider tests below."""
    return NewsItemDTO(
        title="Test News",
        url="http://example.com",
        content_text="This is a test article about AI and NPU acceleration.",
        source="Test Source",
        timestamp=datetime.now()
    )
def create_mock_session(mock_response_json):
    """Build a stand-in for aiohttp.ClientSession whose ``post()`` yields a
    response object that returns ``mock_response_json`` from ``.json()``.

    Both the session and the response support ``async with``.
    """
    class FakeResponse:
        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            return None

        async def json(self):
            return mock_response_json

        def raise_for_status(self):
            # Always "succeeds" — tests never exercise HTTP errors here.
            return None

    class FakeSession:
        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            return None

        def post(self, url, **kwargs):
            return FakeResponse()

    return FakeSession()
@pytest.mark.asyncio
async def test_ollama_provider_analyze_success(sample_news_item):
    """Well-formed JSON output is mapped onto the enriched DTO fields."""
    # NOTE(review): mutates process-wide env without cleanup — consider
    # pytest's monkeypatch.setenv so the value cannot leak between tests.
    os.environ['OLLAMA_API_URL'] = 'http://localhost:11434/api/generate'
    mock_response_json = {
        "response": '{"relevance_score": 8, "summary_ru": "Тестовая статья про ИИ.", "anomalies_detected": ["NPU acceleration"]}'
    }
    provider = OllamaProvider()
    with patch('aiohttp.ClientSession', return_value=create_mock_session(mock_response_json)):
        result = await provider.analyze(sample_news_item)
    assert isinstance(result, EnrichedNewsItemDTO)
    assert result.title == "Test News"
    assert result.relevance_score == 8
    assert result.summary_ru == "Тестовая статья про ИИ."
    assert result.anomalies_detected == ["NPU acceleration"]
@pytest.mark.asyncio
async def test_ollama_provider_analyze_empty_response(sample_news_item):
    """An empty model response degrades to neutral defaults (score 0)."""
    os.environ['OLLAMA_API_URL'] = 'http://localhost:11434/api/generate'
    mock_response_json = {
        "response": ""
    }
    provider = OllamaProvider()
    with patch('aiohttp.ClientSession', return_value=create_mock_session(mock_response_json)):
        result = await provider.analyze(sample_news_item)
    assert isinstance(result, EnrichedNewsItemDTO)
    assert result.relevance_score == 0
    assert result.summary_ru == ""
    assert result.anomalies_detected == []
@pytest.mark.asyncio
async def test_ollama_provider_analyze_malformed_json(sample_news_item):
    """Unparseable JSON yields score 0 and an error-marker summary."""
    os.environ['OLLAMA_API_URL'] = 'http://localhost:11434/api/generate'
    mock_response_json = {
        "response": "{ invalid json"
    }
    provider = OllamaProvider()
    with patch('aiohttp.ClientSession', return_value=create_mock_session(mock_response_json)):
        result = await provider.analyze(sample_news_item)
    assert isinstance(result, EnrichedNewsItemDTO)
    assert result.relevance_score == 0
    assert "Error parsing LLM response" in result.summary_ru
    assert result.anomalies_detected == []
@pytest.mark.asyncio
async def test_ollama_provider_analyze_markdown_json(sample_news_item):
    """JSON wrapped in ```json fences is unwrapped before parsing."""
    os.environ['OLLAMA_API_URL'] = 'http://localhost:11434/api/generate'
    mock_response_json = {
        "response": "```json\n{\"relevance_score\": 5, \"summary_ru\": \"Markdown test\", \"anomalies_detected\": []}\n```"
    }
    provider = OllamaProvider()
    with patch('aiohttp.ClientSession', return_value=create_mock_session(mock_response_json)):
        result = await provider.analyze(sample_news_item)
    assert isinstance(result, EnrichedNewsItemDTO)
    assert result.relevance_score == 5
    assert result.summary_ru == "Markdown test"
    assert result.anomalies_detected == []

View File

@ -0,0 +1,116 @@
import pytest
import pytest_asyncio
from datetime import datetime, timezone
import chromadb
from chromadb.config import Settings
from src.processor.dto import EnrichedNewsItemDTO
from src.storage.chroma_store import ChromaStore
@pytest_asyncio.fixture
async def chroma_store():
    """A fresh in-memory ChromaStore per test, reset before and after."""
    # Use EphemeralClient for in-memory testing
    client = chromadb.EphemeralClient(Settings(allow_reset=True))
    client.reset()
    store = ChromaStore(client=client, collection_name="test_collection")
    yield store
    client.reset()
@pytest.mark.asyncio
async def test_store_and_search(chroma_store: ChromaStore):
    """Semantic search should surface the two AI/GPU items over the
    unrelated bakery item and round-trip every DTO field intact."""
    # 1. Arrange
    item1 = EnrichedNewsItemDTO(
        title="Apple announces new M4 chip",
        url="https://example.com/apple-m4",
        content_text="Apple has announced its newest M4 chip for next generation Macs. This processor brings massive AI improvements.",
        source="TechNews",
        timestamp=datetime(2023, 11, 1, 12, 0, tzinfo=timezone.utc),
        relevance_score=9,
        summary_ru="Apple анонсировала новый чип M4.",
        anomalies_detected=["NPU acceleration"]
    )
    # Deliberately off-topic item that should NOT match the query.
    item2 = EnrichedNewsItemDTO(
        title="Local bakery makes giant bread",
        url="https://example.com/giant-bread",
        content_text="A bakery in town just baked the world's largest loaf of bread, weighing over 1000 pounds.",
        source="LocalNews",
        timestamp=datetime(2023, 11, 2, 10, 0, tzinfo=timezone.utc),
        relevance_score=2,
        summary_ru="Местная пекарня испекла гигантский хлеб.",
        anomalies_detected=[]
    )
    item3 = EnrichedNewsItemDTO(
        title="NVIDIA reveals RTX 5090 with WebGPU support",
        url="https://example.com/nvidia-rtx-5090",
        content_text="NVIDIA's new RTX 5090 GPU fully accelerates WebGPU workloads for advanced edge AI applications.",
        source="GPUWeekly",
        timestamp=datetime(2023, 11, 3, 14, 0, tzinfo=timezone.utc),
        relevance_score=10,
        summary_ru="NVIDIA представила RTX 5090 с поддержкой WebGPU.",
        anomalies_detected=["WebGPU", "Edge AI"]
    )
    # 2. Act
    await chroma_store.store(item1)
    await chroma_store.store(item2)
    await chroma_store.store(item3)
    # Search for AI and chip related news
    search_results = await chroma_store.search("AI processor and GPU", limit=2)
    # 3. Assert
    assert len(search_results) == 2
    # Expected: The Apple M4 chip and NVIDIA RTX 5090 are highly relevant to AI/GPU
    titles = [res.title for res in search_results]
    assert "NVIDIA reveals RTX 5090 with WebGPU support" in titles
    assert "Apple announces new M4 chip" in titles
    assert "Local bakery makes giant bread" not in titles
    # Check if properties are correctly restored for one of the items
    for res in search_results:
        if "NVIDIA" in res.title:
            assert res.relevance_score == 10
            assert "WebGPU" in res.anomalies_detected
            assert "Edge AI" in res.anomalies_detected
            assert "NVIDIA's new RTX 5090" in res.content_text
            assert res.source == "GPUWeekly"
@pytest.mark.asyncio
async def test_search_empty_store(chroma_store: ChromaStore):
    """Searching an empty collection yields an empty list."""
    hits = await chroma_store.search("test query", limit=5)
    assert hits == []
@pytest.mark.asyncio
async def test_store_upsert(chroma_store: ChromaStore):
    """Storing the same URL twice must update the record, not duplicate it
    (ids are derived deterministically from the URL)."""
    item1 = EnrichedNewsItemDTO(
        title="Apple announces new M4 chip",
        url="https://example.com/apple-m4",
        content_text="Apple has announced its newest M4 chip for next generation Macs.",
        source="TechNews",
        timestamp=datetime(2023, 11, 1, 12, 0, tzinfo=timezone.utc),
        relevance_score=9,
        summary_ru="Apple анонсировала новый чип M4.",
        anomalies_detected=["NPU acceleration"]
    )
    # Store first time
    await chroma_store.store(item1)
    results = await chroma_store.search("Apple", limit=5)
    assert len(results) == 1
    assert results[0].relevance_score == 9
    # Modify item and store again (same URL, should upsert)
    item1_updated = item1.model_copy()
    item1_updated.relevance_score = 10
    item1_updated.summary_ru = "Apple анонсировала чип M4. Обновлено."
    await chroma_store.store(item1_updated)
    results_updated = await chroma_store.search("Apple", limit=5)
    # Should still be 1 item, but updated
    assert len(results_updated) == 1
    assert results_updated[0].relevance_score == 10
    assert results_updated[0].summary_ru == "Apple анонсировала чип M4. Обновлено."