Compare commits

..

13 Commits

Author SHA1 Message Date
9daf07b72d Update Ollama prompt and crawler sources
- crawlers.yml: append more Google Scholar topics, remove the Habr AI source
- LLM prompt: remove the C++ trends relation, change "web rendering" to "web engine"
2026-03-16 13:45:20 +03:00
7490970a93 Update Ollama prompt categories to include System Tools and match R&D targets 2026-03-16 13:36:55 +03:00
66399f23ab Update Ollama prompt to a unified Strategic Tech Scout format with stricter AI penalty 2026-03-16 13:30:28 +03:00
fbdb7d7806 feat(ai): optimize processor for academic content
- Add specialized prompt branch for research papers and SOTA detection
- Improve Russian summarization quality for technical abstracts
- Update relevance scoring to prioritize NPU/Edge AI breakthroughs
- Add README.md with project overview
2026-03-16 00:11:19 +03:00
a304ae9cd2 feat(crawler): add academic and research sources
- Implement crawlers for Microsoft Research, SciRate, and Google Scholar
- Use Playwright with stealth for Google Scholar anti-bot mitigation
- Update CrawlerFactory to support new research crawler types
- Add unit and integration tests for all academic sources with high coverage
2026-03-16 00:11:15 +03:00
65fccbc614 feat(storage): implement hybrid search and fix async chroma i/o
- Add ADR 001 for Hybrid Search Architecture
- Implement Phase 1 (Exact Match) and Phase 2 (Semantic Fallback) in ChromaStore
- Wrap blocking ChromaDB calls in asyncio.to_thread
- Update IVectorStore interface to support category filtering and thresholds
- Add comprehensive tests for hybrid search logic
2026-03-16 00:11:07 +03:00
217037f72e feat(crawlers): convert multiple sources from Playwright to Static/RSS
- Added `StaticCrawler` for generic aiohttp+BS4 parsing.
- Added `SkolkovoCrawler` for specialized Next.js parsing of sk.ru.
- Converted ICRA 2025, RSF, CES 2025, and Telegram Addmeto to `static`.
- Converted Horizon Europe to `rss` using its native feed.
- Updated `CrawlerFactory` to support new crawler types.
- Validated changes with unit tests.
2026-03-15 21:21:14 +03:00
a363ca41cf feat(crawlers): implement specialized CppConf crawler and AI analysis
- Added CppConfCrawler using aiohttp and regex to parse Next.js JSON data, skipping the Playwright bottleneck.
- Added C++ specific prompts to OllamaProvider for trend analysis (identifying C++26, memory safety, coroutines).
- Created offline pytest fixtures and TDD unit tests for the parser.
- Created end-to-end pipeline test mapping Crawler -> AI Processor -> Vector DB.
2026-03-15 20:34:39 +03:00
a0eeba0918 Enhance /hottest command with optional limit 2026-03-15 01:34:33 +03:00
9fdb4b35cd Implement 'Top Ranked' feature and expand Habr sources 2026-03-15 01:32:25 +03:00
019d9161de Update crawler selectors and add comprehensive tests 2026-03-15 00:48:27 +03:00
87af585e1b Refactor crawlers configuration and add new sources
- Move hard-coded crawlers from main.py to crawlers.yml
- Use CrawlerFactory to load configuration
- Add 9 new sources: C++ Russia, ICRA 2025, Technoprom, INNOPROM, Hannover Messe, RSF, Skolkovo, Horizon Europe, Addmeto
- Update task list
2026-03-15 00:45:04 +03:00
9c31977e98 [feat] playwright crawler
:Release Notes:
-

:Detailed Notes:
-

:Testing Performed:
-

:QA Notes:
as always AI generated

:Issues Addressed:
-
2026-03-14 20:13:53 +03:00
39 changed files with 3013 additions and 318 deletions

.gitignore

@@ -214,3 +214,7 @@ __marimo__/
# Streamlit
.streamlit/secrets.toml
chroma_db/
hidden_docs/
.opencode

README.md

@@ -0,0 +1,109 @@
# Trend-Scout AI
**Trend-Scout AI** is an intelligent Telegram bot designed for automated monitoring, analysis, and summarization of technological trends. It was developed to support R&D activities (specifically within the context of LG Electronics R&D Lab in St. Petersburg) by scanning the environment for emerging technologies, competitive benchmarks, and scientific breakthroughs.
## 🚀 Key Features
- **Automated Multi-Source Crawling:** Monitors RSS feeds, scientific journals (Nature, Science), IT conferences (CES, CVPR), and corporate newsrooms using Playwright and Scrapy.
- **AI-Powered Analysis:** Utilizes LLMs (via Ollama API) to evaluate the relevance of news articles based on specific R&D landscapes (e.g., WebOS, Chromium, Edge AI).
- **Russian Summarization:** Automatically generates concise summaries in Russian for quick review.
- **Anomaly Detection:** Alerts users when there is a significant surge in mentions of specific technologies (e.g., "WebGPU", "NPU acceleration").
- **Semantic Search:** Employs a vector database (ChromaDB) to allow searching for trends and news by meaning rather than just keywords.
- **Telegram Interface:** Simple and effective interaction via Telegram for receiving alerts and querying the latest trends.
## 🏗 Architecture
The project follows a modular, agent-based architecture designed around SOLID principles and asynchronous I/O:
1. **Crawler Agent:** Responsible for fetching and parsing data from various sources into standardized DTOs.
2. **AI Processor Agent:** Enriches data by scoring relevance, summarizing content, and detecting technological anomalies using LLMs.
3. **Vector Storage Agent:** Manages persistent storage and semantic retrieval using ChromaDB.
4. **Telegram Bot Agent:** Handles user interaction, command processing (`/start`, `/latest`, `/help`), and notification delivery.
5. **Orchestrator:** Coordinates the flow between crawling, processing, and storage in periodic background iterations.
## 🛠 Tech Stack
- **Language:** Python 3.12+
- **Frameworks:** `aiogram` (Telegram Bot), `playwright` (Web Crawling), `pydantic` (Data Validation)
- **Database:** `ChromaDB` (Vector Store)
- **AI/LLM:** `Ollama` (local or cloud models)
- **Testing:** `pytest`, `pytest-asyncio`
- **Environment:** Docker-ready, `.env` for configuration
## 📋 Prerequisites
- Python 3.12 or higher
- [Ollama](https://ollama.ai/) installed and running (for AI processing)
- Playwright browsers installed (`playwright install chromium`)
## ⚙️ Installation & Setup
1. **Clone the repository:**
```bash
git clone https://github.com/your-repo/trend-scout-ai.git
cd trend-scout-ai
```
2. **Create and activate a virtual environment:**
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
3. **Install dependencies:**
```bash
pip install -r requirements.txt
playwright install chromium
```
4. **Configure environment variables:**
Create a `.env` file in the root directory:
```env
TELEGRAM_BOT_TOKEN=your_bot_token_here
TELEGRAM_CHAT_ID=your_chat_id_here
OLLAMA_API_URL=http://localhost:11434/api/generate
CHROMA_DB_PATH=./chroma_db
```
## 🏃 Usage
### Start the Bot and Background Crawler
To run the full system (bot + periodic crawler):
```bash
python -m src.main
```
### Run Manual Update
To trigger a manual crawl and update of the vector store:
```bash
python update_chroma_store.py
```
## 🧪 Testing
The project maintains high test coverage, following TDD principles.
Run all tests:
```bash
pytest
```
Run specific test categories:
```bash
pytest tests/crawlers/
pytest tests/processor/
pytest tests/storage/
```
## 📂 Project Structure
- `src/`: Core application logic.
- `bot/`: Telegram bot handlers and setup.
- `crawlers/`: Web scraping modules and factory.
- `processor/`: LLM integration and prompt logic.
- `storage/`: Vector database operations.
- `orchestrator/`: Main service coordination.
- `tests/`: Comprehensive test suite.
- `docs/`: Architecture Decision Records (ADR) and methodology.
- `chroma_db/`: Persistent vector storage (local).
- `requirements.txt`: Python dependencies.


@@ -0,0 +1,85 @@
# Crawler Refactoring & Source Expansion Development Tasks
## Specification Summary
**Original Requirements**: Move hard-coded crawlers from `src/main.py` to `src/crawlers.yml`. Add new sources from the provided table (IT Conferences, Scientific Forums, Exhibitions, Grants, Journals, Startups, Blogs).
**Technical Stack**: Python, aiogram, ChromaDB, Playwright, RSS, YAML.
**Target Timeline**: Immediate refactoring and expansion.
## Development Tasks
### [x] Task 1: Clean up `src/main.py`
**Description**: Refactor `src/main.py` to load crawlers from `src/crawlers.yml` using `CrawlerFactory.load_from_yaml()`.
**Acceptance Criteria**:
- `src/main.py` no longer contains hard-coded crawler instances.
- Bot starts and correctly loads crawlers from the YAML file.
- Logging confirms the number of loaded crawlers.
**Files to Edit**:
- `src/main.py`
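The factory-driven loading described in this task can be sketched as follows. `CrawlerFactory.load_from_yaml()` is the method named above, but the stub crawler classes, the `from_config` helper, and the type-dispatch table are hypothetical illustrations, not the project's real implementations:

```python
# Hypothetical sketch of the YAML-driven factory; RssCrawler/PlaywrightCrawler
# are stand-in stubs for the project's real crawler classes.
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class RssCrawler:
    url: str
    source: str


@dataclass
class PlaywrightCrawler:
    url: str
    source: str
    selector: str = ""


CRAWLER_TYPES = {"rss": RssCrawler, "playwright": PlaywrightCrawler}


class CrawlerFactory:
    @staticmethod
    def from_config(entries: List[Dict[str, Any]]) -> list:
        crawlers = []
        for entry in entries:
            cls = CRAWLER_TYPES.get(entry.get("type"))
            if cls is None:
                continue  # skip unknown types rather than crash on one bad entry
            kwargs = {k: v for k, v in entry.items() if k != "type"}
            crawlers.append(cls(**kwargs))
        return crawlers

    @staticmethod
    def load_from_yaml(path: str) -> list:
        import yaml  # PyYAML; imported lazily so the sketch loads without it
        with open(path, encoding="utf-8") as f:
            config = yaml.safe_load(f)
        return CrawlerFactory.from_config(config.get("crawlers", []))
```

With this shape, `main.py` shrinks to one `load_from_yaml("src/crawlers.yml")` call, and adding a source means editing YAML only.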
### [x] Task 2: Verify and Update `src/crawlers.yml` for Existing Sources
**Description**: Ensure all crawlers previously hard-coded in `src/main.py` are present in `src/crawlers.yml`.
**Acceptance Criteria**:
- All 16 original sources from `main.py` are correctly configured in `crawlers.yml`.
- Selectors for Playwright crawlers (CVPR, CES) are verified.
**Files to Edit**:
- `src/crawlers.yml`
### [x] Task 3: Add New IT Conference Sources
**Description**: Add C++ Russia and ICRA 2025 to `crawlers.yml`.
**Acceptance Criteria**:
- C++ Russia (`https://cppconf.ru/`) added (suggest using Playwright).
- ICRA 2025 (`https://www.icra2025.org/`) added (suggest using Playwright).
- Correct selectors identified for both.
**Reference**: Table Category "IT Conferences"
### [x] Task 4: Add Scientific Forums and Exhibitions
**Description**: Add Technoprom-2025, INNOPROM-2025, and Hannover Messe.
**Acceptance Criteria**:
- Technoprom-2025 (`https://форумтехнопром.рф/`) added.
- INNOPROM-2025 (`https://innoprom.com/en/`) added.
- Hannover Messe (`https://www.hannovermesse.de/en/`) added.
- All use appropriate Playwright selectors.
**Reference**: Table Categories "Scientific Forums", "Exhibitions"
### [x] Task 5: Add Grants and Funds Sources
**Description**: Add RSF, Skolkovo, and Horizon Europe.
**Acceptance Criteria**:
- RSF (`https://rscf.ru/en/news/`) added.
- Skolkovo (`https://sk.ru/news/`) added.
- Horizon Europe (`https://research-and-innovation.ec.europa.eu/news_en`) added.
- Research if RSS is available for these, otherwise use Playwright.
**Reference**: Table Category "Grants and Funds"
### [x] Task 6: Add Telegram: Addmeto Source
**Description**: Add the Addmeto Telegram channel to the crawlers.
**Acceptance Criteria**:
- Source `https://t.me/s/addmeto` added.
- Use Playwright with selector `.tgme_widget_message_text` to extract content.
**Reference**: Table Category "Blogs and Channels"
### [x] Task 7: Quality Assurance and Integration Testing
**Description**: Verify that the new crawlers work and don't break the system.
**Acceptance Criteria**:
- Run existing tests: `pytest tests/`
- Run a trial iteration with a limited number of crawlers (can be done via a temporary test script).
- Verify that storage (ChromaDB) correctly handles new sources.
**QA Tool**: `./qa-playwright-capture.sh http://localhost:8000 public/qa-screenshots` (not directly applicable to a bot; verify crawler runs via logs instead).
## Quality Requirements
- [ ] All crawlers return standardized DTOs.
- [ ] No hard-coded credentials in YAML.
- [ ] Proper error handling for failed crawlers (already in Orchestrator).
- [ ] Summarization works for new sources in Russian.
## Technical Notes
**Crawler Types**:
- Use `rss` for Nature, Science, UFN, VC.ru, RB.ru, TAdviser, Google Blogs, Yandex Tech, Habr.
- Use `playwright` for Conferences, Exhibitions, and Telegram.


@@ -0,0 +1,112 @@
# Trend-Scout AI Semantic Search Fix Development Tasks
## Specification Summary
**Original Requirements**:
- Fix semantic search (ChromaDB) issues: `/latest` command ignores category filters.
- Fix `/search` command returning semantically irrelevant text.
- Follow TDD (Test-Driven Development), SOLID principles, and use `asyncio`.
**Technical Stack**: Python, asyncio, pytest, ChromaDB (Vector Storage), aiogram (Telegram Bot), Ollama (Embeddings/LLM).
**Target Timeline**: ~1 Development Day (6-8 hours)
## Execution Plan & Estimation
### Phase 1: Architectural Review & Setup (1 Hour)
Review the existing `IVectorStore` interface. Ensure that the interface supports passing metadata filters (for categories) and distance thresholds (for semantic relevance). Evaluate the embedding model being used, ensuring it supports Russian context effectively, as the AI Processor outputs summaries in Russian (`summary_ru`).
### Phase 2: TDD & Test Creation (2 Hours)
Strict adherence to TDD. Before touching the ChromaDB implementation, write failing `pytest` cases that mock the database and test that queries with category filters and relevance score thresholds return the expected subsets of data.
### Phase 3: ChromaDB Query Tuning & Implementation (2 Hours)
Implement the actual fixes in the ChromaDB wrapper. Map category arguments to ChromaDB's `where` metadata filter. Adjust the vector space distance metric (e.g., switching to `cosine` similarity via `hnsw:space`) and enforce a maximum distance threshold to drop irrelevant results.
### Phase 4: Bot Integration & E2E Testing (1-2 Hours)
Update the Telegram bot (`aiogram` handlers) to correctly extract category arguments from the `/latest` command and pass them to the Vector Storage Agent. Handle cases where `/search` returns no results due to the new relevance thresholds.
---
## Development Tasks
### [ ] Task 1: Architecture Review & Interface Update
**Description**: Update the `IVectorStore` interface to explicitly support metadata filtering and similarity thresholds.
**Acceptance Criteria**:
- `IVectorStore.search()` method signature accepts `filters: dict` and `max_distance: float` (or `min_relevance: float`).
- Existing mock classes/stubs are updated to match the new interface.
**Files to Create/Edit**:
- `src/vector_storage/interfaces.py`
**Estimation**: 30-45 minutes
**Reference**: Phase 1: Architectural Review
### [ ] Task 2: TDD Setup for Metadata Filtering
**Description**: Write failing tests for the Vector Storage Agent to verify category filtering.
**Acceptance Criteria**:
- Test verifies that calling `search()` with `filters={"category": "AI"}` only returns records with that exact metadata category.
- Test verifies that omitting the filter returns all categories.
- Tests must fail initially (Red phase of TDD).
**Files to Create/Edit**:
- `tests/vector_storage/test_chroma_filters.py`
**Estimation**: 45 minutes
**Reference**: Phase 2: TDD & Test Creation
### [ ] Task 3: TDD Setup for Semantic Relevance
**Description**: Write failing tests to verify that semantically irrelevant results are dropped based on distance/score thresholds.
**Acceptance Criteria**:
- Test inserts "apple", "banana", and "quantum computing". Searching for "fruit" with a strict threshold should return "apple" and "banana" but exclude "quantum computing".
- Tests must fail initially.
**Files to Create/Edit**:
- `tests/vector_storage/test_chroma_relevance.py`
**Estimation**: 45 minutes
**Reference**: Phase 2: TDD & Test Creation
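The red-phase test for this task might take roughly the following shape. `FakeVectorStore`, the toy 2-D "embeddings", and the `max_distance` signature are all invented for this sketch; the real test would target the actual `IVectorStore` implementation:

```python
# Illustrative shape for tests/vector_storage/test_chroma_relevance.py, using
# hand-made 2-D vectors instead of a real embedding model.
import math

TOY_EMBEDDINGS = {
    "apple": (1.0, 0.1),
    "banana": (0.9, 0.2),
    "quantum computing": (0.0, 1.0),
    "fruit": (1.0, 0.0),
}


def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (na * nb)


class FakeVectorStore:
    def __init__(self, docs):
        self.docs = docs

    def search(self, query: str, max_distance: float):
        q = TOY_EMBEDDINGS[query]
        hits = [(cosine_distance(q, TOY_EMBEDDINGS[d]), d) for d in self.docs]
        # Keep only hits within the distance threshold, nearest first.
        return [d for dist, d in sorted(hits) if dist <= max_distance]


def test_strict_threshold_drops_irrelevant():
    store = FakeVectorStore(["apple", "banana", "quantum computing"])
    results = store.search("fruit", max_distance=0.5)
    assert "apple" in results and "banana" in results
    assert "quantum computing" not in results
```

Against the current implementation (no threshold support) this fails, which is exactly the Red phase this task calls for.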
### [ ] Task 4: Implement ChromaDB Metadata Filtering
**Description**: Fix the ChromaDB implementation to pass the `filters` dictionary into the `where` parameter of the ChromaDB `query` method.
**Acceptance Criteria**:
- The `tests/vector_storage/test_chroma_filters.py` tests pass (Green phase).
- Empty filters gracefully fall back to querying without the `where` clause.
**Files to Create/Edit**:
- `src/vector_storage/chroma_store.py`
**Estimation**: 30-45 minutes
**Reference**: Phase 3: ChromaDB Query Tuning
### [ ] Task 5: Tune Embeddings & Distance Thresholds
**Description**: Fix the ChromaDB implementation to respect `max_distance`. Ensure the collection is initialized with `hnsw:space` set to `cosine` (if applicable) for better semantic separation.
**Acceptance Criteria**:
- The ChromaDB `query` method filters out results where the returned `distances` exceed the `max_distance` threshold.
- The `tests/vector_storage/test_chroma_relevance.py` tests pass.
**Files to Create/Edit**:
- `src/vector_storage/chroma_store.py`
**Estimation**: 60 minutes
**Reference**: Phase 3: ChromaDB Query Tuning
### [ ] Task 6: Update Telegram Bot `/latest` Handler
**Description**: Fix the `/latest` command in the bot to parse category arguments and pass them to the Vector Store.
**Acceptance Criteria**:
- Command `/latest AI` successfully parses "AI" and calls `vector_store.search(filters={"category": "AI"})`.
- Command `/latest` defaults to no filters.
- Unit tests for the aiogram handler pass.
**Files to Create/Edit**:
- `src/bot/handlers/commands.py`
- `tests/bot/test_handlers.py`
**Estimation**: 45 minutes
**Reference**: Phase 4: Bot Integration
### [ ] Task 7: Update Telegram Bot `/search` Handler
**Description**: Fix the `/search` command to utilize the new semantic relevance threshold and handle empty results gracefully.
**Acceptance Criteria**:
- Command `/search [query]` calls `vector_store.search()` with an optimal `max_distance` threshold.
- If no results meet the threshold, the bot replies politely with "No highly relevant news found for your query." instead of returning irrelevant results.
**Files to Create/Edit**:
- `src/bot/handlers/commands.py`
- `tests/bot/test_handlers.py`
**Estimation**: 45 minutes
**Reference**: Phase 4: Bot Integration
## Quality Requirements
- [ ] 100% of new code must have `pytest` coverage.
- [ ] No blocking I/O calls; all ChromaDB and Telegram API interactions must use `asyncio` or run in executors if synchronous.
- [ ] Follow SOLID: Do not tightly couple the bot handlers directly to the ChromaDB client; route through `IVectorStore`.
- [ ] Ensure the embedding model used for Russian text (`summary_ru`) is correctly configured in the Vector Storage initialization.
## Technical Notes
**Development Stack**: Python, aiogram, ChromaDB, pytest, asyncio.
**Special Instructions**: ChromaDB's default distance function is `l2` (Squared L2). When comparing textual embeddings, `cosine` similarity is often much better at separating irrelevant text. Check the ChromaDB collection creation code to ensure `metadata={"hnsw:space": "cosine"}` is set. If changing this, the ChromaDB collection may need to be recreated/reindexed.
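The threshold side of these notes can be sketched as a post-filter over a ChromaDB-shaped query result. The nested-list shape (`{"ids": [[...]], "documents": [[...]], "distances": [[...]]}`) follows ChromaDB's `query()` convention, but treat the exact keys as an assumption to verify against the installed version:

```python
# Sketch, assuming ChromaDB's nested-list query() result shape: drop every
# hit whose distance exceeds max_distance before returning DTOs to the bot.
from typing import Any, Dict, List


def filter_by_distance(result: Dict[str, Any], max_distance: float) -> List[Dict[str, Any]]:
    # Index [0] selects the result set for the first (only) query text.
    ids = result.get("ids", [[]])[0]
    docs = result.get("documents", [[]])[0]
    dists = result.get("distances", [[]])[0]
    return [
        {"id": i, "document": d, "distance": dist}
        for i, d, dist in zip(ids, docs, dists)
        if dist <= max_distance
    ]
```

With `hnsw:space` set to `cosine`, distances fall in [0, 2], so a threshold around 0.5-0.7 is a plausible starting point to tune empirically.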
**Timeline Expectations**: ~5.5 to 7 hours.


@@ -0,0 +1,71 @@
# ADR 001: Architecture Design for Enhanced Semantic & Hybrid Search
## 1. Context and Problem Statement
The "Trend-Scout AI" bot currently utilizes a basic synchronous implementation of ChromaDB to fulfill both categorical retrieval (`/latest`) and free-text queries (`/search`). Three major issues have severely impacted the user experience:
1. **Incorrect Categories in `/latest`**: The system performs a dense vector search using the requested category name (e.g., "AI") rather than a deterministic exact match. This returns semantically related news regardless of their actual assigned category, yielding false positives.
2. **Poor Semantic Matches in `/search`**:
- The default English-centric embedding model (e.g., `all-MiniLM-L6-v2`) handles Russian summaries and specialized technical acronyms poorly.
- Pure vector search ignores exact keyword matches, leading to frustrated user expectations when searching for specific entities (e.g., "OpenAI o1" or specific version numbers).
3. **Blocking I/O operations**: The `ChromaStore` executes blocking synchronous operations within `async def` wrappers, potentially starving the `asyncio` event loop and violating asynchronous data flow requirements.
## 2. Decision Drivers
* **Accuracy & Relevance**: Strict categorization and high recall for exact keywords + conceptual similarity.
* **Multilingual Support**: Strong performance on both English source texts and Russian summaries.
* **Performance & Concurrency**: Fully non-blocking (async) operations.
* **Adherence to SOLID**: Maintain strict interface boundaries, dependency inversion, and existing Data Transfer Objects (DTOs).
* **Alignment with Agent Architecture**: Ensure the Vector Storage Agent focuses strictly on storage/retrieval coordination without leaking AI processing duties.
## 3. Proposed Architecture
### 3.1. Asynchronous Data Flow (I/O)
* **Decision**: Migrate the local ChromaDB calls to run in a thread pool executor. Alternatively, if ChromaDB is hosted as a standalone server, utilize `chromadb.AsyncHttpClient`.
* **Implementation**: Encapsulate blocking calls like `self.collection.upsert()` and `self.collection.query()` inside `asyncio.to_thread()` to prevent blocking the Telegram bot's main event loop.
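A minimal sketch of this pattern, with `BlockingCollection` standing in for a real `chromadb` collection:

```python
# Sketch of §3.1: the blocking call runs on a worker thread via
# asyncio.to_thread, so the bot's event loop keeps serving updates.
import asyncio
import time


class BlockingCollection:
    def query(self, text: str) -> str:
        time.sleep(0.05)  # simulates blocking disk/CPU work inside ChromaDB
        return f"results for {text!r}"


class ChromaStore:
    def __init__(self, collection: BlockingCollection):
        self._collection = collection

    async def search(self, text: str) -> str:
        # Offload the synchronous client call; the loop stays free.
        return await asyncio.to_thread(self._collection.query, text)


async def main() -> list:
    store = ChromaStore(BlockingCollection())
    # Two searches overlap instead of serializing behind blocking I/O.
    return await asyncio.gather(store.search("NPU"), store.search("WebGPU"))
```

`asyncio.to_thread` was added in Python 3.9, so it is available on the project's 3.12+ baseline.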
### 3.2. Interface Segregation (ISP) for Storage
The current `IVectorStore` interface conflates generic vector searching, exact categorical retrieval, and database administration.
* **Action**: Segregate the interfaces to adhere to ISP.
* **Refactored Interfaces**:
```python
from abc import ABC, abstractmethod
from typing import List, Optional

class IStoreCommand(ABC):
    @abstractmethod
    async def store(self, item: EnrichedNewsItemDTO) -> None: ...

class IStoreQuery(ABC):
    @abstractmethod
    async def search_hybrid(self, query: str, limit: int = 5) -> List[EnrichedNewsItemDTO]: ...

    @abstractmethod
    async def get_latest_by_category(self, category: Optional[str], limit: int = 10) -> List[EnrichedNewsItemDTO]: ...

    @abstractmethod
    async def get_top_ranked(self, limit: int = 10) -> List[EnrichedNewsItemDTO]: ...
```
### 3.3. Strict Metadata Filtering for `/latest`
* **Mechanism**: The `/latest` command must completely bypass vector similarity search. Instead, it will use ChromaDB's `.get()` method coupled with a strict `where` metadata filter: `where={"category": {"$eq": category}}`.
* **Sorting Architecture**: Because ChromaDB does not natively support sorting results by a metadata field (like `timestamp`), the `get_latest_by_category` method will over-fetch (e.g., fetch up to 100 recent items using the metadata filter) and perform a fast, deterministic in-memory sort by `timestamp` descending before slicing to the requested `limit`.
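The over-fetch-and-sort step can be sketched as below. Here `records` stands for the (metadata, document) pairs a `collection.get(where={"category": {"$eq": category}}, limit=100)` call might return, flattened into tuples purely for illustration:

```python
# Sketch of §3.3: deterministic in-memory ordering of an over-fetched
# metadata-filtered result set; the tuple shape is simplified for the sketch.
from typing import Dict, List, Optional, Tuple


def latest_by_category(
    records: List[Tuple[Dict, str]], category: Optional[str], limit: int = 10
) -> List[str]:
    # The where-filter normally runs inside ChromaDB's .get(); it is repeated
    # here only so the sketch is self-contained.
    if category is not None:
        records = [(m, d) for m, d in records if m.get("category") == category]
    # ChromaDB cannot sort by a metadata field, so sort by timestamp
    # descending in memory, then slice to the requested limit.
    ordered = sorted(records, key=lambda r: r[0].get("timestamp", 0), reverse=True)
    return [doc for _, doc in ordered[:limit]]
```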
### 3.4. Hybrid Search Architecture (Keyword + Vector)
* **Mechanism**: Implement a Hybrid Search Strategy utilizing **Reciprocal Rank Fusion (RRF)**.
* **Sparse Retrieval (Keyword)**: Integrate a lightweight keyword index alongside ChromaDB. Given the bot's scale, **SQLite FTS5 (Full-Text Search)** is the optimal choice. It provides persistent, fast token matching without the overhead of Elasticsearch.
* **Dense Retrieval (Vector)**: ChromaDB semantic search.
* **Fusion Strategy**:
1. The new `HybridSearchStrategy` issues queries to both the SQLite FTS index and ChromaDB concurrently using `asyncio.gather`.
2. The results are normalized using the RRF formula: `Score = 1 / (k + rank_sparse) + 1 / (k + rank_dense)` (where `k` is typically 60).
3. The combined list of DTOs is sorted by the fused score and returned.
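The fusion step (items 2-3 above) can be sketched over two ranked id lists, one from the sparse FTS5 index and one from ChromaDB:

```python
# Sketch of Reciprocal Rank Fusion as described in §3.4:
# score(doc) = sum over rankings of 1 / (k + rank), with k = 60.
from collections import defaultdict
from typing import Dict, List


def rrf_fuse(sparse: List[str], dense: List[str], k: int = 60) -> List[str]:
    scores: Dict[str, float] = defaultdict(float)
    for ranking in (sparse, dense):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; documents present in both lists float up.
    return sorted(scores, key=scores.get, reverse=True)
```

Because RRF uses only ranks, no score normalization between the FTS5 BM25 scores and ChromaDB distances is needed, which is the main reason it suits this mixed retrieval setup.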
### 3.5. Embedding Model Evaluation & Upgrade
* **Decision**: Replace the default ChromaDB embedding function with a dedicated, explicitly configured multilingual model.
* **Recommendation**: Utilize `intfloat/multilingual-e5-small` (for lightweight CPU environments) or `sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2`. Both provide excellent English-Russian cross-lingual semantic alignment.
* **Integration (DIP)**: Apply the Dependency Inversion Principle by injecting the embedding function (or an `IEmbeddingProvider` interface) into the `ChromaStore` constructor. This allows for seamless A/B testing of embedding models without touching the core storage logic.
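The injection point can be sketched as below; `hash_embedder` is a deterministic toy stand-in, where the real system would inject a wrapper around e.g. `intfloat/multilingual-e5-small`:

```python
# Sketch of §3.5's dependency inversion: ChromaStore receives any callable
# mapping texts to vectors, so embedding models can be swapped or A/B tested
# without touching storage logic. hash_embedder is NOT semantically meaningful.
from typing import Callable, List, Sequence

EmbeddingFn = Callable[[Sequence[str]], List[List[float]]]


def hash_embedder(texts: Sequence[str]) -> List[List[float]]:
    # Cheap deterministic "embedding" for tests only.
    return [[(hash(t) % 1000) / 1000.0, len(t) / 100.0] for t in texts]


class ChromaStore:
    def __init__(self, embed: EmbeddingFn):
        self._embed = embed  # injected, per the Dependency Inversion Principle

    def vectors_for(self, texts: Sequence[str]) -> List[List[float]]:
        return self._embed(texts)
```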
## 4. Application to the Agent Architecture
* **Vector Storage Agent (Database)**: This agent's responsibility shifts from "pure vector storage" to "Hybrid Storage Management." It coordinates the `ChromaStore` (Dense) and `SQLiteStore` (Sparse) implementations.
* **AI Processor Agent**: To maintain Single Responsibility (SRP), embedding generation can be shifted from the storage layer to the AI Processor Agent. The AI Processor generates the vector using an Ollama hosted embedding model and attaches it directly to the `EnrichedNewsItemDTO`. The Storage Agent simply stores the pre-calculated vector, drastically reducing the dependency weight of the storage module.
## 5. Next Steps for Implementation
1. Add `sqlite3` FTS5 table initialization to the project scaffolding.
2. Refactor `src/storage/base.py` to segregate `IStoreQuery` and `IStoreCommand`.
3. Update `ChromaStore` to accept pre-calculated embeddings and utilize `asyncio.to_thread`.
4. Implement the RRF sorting algorithm in a new `search_hybrid` pipeline.
5. Update `src/bot/handlers.py` to route `/latest` through `get_latest_by_category`.

requirements.txt

@@ -0,0 +1,11 @@
beautifulsoup4
aiohttp
aiogram
chromadb
playwright
playwright-stealth
pydantic
pytest
pytest-asyncio
python-dotenv
PyYAML


@@ -51,6 +51,7 @@ def get_router(storage: IVectorStore, processor: ILLMProvider, allowed_chat_id:
"/start - Start the bot\n"
"/help - Show this help message\n"
"/latest [category] - Show the latest enriched news trends\n"
"/hottest [limit] - Show top ranked hot trends (default 10, max 50)\n"
"/search query - Search for news\n"
"/stats - Show database statistics\n"
"/params - Show LLM processor parameters\n"
@@ -76,8 +77,8 @@
"""
This handler receives messages with `/latest` command
"""
category = command.args if command.args else ""
items = await storage.search(query=category, limit=10)
category = command.args.strip() if command.args and command.args.strip() else None
items = await storage.get_latest(limit=10, category=category)
if not items:
await message.answer("No results found.")
@@ -93,17 +94,64 @@
await message.answer("Latest news:", reply_markup=builder.as_markup())
@router.message(Command("hottest"))
async def command_hottest_handler(message: Message, command: CommandObject) -> None:
    """
    This handler receives messages with `/hottest` command
    """
    limit = 10
    category = None
    if command.args and command.args.strip():
        parts = command.args.strip().split()
        if len(parts) == 1:
            if parts[0].isdigit():
                limit = int(parts[0])
            else:
                category = parts[0]
        else:
            if parts[-1].isdigit():
                limit = int(parts[-1])
                category = " ".join(parts[:-1])
            elif parts[0].isdigit():
                limit = int(parts[0])
                category = " ".join(parts[1:])
            else:
                category = command.args.strip()
    if limit <= 0:
        limit = 10
    elif limit > 50:
        limit = 50
    items = await storage.get_top_ranked(limit=limit, category=category)
    if not items:
        await message.answer("No hot trends found yet.")
        return
    builder = InlineKeyboardBuilder()
    for item in items:
        item_id = str(uuid.uuid5(uuid.NAMESPACE_URL, item.url))
        builder.row(InlineKeyboardButton(
            text=f"🔥 [{item.relevance_score}/10] {item.title}",
            callback_data=f"detail:{item_id}"
        ))
    await message.answer(f"Top {len(items)} Hottest Trends:", reply_markup=builder.as_markup())
@router.message(Command("search"))
async def command_search_handler(message: Message, command: CommandObject) -> None:
"""
This handler receives messages with `/search` command
"""
query = command.args
query = command.args.strip() if command.args and command.args.strip() else None
if not query:
await message.answer("Please provide a search query. Usage: /search query")
return
items = await storage.search(query=query, limit=10)
# Use a threshold to filter out low-relevance results for semantic search
items = await storage.search(query=query, limit=10, threshold=0.6)
if not items:
await message.answer("No results found.")
@@ -124,6 +172,8 @@ def get_router(storage: IVectorStore, processor: ILLMProvider, allowed_chat_id:
"""
This handler receives callback queries for news details
"""
if not callback.data:
return
item_id = callback.data.split(":")[1]
item = await storage.get_by_id(item_id)
@@ -153,7 +203,8 @@
response_text += f"<a href='{url}'>Read more</a>"
await callback.message.answer(response_text, parse_mode="HTML", disable_web_page_preview=False)
if isinstance(callback.message, Message):
await callback.message.answer(response_text, parse_mode="HTML", disable_web_page_preview=False)
await callback.answer()
@router.message(Command("stats"))

src/crawlers.yml

@@ -0,0 +1,119 @@
crawlers:
  - type: rss
    url: "https://www.nature.com/nature.rss"
    source: "Nature"
  - type: rss
    url: "https://news.samsung.com/global/rss"
    source: "Samsung Newsroom"
  - type: playwright
    url: "https://cvpr.thecvf.com/Conferences/2025"
    source: "CVPR 2025"
    selector: ".conference-news-item"
  - type: static
    url: "https://www.ces.tech/discover/?type=Article%2CSuccess+Story%2CPodcast&sort=desc&topics=Artificial+Intelligence%2CContent+and+Entertainment%2CAccessibility%2CInnovation+For+All"
    source: "CES 2025"
    selector: "h3"
  - type: rss
    url: "https://vc.ru/rss/tag/tech"
    source: "VC.ru Tech"
  - type: rss
    url: "https://vc.ru/rss/tag/iot"
    source: "vc.ru IoT"
  - type: rss
    url: "https://rb.ru/feeds/tag/iot"
    source: "RB.ru IoT"
  - type: rss
    url: "https://www.science.org/rss/news_current.xml"
    source: "Science News"
  - type: rss
    url: "https://ufn.ru/en/articles/rss.xml?pacs=03,84"
    source: "УФН; PACS: 03,84"
  - type: rss
    url: "https://www.tadviser.ru/xml/tadviser.xml"
    source: "TAdviser"
  - type: rss
    url: "https://blog.google/innovation-and-ai/technology/ai/rss/"
    source: "Google AI Blog"
  - type: rss
    url: "https://habr.com/ru/rss/company/yandex/blog/"
    source: "Yandex Tech"
  - type: rss
    url: "https://blog.google/products-and-platforms/products/chrome/rss/"
    source: "Google Chrome Blog"
  - type: rss
    url: "https://blog.google/products-and-platforms/platforms/android/rss/"
    source: "Google Android Blog"
  - type: cppconf
    url: "https://cppconf.ru/en/talks/"
    source: "C++ Russia"
  - type: static
    url: "https://2025.ieee-icra.org/media/"
    source: "ICRA 2025"
    selector: "h4"
  - type: playwright
    url: "https://форумтехнопром.рф/"
    source: "Technoprom-2025"
    selector: ".news-item"
  # - type: playwright
  #   url: "https://www.innoprom.com/en/media/news/"
  #   source: "INNOPROM-2025"
  #   selector: ".news-list__item"
  - type: playwright
    url: "https://www.hannovermesse.de/en/news/news-articles/"
    source: "Hannover Messe"
    selector: ".news-card"
  - type: static
    url: "https://rscf.ru/en/news/"
    source: "RSF"
    selector: ".news-item"
  - type: skolkovo
    url: "https://sk.ru/news/"
    source: "Skolkovo"
  - type: rss
    url: "https://research-and-innovation.ec.europa.eu/node/2/rss_en"
    source: "Horizon Europe"
  - type: rss
    url: "https://rb.ru/feeds/all/"
    source: "RB.ru"
  - type: rss
    url: "https://habr.com/ru/rss/all/all/?fl=ru"
    source: "Habr"
  - type: static
    url: "https://t.me/s/addmeto"
    source: "Telegram: Addmeto"
    selector: ".tgme_widget_message_text"
  - type: rss
    url: "https://habr.com/ru/rss/hubs/hi/articles/?fl=ru"
    source: "Habr HighLoad"
  - type: rss
    url: "https://habr.com/ru/rss/hubs/complete_code/articles/?fl=ru"
    source: "Habr Code Quality"
  - type: rss
    url: "https://habr.com/ru/rss/articles/rated100/?fl=ru"
    source: "Habr High Ranked"
  - type: rss
    url: "https://www.microsoft.com/en-us/research/feed/"
    source: "Microsoft Research"
  - type: scirate
    url: "https://scirate.com/"
    source: "SciRate"
  - type: scholar
    url: "https://scholar.google.com/"
    source: "Google Scholar WebGPU"
    query: "WebGPU"
  - type: scholar
    url: "https://scholar.google.com/"
    source: "Google Scholar NPU"
    query: "NPU acceleration"
  - type: scholar
    url: "https://scholar.google.com/"
    source: "Google Scholar Browsers"
    query: "Browsers | Lightweight Web Engine"
  - type: scholar
    url: "https://scholar.google.com/"
    source: "Google Scholar Performance"
    query: "Software Optimization"
  - type: scholar
    url: "https://scholar.google.com/"
    source: "Google Scholar BMI"
    query: "Brain-machine interface (IoT|Webengine|Linux)"


@@ -0,0 +1,106 @@
import json
import re
import asyncio
from datetime import datetime, timezone
from typing import List
import aiohttp
from .base import ICrawler
from .dto import NewsItemDTO
class CppConfNextJsParser:
    def _clean_html(self, raw_html: str) -> str:
        if not raw_html:
            return ""
        # Remove html tags
        cleanr = re.compile('<.*?>')
        cleantext = re.sub(cleanr, ' ', raw_html)
        # Remove extra whitespace
        return ' '.join(cleantext.split())

    def parse_talks(self, html: str) -> List[NewsItemDTO]:
        match = re.search(r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', html)
        if not match:
            return []
        try:
            data = json.loads(match.group(1))
            talks_by_day = data.get("props", {}).get("pageProps", {}).get("talksByDay", [])
        except (json.JSONDecodeError, KeyError, TypeError):
            return []
        talks = []
        for day in talks_by_day:
            if "talks" not in day:
                continue
            for row in day["talks"]:
                if len(row) < 2:
                    continue
                # row[1] contains the actual list of talks happening at that time
                for talk in row[1]:
                    if talk.get("isServiceTalk", False) or not talk.get("name"):
                        continue
                    title = talk["name"].get("en") or talk["name"].get("ru", "Unknown Title")
                    url = f"https://cppconf.ru/en/talks/{talk.get('id', '')}/"
                    # timestamp
                    time_str = talk.get("time") or talk.get("talkStartTime")
                    timestamp = datetime.now(timezone.utc)
                    if time_str:
                        try:
                            # format usually "2026-05-07T09:00:00Z"
                            timestamp = datetime.fromisoformat(time_str.replace("Z", "+00:00"))
                        except ValueError:
                            pass
                    # text content
                    short_desc = talk.get("shortDescription", {}).get("en", talk.get("shortDescription", {}).get("ru", ""))
                    long_desc = talk.get("longDescription", {}).get("en", talk.get("longDescription", {}).get("ru", ""))
                    desc = self._clean_html(short_desc) + " " + self._clean_html(long_desc)
                    # speakers
                    speakers = []
                    for speaker in talk.get("speakers", []):
                        name = speaker.get("name", {}).get("en") or speaker.get("name", {}).get("ru")
                        if name:
                            speakers.append(name)
                    speaker_str = f"Speakers: {', '.join(speakers)}. " if speakers else ""
                    content_text = f"{speaker_str}{desc}".strip()
                    # only keep talks with decent content
                    if not content_text:
                        content_text = "No description available."
                    talks.append(
                        NewsItemDTO(
                            title=title,
                            url=url,
                            content_text=content_text,
                            source="cppconf",
                            timestamp=timestamp
                        )
                    )
        return talks


class CppConfCrawler(ICrawler):
    def __init__(self, url: str, source: str = "cppconf"):
        self.url = url
self.source = source
self.parser = CppConfNextJsParser()
async def fetch_latest(self) -> List[NewsItemDTO]:
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as response:
if response.status != 200:
return []
html = await response.text()
talks = self.parser.parse_talks(html)
for talk in talks:
talk.source = self.source
return talks
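The `__NEXT_DATA__` extraction at the heart of the parser can be exercised standalone on a synthetic payload (a sketch; the key names mirror the parser above, the talk content is invented):

```python
import json
import re

html = (
    '<html><script id="__NEXT_DATA__" type="application/json">'
    '{"props": {"pageProps": {"talksByDay": [{"talks": '
    '[["09:00", [{"id": "42", "name": {"en": "Zero-cost abstractions"}, '
    '"isServiceTalk": false}]]]}]}}}'
    '</script></html>'
)

match = re.search(
    r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', html
)
data = json.loads(match.group(1))
talks_by_day = data["props"]["pageProps"]["talksByDay"]
# row[0] is the time slot, row[1] the list of talks in that slot
first_talk = talks_by_day[0]["talks"][0][1][0]
print(first_talk["name"]["en"])  # → Zero-cost abstractions
```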

src/crawlers/factory.py Normal file
View File

@@ -0,0 +1,68 @@
import yaml
import logging
from typing import List
from src.crawlers.base import ICrawler
from src.crawlers.rss_crawler import RSSCrawler
from src.crawlers.playwright_crawler import PlaywrightCrawler
from src.crawlers.cppconf_crawler import CppConfCrawler
from src.crawlers.static_crawler import StaticCrawler
from src.crawlers.skolkovo_crawler import SkolkovoCrawler
from src.crawlers.scirate_crawler import SciRateCrawler
from src.crawlers.scholar_crawler import ScholarCrawler
from src.crawlers.microsoft_research_crawler import MicrosoftResearchCrawler
logger = logging.getLogger(__name__)
class CrawlerFactory:
@staticmethod
def load_from_yaml(file_path: str) -> List[ICrawler]:
try:
with open(file_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
if not config or not isinstance(config, dict):
logger.warning(f"Invalid or empty configuration in {file_path}")
return []
crawlers = []
for item in config.get('crawlers', []):
if not isinstance(item, dict):
continue
crawler_type = item.get('type')
url = item.get('url')
source = item.get('source')
if not source or (not url and crawler_type != 'scholar'):
logger.warning(f"Missing mandatory fields (url, source) for crawler: {item}")
continue
if crawler_type == 'rss':
crawlers.append(RSSCrawler(url=url, source=source))
elif crawler_type == 'playwright':
selector = item.get('selector')
crawlers.append(PlaywrightCrawler(url=url, source=source, selector=selector))
elif crawler_type == 'cppconf':
crawlers.append(CppConfCrawler(url=url, source=source))
elif crawler_type == 'static':
selector = item.get('selector')
if selector:
crawlers.append(StaticCrawler(url=url, source=source, selector=selector))
else:
logger.warning(f"Missing mandatory field 'selector' for static crawler: {item}")
elif crawler_type == 'skolkovo':
crawlers.append(SkolkovoCrawler(url=url, source=source))
elif crawler_type == 'scirate':
crawlers.append(SciRateCrawler(url=url, source=source))
elif crawler_type == 'scholar':
query = item.get('query', 'Artificial Intelligence')
crawlers.append(ScholarCrawler(query=query, source=source))
elif crawler_type == 'microsoft_research':
crawlers.append(MicrosoftResearchCrawler(url=url, source=source))
else:
logger.warning(f"Unknown crawler type: {crawler_type}")
return crawlers
except Exception as e:
logger.error(f"Failed to load crawlers from {file_path}: {e}")
return []
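The `if/elif` dispatch above grows with every new source type; a registry-based sketch (a hypothetical refactor, not part of this diff — stub classes stand in for the real crawlers) keeps the type-to-constructor mapping in one dict:

```python
# Hypothetical registry sketch; Rss and Scholar are stand-ins for
# RSSCrawler and ScholarCrawler.
class Rss:
    def __init__(self, url, source):
        self.url, self.source = url, source

class Scholar:
    def __init__(self, query, source):
        self.query, self.source = query, source

REGISTRY = {
    "rss": lambda item: Rss(url=item["url"], source=item["source"]),
    "scholar": lambda item: Scholar(
        query=item.get("query", "Artificial Intelligence"),
        source=item["source"],
    ),
}

def build(item):
    # Unknown types map to None, mirroring the factory's warning branch.
    ctor = REGISTRY.get(item.get("type"))
    return ctor(item) if ctor else None

crawler = build({"type": "scholar", "source": "Google Scholar NPU",
                 "query": "NPU acceleration"})
print(type(crawler).__name__, crawler.query)  # → Scholar NPU acceleration
```

Adding a source then becomes a one-line registry entry instead of another `elif` branch.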

View File

@@ -0,0 +1,7 @@
from typing import List
from .rss_crawler import RSSCrawler
from .dto import NewsItemDTO
class MicrosoftResearchCrawler(RSSCrawler):
def __init__(self, url: str = "https://www.microsoft.com/en-us/research/feed/", source: str = "Microsoft Research"):
super().__init__(url, source)

View File

@@ -0,0 +1,73 @@
import logging
from typing import List, Optional
from playwright.async_api import async_playwright
from datetime import datetime
from urllib.parse import urljoin
from src.crawlers.base import ICrawler
from src.crawlers.dto import NewsItemDTO
logger = logging.getLogger(__name__)
class PlaywrightCrawler(ICrawler):
def __init__(self, url: str, source: str, selector: Optional[str] = None):
self.url = url
self.source = source
self.selector = selector
async def fetch_latest(self) -> List[NewsItemDTO]:
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
try:
try:
await page.goto(self.url, wait_until="networkidle", timeout=60000)
news_items = []
if self.selector:
elements = await page.query_selector_all(self.selector)
for el in elements:
# Try to find a link and title within the element
# If the element itself is an 'a' tag
if await el.evaluate("node => node.tagName === 'A'"):
link_el = el
else:
link_el = await el.query_selector('a')
if link_el:
title = await link_el.inner_text()
href = await link_el.get_attribute('href')
if href:
full_url = urljoin(self.url, href)
news_items.append(
NewsItemDTO(
title=title.strip(),
url=full_url,
content_text="",
source=self.source,
timestamp=datetime.now()
)
)
else:
# Fallback: extract h2 titles as a simple heuristic
elements = await page.query_selector_all('h2')
for el in elements:
title = await el.inner_text()
if title.strip():
news_items.append(
NewsItemDTO(
title=title.strip(),
url=self.url,
content_text="",
source=self.source,
timestamp=datetime.now()
)
)
return news_items
except Exception as e:
logger.error(f"Error crawling {self.url}: {e}")
return []
finally:
await browser.close()
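The `urljoin(self.url, href)` normalization used above resolves root-relative hrefs against the site root and passes absolute hrefs through unchanged; for example:

```python
from urllib.parse import urljoin

base = "https://www.hannovermesse.de/en/news/news-articles/"
# Root-relative href: resolved against the scheme and host of base.
print(urljoin(base, "/en/news/article-1/"))
# → https://www.hannovermesse.de/en/news/article-1/
# Absolute href: returned as-is.
print(urljoin(base, "https://example.org/x"))
# → https://example.org/x
```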

View File

@@ -0,0 +1,92 @@
import logging
from typing import List, Optional
from playwright.async_api import async_playwright
from playwright_stealth import Stealth
from datetime import datetime, timezone
from urllib.parse import urljoin
from .base import ICrawler
from .dto import NewsItemDTO
logger = logging.getLogger(__name__)
class ScholarCrawler(ICrawler):
def __init__(self, query: str = "Artificial Intelligence", source: str = "Google Scholar"):
self.query = query
# Google Scholar query URL
self.url = f"https://scholar.google.com/scholar?hl=en&q={query.replace(' ', '+')}"
self.source = source
async def fetch_latest(self) -> List[NewsItemDTO]:
try:
async with async_playwright() as p:
# Launch browser
browser = await p.chromium.launch(headless=True)
try:
# Create a new context with a realistic user agent
context = await browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
)
page = await context.new_page()
# Apply stealth to avoid detection
await Stealth().apply_stealth_async(page)
logger.info(f"Navigating to {self.url}")
await page.goto(self.url, wait_until="networkidle", timeout=60000)
# Check for CAPTCHA or blocking
content = await page.content()
if "CAPTCHA" in content or "not a robot" in content:
logger.warning("Google Scholar CAPTCHA or bot detection triggered")
return []
# Select result items
results = await page.query_selector_all(".gs_ri")
news_items = []
for res in results:
# Title element
title_el = await res.query_selector(".gs_rt a")
if not title_el:
continue
title = await title_el.inner_text()
url = await title_el.get_attribute("href")
# Snippet/Abstract
snippet_el = await res.query_selector(".gs_rs")
snippet = await snippet_el.inner_text() if snippet_el else ""
# Metadata (authors, journal, year)
metadata_el = await res.query_selector(".gs_a")
metadata = await metadata_el.inner_text() if metadata_el else ""
# Citation count (usually in the bottom links)
                        # We look for a footer link whose text contains "Cited by"
citation_count = "0"
bottom_links = await res.query_selector_all(".gs_fl a")
for link in bottom_links:
text = await link.inner_text()
if "Cited by" in text:
citation_count = text.replace("Cited by", "").strip()
break
content_text = f"{metadata}\n\n{snippet}\n\nCitations: {citation_count}"
news_items.append(
NewsItemDTO(
title=title.strip(),
url=url or self.url,
content_text=content_text.strip(),
source=f"{self.source}: {self.query}",
timestamp=datetime.now(timezone.utc)
)
)
return news_items
finally:
await browser.close()
except Exception as e:
logger.error(f"Error crawling Google Scholar: {e}")
return []
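`query.replace(' ', '+')` above only handles spaces, but queries from the config such as `Brain-machine interface (IoT|Webengine|Linux)` also contain `(`, `)` and `|`; `urllib.parse.quote_plus` would encode those as well (a sketch of the safer alternative):

```python
from urllib.parse import quote_plus

query = "Brain-machine interface (IoT|Webengine|Linux)"
encoded = quote_plus(query)  # spaces → '+', reserved chars percent-encoded
print(encoded)
# → Brain-machine+interface+%28IoT%7CWebengine%7CLinux%29
url = f"https://scholar.google.com/scholar?hl=en&q={encoded}"
```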

View File

@@ -0,0 +1,65 @@
import aiohttp
from datetime import datetime, timezone
from typing import List
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from .base import ICrawler
from .dto import NewsItemDTO
class SciRateCrawler(ICrawler):
def __init__(self, url: str = "https://scirate.com/", source: str = "SciRate"):
self.url = url
self.source = source
async def fetch_latest(self) -> List[NewsItemDTO]:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
async with aiohttp.ClientSession(headers=headers) as session:
try:
async with session.get(self.url, timeout=aiohttp.ClientTimeout(total=30)) as response:
if response.status != 200:
return []
html = await response.text()
return self.parse_html(html)
except Exception:
return []
def parse_html(self, html: str) -> List[NewsItemDTO]:
soup = BeautifulSoup(html, "html.parser")
items = []
# SciRate papers are typically in li.paper-list-item or div.paper
papers = soup.select("li.paper-list-item, div.paper")
for paper in papers:
title_el = paper.select_one(".title a")
if not title_el:
continue
title = title_el.get_text(strip=True)
link = title_el.get("href", "")
if isinstance(link, list):
link = link[0] if link else ""
if link and link.startswith("/"):
link = urljoin(self.url, link)
authors_el = paper.select_one(".authors")
authors = authors_el.get_text(strip=True) if authors_el else ""
abstract_el = paper.select_one(".abstract")
abstract = abstract_el.get_text(strip=True) if abstract_el else ""
content_text = f"Authors: {authors}\n\n{abstract}"
items.append(NewsItemDTO(
title=title,
url=link or self.url,
content_text=content_text.strip(),
source=self.source,
timestamp=datetime.now(timezone.utc)
))
return items

View File

@@ -0,0 +1,66 @@
import json
import re
import aiohttp
from datetime import datetime, timezone
from typing import List
from .base import ICrawler
from .dto import NewsItemDTO
class SkolkovoCrawler(ICrawler):
def __init__(self, url: str, source: str = "Skolkovo"):
self.url = url
self.source = source
async def fetch_latest(self) -> List[NewsItemDTO]:
async with aiohttp.ClientSession() as session:
try:
async with session.get(self.url, timeout=aiohttp.ClientTimeout(total=30)) as response:
if response.status != 200:
return []
html = await response.text()
return self.parse_nextjs(html)
except Exception:
return []
def parse_nextjs(self, html: str) -> List[NewsItemDTO]:
match = re.search(r'<script id="__NEXT_DATA__" type="application/json">(.*?)</script>', html)
if not match:
return []
try:
data = json.loads(match.group(1))
news_data = data["props"]["pageProps"]["initialProps"]["homeStore"]["news"]
items_list = news_data.get("items", [])
except (KeyError, TypeError, json.JSONDecodeError):
return []
news_items = []
for item in items_list:
title = item.get("title", "")
# Slug is used for URL
slug = item.get("slug", "")
url = f"https://sk.ru/news/{slug}/" if slug else self.url
content_text = item.get("description", "")
# Clean up simple HTML if present
content_text = re.sub(r'<[^>]+>', ' ', content_text)
content_text = ' '.join(content_text.split())
# Timestamp
ts_str = item.get("published_at") or item.get("created_at")
timestamp = datetime.now(timezone.utc)
if ts_str:
try:
timestamp = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
except ValueError:
pass
news_items.append(NewsItemDTO(
title=title,
url=url,
content_text=content_text,
source=self.source,
timestamp=timestamp
))
return news_items
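The two-step description cleanup above (tag strip, then whitespace collapse) behaves like this on a synthetic snippet; note that HTML entities are deliberately left undecoded by this approach:

```python
import re

raw = "<p>Skolkovo  resident   raises</p>\n<b>Series&nbsp;A</b>"
text = re.sub(r'<[^>]+>', ' ', raw)  # drop tags, keep a separator space
text = ' '.join(text.split())        # collapse runs of whitespace/newlines
print(text)  # → Skolkovo resident raises Series&nbsp;A
```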

View File

@@ -0,0 +1,81 @@
import asyncio
import aiohttp
import re
from typing import List
from datetime import datetime, timezone
from bs4 import BeautifulSoup
from .base import ICrawler
from .dto import NewsItemDTO
class StaticCrawler(ICrawler):
def __init__(self, url: str, source: str, selector: str):
self.url = url
self.source = source
self.selector = selector
async def fetch_latest(self) -> List[NewsItemDTO]:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
async with aiohttp.ClientSession(headers=headers) as session:
try:
async with session.get(self.url, timeout=aiohttp.ClientTimeout(total=30)) as response:
if response.status != 200:
return []
html = await response.text()
return self.parse_html(html)
except Exception:
return []
def parse_html(self, html: str) -> List[NewsItemDTO]:
soup = BeautifulSoup(html, "html.parser")
items = []
elements = soup.select(self.selector)
for el in elements:
# Try to find a link and title
all_links = el.find_all('a')
link_el = None
title = ""
# Find the first link that has text content
for a in all_links:
txt = a.get_text(strip=True)
if txt:
title = txt
link_el = a
break
# If no link with text, just take the first link and look for title elsewhere
if not link_el and all_links:
link_el = all_links[0]
title_el = el.find(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p'])
if title_el:
title = title_el.get_text(strip=True)
if not link_el:
continue
url = link_el.get('href') if link_el else ""
if isinstance(url, list):
url = url[0] if url else ""
if not title or not url:
continue
# Normalize URL
if str(url).startswith('/'):
from urllib.parse import urljoin
url = urljoin(self.url, str(url))
content_text = el.get_text(separator=" ", strip=True)
items.append(NewsItemDTO(
title=title,
url=str(url),
content_text=content_text,
source=self.source,
timestamp=datetime.now(timezone.utc)
))
return items

View File

@@ -7,18 +7,18 @@ import chromadb
from aiogram import Bot, Dispatcher
from src.crawlers.base import ICrawler
from src.crawlers.rss_crawler import RSSCrawler
from src.crawlers.playwright_crawler import PlaywrightCrawler
from src.crawlers.factory import CrawlerFactory
from src.processor.ollama_provider import OllamaProvider
from src.storage.chroma_store import ChromaStore
from src.notifications.telegram import TelegramNotifier
from src.orchestrator.service import TrendScoutService
from src.bot.bot import setup_bot
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
async def background_task(orchestrator: TrendScoutService, interval: int = 3600):
"""Run the orchestrator periodically."""
while True:
@@ -28,72 +28,60 @@ async def background_task(orchestrator: TrendScoutService, interval: int = 3600)
logger.info("Iteration completed successfully.")
except Exception as e:
logger.error(f"Error during iteration: {e}", exc_info=True)
logger.info(f"Sleeping for {interval} seconds before next iteration.")
await asyncio.sleep(interval)
async def main():
load_dotenv()
# Load configuration
bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
chat_id = os.getenv("TELEGRAM_CHAT_ID", "")
ollama_url = os.getenv("OLLAMA_API_URL", "http://localhost:11434/api/generate")
ollama_url = os.getenv(
"OLLAMA_API_URL", "http://localhost:11434/api/generate")
chroma_db_path = os.getenv("CHROMA_DB_PATH", "./chroma_db")
if not bot_token:
logger.error("TELEGRAM_BOT_TOKEN is missing!")
return
if not chat_id or chat_id == "YOUR_CHAT_ID_HERE":
logger.warning("TELEGRAM_CHAT_ID is missing or not set. Notifications will fail.")
logger.warning(
"TELEGRAM_CHAT_ID is missing or not set. Notifications will fail.")
# 1. Initialize Components that do not depend on Bot
crawlers: List[ICrawler] = [
RSSCrawler("https://habr.com/ru/rss/hubs/artificial_intelligence/articles/?fl=ru", source="Habr AI"),
RSSCrawler("https://www.nature.com/nature.rss", source="Nature"),
RSSCrawler("https://news.google.com/rss/search?q=WebOS+Chromium+Edge+AI+LGE+SmartTV&hl=en-US&gl=US&ceid=US:en", source="Google News R&D"),
RSSCrawler("https://news.samsung.com/global/rss", source="Samsung Newsroom"),
RSSCrawler("https://www.sony.com/en/SonyInfo/News/Service/rss.xml", source="Sony Newsroom"),
PlaywrightCrawler("https://cvpr.thecvf.com/Conferences/2025", source="CVPR 2025", selector=".conference-news-item"),
PlaywrightCrawler("https://www.ces.tech/news/press-releases.aspx", source="CES 2025", selector=".press-release-item"),
RSSCrawler("https://vc.ru/rss/tech", source="VC.ru Tech"),
RSSCrawler("https://rb.ru/rss/", source="RB.ru"),
RSSCrawler("https://www.science.org/rss/news_current.xml", source="Science News"),
RSSCrawler("https://ufn.ru/en/rss/", source="УФН"),
RSSCrawler("https://www.tadviser.ru/xml/tadviser.xml", source="TAdviser"),
RSSCrawler("https://habr.com/ru/rss/company/yandex/blog/", source="Yandex Tech"),
RSSCrawler("https://blog.google/technology/ai/rss/", source="Google AI Blog"),
]
crawlers = CrawlerFactory.load_from_yaml("src/crawlers.yml")
processor = OllamaProvider()
if chroma_db_path:
chroma_client = chromadb.PersistentClient(path=chroma_db_path)
else:
chroma_client = chromadb.Client()
storage = ChromaStore(client=chroma_client)
# 2. Initialize Bot & Dispatcher
bot, dp = setup_bot(bot_token, storage, processor, chat_id)
# 3. Initialize Notifier and Orchestrator
notifier = TelegramNotifier(bot, chat_id)
orchestrator = TrendScoutService(
crawlers=crawlers,
processor=processor,
storage=storage,
notifier=notifier
)
# 4. Start tasks
logger.info("Starting TrendScout AI Bot and Background Task...")
# Create the background task
bg_task = asyncio.create_task(background_task(orchestrator, interval=3600))
# Start polling the Telegram bot (blocking call)
try:
await dp.start_polling(bot)

View File

@@ -6,6 +6,7 @@ from src.crawlers.dto import NewsItemDTO
from src.processor.base import ILLMProvider
from src.processor.dto import EnrichedNewsItemDTO
class OllamaProvider(ILLMProvider):
def get_info(self) -> dict[str, str]:
base_url = os.environ.get('OLLAMA_API_URL', 'http://localhost:11434')
@@ -18,17 +19,36 @@ class OllamaProvider(ILLMProvider):
async def analyze(self, news_item: NewsItemDTO) -> EnrichedNewsItemDTO:
base_url = os.environ.get('OLLAMA_API_URL', 'http://localhost:11434')
url = base_url if base_url.endswith('/api/generate') else f"{base_url.rstrip('/')}/api/generate"
url = base_url if base_url.endswith(
'/api/generate') else f"{base_url.rstrip('/')}/api/generate"
prompt = (
f"Analyze the following article.\nTitle: {news_item.title}\n"
f"Content: {news_item.content_text}\n"
"Return JSON with 'relevance_score' (0-10), 'summary_ru' (string), 'anomalies_detected' (list of strings), and 'category' (string).\n"
"The 'summary_ru' MUST be in Russian and strictly NO MORE than 2 sentences.\n"
"The 'category' must be exactly one of: 'Browsers', 'Edge AI', 'SmartTV', 'Samsung New Technologies', 'Middleware new trends', 'Competitors', 'Other'.\n"
"For 'relevance_score', prioritize and give higher scores to articles related to R&D, Chromium, NPU, and Smart TV operating systems.\n"
"Regarding 'anomalies_detected': only detect factual, conceptual, or industry-related anomalies (e.g., sudden technological shifts, unexpected competitor moves). "
"DO NOT detect technical anomalies related to the text's formatting, HTML tags, metadata, or document structure. "
"If no real anomalies are found, return an empty list."
"Act as a Strategic Tech Scout for an R&D department specializing in WebEngine (Chromium) extensions, "
"cross-platform porting, Middleware platform solutions, and System Tools (SWE) for developers. "
"Evaluate ALL articles, including C++ conference talks and academic research, based on their value to these specific targets.\n\n"
f"Analyze the following article or research abstract.\nTitle: {news_item.title}\nSource: {news_item.source}\nContent: {news_item.content_text}\n\n"
"Return a JSON object strictly with these keys:\n"
"1. 'relevance_score' (integer 0-10): Score the potential impact on our R&D targets.\n"
"2. 'summary_ru' (string): A concise technical summary in Russian (2-3 sentences). Explain methodology, core innovation, and practical relevance.\n"
"3. 'anomalies_detected' (list of strings): Identify state-of-the-art (SOTA) breakthroughs, strategic disruptions, new standards, or unexpected results. Return [] if none.\n"
"4. 'category' (string): Must be exactly one of: 'WebEngines/Browsers', 'System Tools (SWE)', 'Middleware Platforms', 'Cross-Platform', 'SmartTV/IoT', 'Samsung New Technologies', 'Competitors', 'Academic/SOTA', 'Other'.\n\n"
"SCORING GUIDELINES ('relevance_score'):\n"
"Start with a base score:\n"
"- 9-10 (Core R&D): Breakthroughs in web engines, cross-platform frameworks, system tools, or SOTA research in web engines/middleware.\n"
"- 7-8 (Ecosystem): Solid improvements applicable to Automotive Content Platforms, IoT ecosystems, SmartTV OS, or major SWE tool improvements.\n"
"- 4-6 (Peripheral): Theoretical work, general programming news, or technologies with distant industrial application.\n"
"- 0-3 (Out of Scope): Pure medicine, social sciences, consumer electronics reviews, pure audio/acoustics.\n\n"
"AI PENALTY (CRITICAL):\n"
"AI mentions are unwanted. Penalize the 'relevance_score' if the article is about AI:\n"
"- Subtract 2 points for a minor or peripheral AI/ML mention.\n"
"- Subtract 5 points if it is primarily an AI/ML/LLM article.\n"
"Ensure the final score remains between 0 and 10.\n\n"
"ANOMALY DETECTION ('anomalies_detected'):\n"
"Do not just summarize. Look for strategic/architectural disruptions (e.g., a competitor abandoning a proprietary OS for Chromium, sudden new industry standards, convergence of WebTech with hardware, or research significantly outperforming current SOTA). Ignore technical text formatting issues."
)
payload = {
"model": os.environ.get('OLLAMA_MODEL', 'gpt-oss:120b-cloud'),
@@ -36,29 +56,29 @@ class OllamaProvider(ILLMProvider):
"stream": False,
"format": "json"
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload) as response:
response.raise_for_status()
data = await response.json()
# Ollama returns the generated text inside 'response' key
generated_text = data.get('response', '')
if not generated_text:
generated_text = "{}"
# Strip markdown code blocks
cleaned_text = generated_text.strip()
if cleaned_text.startswith("```json"):
cleaned_text = cleaned_text[7:]
elif cleaned_text.startswith("```"):
cleaned_text = cleaned_text[3:]
if cleaned_text.endswith("```"):
cleaned_text = cleaned_text[:-3]
cleaned_text = cleaned_text.strip()
try:
parsed_json = json.loads(cleaned_text)
if not isinstance(parsed_json, dict):
@@ -70,7 +90,7 @@ class OllamaProvider(ILLMProvider):
"anomalies_detected": [],
"category": "Other"
}
return EnrichedNewsItemDTO(
title=news_item.title,
url=news_item.url,
@@ -79,6 +99,7 @@ class OllamaProvider(ILLMProvider):
timestamp=news_item.timestamp,
relevance_score=parsed_json.get('relevance_score', 0),
summary_ru=parsed_json.get('summary_ru', ''),
anomalies_detected=parsed_json.get('anomalies_detected', []),
anomalies_detected=parsed_json.get(
'anomalies_detected', []),
category=parsed_json.get('category', 'Other')
)

View File

@@ -16,7 +16,7 @@ class IVectorStore(ABC):
pass
@abstractmethod
async def search(self, query: str, limit: int = 5) -> List[EnrichedNewsItemDTO]:
async def search(self, query: str, limit: int = 5, category: Optional[str] = None, threshold: Optional[float] = None) -> List[EnrichedNewsItemDTO]:
"""Search for items in the vector database."""
pass
@@ -29,3 +29,13 @@ class IVectorStore(ABC):
async def get_stats(self) -> dict[str, int]:
"""Get storage statistics including total count and breakdown by category."""
pass
@abstractmethod
async def get_latest(self, limit: int = 10, category: Optional[str] = None) -> List[EnrichedNewsItemDTO]:
"""Retrieve latest items chronologically, optionally filtered by category."""
pass
@abstractmethod
async def get_top_ranked(self, limit: int = 10, category: Optional[str] = None) -> List[EnrichedNewsItemDTO]:
"""Retrieve top ranked items by relevance score, optionally filtered by category."""
pass

View File

@@ -1,4 +1,6 @@
import uuid
import asyncio
import logging
from typing import List, Optional, Mapping, Any
from datetime import datetime
@@ -8,6 +10,8 @@ from chromadb.api import ClientAPI
from src.storage.base import IVectorStore
from src.processor.dto import EnrichedNewsItemDTO
logger = logging.getLogger(__name__)
class ChromaStore(IVectorStore):
def __init__(self, client: ClientAPI, collection_name: str = "news_collection"):
self.client = client
@@ -29,14 +33,15 @@ class ChromaStore(IVectorStore):
"anomalies_detected": ",".join(item.anomalies_detected) if item.anomalies_detected else ""
}
self.collection.upsert(
await asyncio.to_thread(
self.collection.upsert,
ids=[doc_id],
documents=[item.content_text],
metadatas=[metadata]
)
async def get_by_id(self, item_id: str) -> Optional[EnrichedNewsItemDTO]:
results = self.collection.get(ids=[item_id])
results = await asyncio.to_thread(self.collection.get, ids=[item_id])
metadatas = results.get('metadatas')
if not metadatas or not metadatas[0]:
@@ -47,31 +52,74 @@ class ChromaStore(IVectorStore):
return self._reconstruct_dto(metadatas[0], document)
async def search(self, query: str, limit: int = 5) -> List[EnrichedNewsItemDTO]:
results = self.collection.query(
query_texts=[query],
n_results=limit
)
async def search(self, query: str, limit: int = 5, category: Optional[str] = None, threshold: Optional[float] = None) -> List[EnrichedNewsItemDTO]:
where: Any = {}
if category:
where["category"] = category
items = []
# Check if we have results
metadatas = results.get('metadatas')
if not metadatas or not metadatas[0]:
return items
seen_urls = set()
documents = results.get('documents')
# Phase 1: Try exact match
if query:
try:
keyword_results = await asyncio.to_thread(
self.collection.get,
where_document={"$contains": query},
where=where if where else None,
limit=limit,
include=["metadatas", "documents"]
)
kw_metadatas = keyword_results.get('metadatas') or []
kw_documents = keyword_results.get('documents') or []
for meta, doc in zip(kw_metadatas, kw_documents):
if meta:
dto = self._reconstruct_dto(meta, doc)
items.append(dto)
seen_urls.add(dto.url)
except Exception as e:
logger.warning(f"Phase 1 keyword search failed: {e}")
for idx, metadata in enumerate(metadatas[0]):
if metadata is None:
continue
# Only proceed to Phase 2 if we need more items
if len(items) < limit:
try:
semantic_results = await asyncio.to_thread(
self.collection.query,
query_texts=[query] if query else ["*"],
n_results=limit,
where=where if where else None
)
document = documents[0][idx] if documents and documents[0] else ""
items.append(self._reconstruct_dto(metadata, document))
# Sort items by relevance_score in descending order
items.sort(key=lambda x: x.relevance_score, reverse=True)
return items
metadatas = semantic_results.get('metadatas')
if metadatas and metadatas[0]:
documents = semantic_results.get('documents')
distances = semantic_results.get('distances')
for idx, metadata in enumerate(metadatas[0]):
if metadata is None:
continue
# Distance filtering (semantic threshold)
if threshold is not None and distances and distances[0]:
distance = distances[0][idx]
if distance > threshold:
continue
document = documents[0][idx] if documents and documents[0] else ""
dto = self._reconstruct_dto(metadata, document)
if dto.url not in seen_urls:
items.append(dto)
seen_urls.add(dto.url)
if len(items) >= limit:
break
except Exception as e:
logger.error(f"Phase 2 semantic search failed: {e}")
# Note: Do not re-sort by relevance_score here, as we want Exact Matches first,
# then Semantic Matches sorted by distance (which ChromaDB already returns).
return items[:limit]
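The two-phase merge above — exact-match hits first, then semantic results deduplicated by URL until the limit is reached — can be illustrated in isolation (a sketch using bare URLs in place of reconstructed DTOs):

```python
def merge_phases(exact, semantic, limit):
    # exact: Phase 1 keyword hits; semantic: Phase 2 hits,
    # already ordered by distance as ChromaDB returns them.
    items, seen = [], set()
    for url in exact:
        items.append(url)
        seen.add(url)
    for url in semantic:
        if len(items) >= limit:
            break
        if url not in seen:  # skip URLs already found by exact match
            items.append(url)
            seen.add(url)
    return items[:limit]

print(merge_phases(["a", "b"], ["b", "c", "d"], limit=3))  # → ['a', 'b', 'c']
```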
def _reconstruct_dto(self, metadata: Mapping[str, Any], document: str) -> EnrichedNewsItemDTO:
anomalies_str = str(metadata.get("anomalies_detected", ""))
@@ -83,7 +131,7 @@ class ChromaStore(IVectorStore):
url=str(metadata.get("url", "")),
content_text=str(document),
source=str(metadata.get("source", "")),
timestamp=datetime.fromisoformat(str(metadata.get("timestamp", ""))),
timestamp=datetime.fromisoformat(str(metadata['timestamp'])),
relevance_score=int(float(str(metadata.get("relevance_score", 0)))),
summary_ru=str(metadata.get("summary_ru", "")),
category=str(metadata.get("category", "")),
@@ -92,12 +140,11 @@ class ChromaStore(IVectorStore):
async def exists(self, url: str) -> bool:
doc_id = str(uuid.uuid5(uuid.NAMESPACE_URL, url))
result = self.collection.get(ids=[doc_id])
result = await asyncio.to_thread(self.collection.get, ids=[doc_id])
return len(result.get("ids", [])) > 0
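Deduplication relies on `uuid5` being deterministic: the same URL always maps to the same document id, so `exists` and the upsert in `add` agree without a separate index. For example:

```python
import uuid

url = "https://sk.ru/news/example/"  # illustrative URL
doc_id = str(uuid.uuid5(uuid.NAMESPACE_URL, url))
# uuid5 is a pure function of (namespace, name): re-deriving it yields
# the identical id, so repeated upserts for one URL hit one document.
assert doc_id == str(uuid.uuid5(uuid.NAMESPACE_URL, url))
print(doc_id)
```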
async def get_stats(self) -> dict[str, int]:
# Retrieve all metadatas to calculate stats
results = self.collection.get(include=["metadatas"])
results = await asyncio.to_thread(self.collection.get, include=["metadatas"])
metadatas = results.get("metadatas")
if metadatas is None:
metadatas = []
@@ -107,9 +154,56 @@ class ChromaStore(IVectorStore):
}
for meta in metadatas:
if meta:
if meta is not None:
# meta is a dict, but might not have 'category'
category = str(meta.get("category", "Uncategorized"))
key = f"category_{category}"
stats[key] = stats.get(key, 0) + 1
return stats
async def get_latest(self, limit: int = 10, category: Optional[str] = None) -> List[EnrichedNewsItemDTO]:
where: Any = {"category": category} if category else None
results = await asyncio.to_thread(
self.collection.get,
include=["metadatas", "documents"],
where=where
)
metadatas = results.get("metadatas") or []
documents = results.get("documents") or []
items = []
for meta, doc in zip(metadatas, documents):
if meta:
try:
items.append(self._reconstruct_dto(meta, doc))
except Exception:
pass
# Sort strictly by timestamp descending
items.sort(key=lambda x: x.timestamp, reverse=True)
return items[:limit]
async def get_top_ranked(self, limit: int = 10, category: Optional[str] = None) -> List[EnrichedNewsItemDTO]:
where: Any = {"category": category} if category else None
results = await asyncio.to_thread(
self.collection.get,
include=["metadatas", "documents"],
where=where
)
metadatas = results.get("metadatas") or []
documents = results.get("documents") or []
items = []
for meta, doc in zip(metadatas, documents):
if meta:
try:
items.append(self._reconstruct_dto(meta, doc))
except Exception:
pass
# Sort strictly by relevance_score descending
items.sort(key=lambda x: x.relevance_score, reverse=True)
return items[:limit]

View File

@@ -0,0 +1,71 @@
import pytest
from unittest.mock import AsyncMock, MagicMock
from aiogram.types import Message
from aiogram.filters import CommandObject
from src.bot.handlers import get_router
from src.processor.dto import EnrichedNewsItemDTO
from datetime import datetime, timezone
@pytest.mark.asyncio
async def test_latest_command_with_category_passing():
# Arrange
storage = MagicMock()
storage.get_latest = AsyncMock(return_value=[])
processor = MagicMock()
message = MagicMock(spec=Message)
message.answer = AsyncMock()
command = CommandObject(command="latest", args="Tech")
# The handler is defined inside get_router, so it cannot be imported
# directly; extract it from the router's registered handlers instead.
router = get_router(storage, processor, "123")
# Find the handler for /latest
handler = None
for observer in router.message.handlers:
if "latest" in str(observer.callback):
handler = observer.callback
break
assert handler is not None
# Act
await handler(message, command)
# Assert
# Verify that storage.get_latest was called with the category
storage.get_latest.assert_called_once_with(limit=10, category="Tech")
@pytest.mark.asyncio
async def test_search_command_with_threshold():
# Arrange
storage = MagicMock()
storage.search = AsyncMock(return_value=[])
processor = MagicMock()
message = MagicMock(spec=Message)
message.answer = AsyncMock()
command = CommandObject(command="search", args="AI News")
router = get_router(storage, processor, "123")
handler = None
for observer in router.message.handlers:
if "search" in str(observer.callback):
handler = observer.callback
break
assert handler is not None
# Act
await handler(message, command)
# Assert
# Verify that storage.search was called with a threshold
args, kwargs = storage.search.call_args
assert kwargs["query"] == "AI News"
assert "threshold" in kwargs
assert kwargs["threshold"] < 1.0 # Should have some threshold
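This test pins that `/search` forwards a similarity threshold below 1.0. In Chroma a smaller cosine distance means a closer match, so such a threshold acts as a cutoff on distance. A hedged sketch of the post-filtering step (the `filter_by_distance` helper and the sample distances are illustrative, not the project's code):

```python
def filter_by_distance(hits: list[tuple[str, float]], threshold: float) -> list[str]:
    # Keep only hits whose distance is below the cutoff; cosine distance
    # is smaller for more similar documents.
    return [doc for doc, distance in hits if distance < threshold]

hits = [("relevant", 0.31), ("borderline", 0.59), ("noise", 0.92)]
print(filter_by_distance(hits, threshold=0.6))  # → ['relevant', 'borderline']
```
The `threshold=0.6` matches the value asserted against `storage.search` elsewhere in this changeset.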

View File

@ -26,6 +26,7 @@ def mock_item():
def mock_storage(mock_item):
storage = AsyncMock()
storage.search.return_value = [mock_item]
storage.get_latest.return_value = [mock_item]
storage.get_by_id.return_value = mock_item
storage.get_stats.return_value = {"total": 1, "AI": 1}
return storage
@ -115,6 +116,7 @@ async def test_command_latest_handler(router, mock_storage, allowed_chat_id):
await handler(message=message, command=command)
mock_storage.get_latest.assert_called_once_with(limit=10, category=None)
message.answer.assert_called_once()
args, kwargs = message.answer.call_args
assert "Latest news:" in args[0]
@ -136,7 +138,7 @@ async def test_command_search_handler(router, mock_storage, allowed_chat_id):
args, kwargs = message.answer.call_args
assert "Search results:" in args[0]
assert "reply_markup" in kwargs
mock_storage.search.assert_called_once_with(query="quantum", limit=10)
mock_storage.search.assert_called_once_with(query="quantum", limit=10, threshold=0.6)
@pytest.mark.asyncio
async def test_detail_callback_handler(router, mock_storage, mock_item):
@ -144,7 +146,7 @@ async def test_detail_callback_handler(router, mock_storage, mock_item):
callback = AsyncMock(spec=CallbackQuery)
item_id = str(uuid.uuid5(uuid.NAMESPACE_URL, mock_item.url))
callback.data = f"detail:{item_id}"
callback.message = AsyncMock()
callback.message = AsyncMock(spec=Message)
callback.message.answer = AsyncMock()
callback.answer = AsyncMock()
@ -170,6 +172,39 @@ async def test_command_stats_handler(router, mock_storage, allowed_chat_id):
args, kwargs = message.answer.call_args
assert "Database Statistics" in args[0]
@pytest.mark.asyncio
async def test_command_hottest_handler(router, mock_storage, allowed_chat_id, mock_item):
handler = get_handler(router, "command_hottest_handler")
message = AsyncMock()
message.chat.id = int(allowed_chat_id)
message.answer = AsyncMock()
command = CommandObject(prefix="/", command="hottest", args=None)
mock_storage.get_top_ranked.return_value = [mock_item]
await handler(message=message, command=command)
mock_storage.get_top_ranked.assert_called_once_with(limit=10, category=None)
message.answer.assert_called_once()
args, kwargs = message.answer.call_args
assert "Top 1 Hottest Trends:" in args[0]
assert "reply_markup" in kwargs
assert "🔥" in str(kwargs["reply_markup"])
@pytest.mark.asyncio
async def test_command_hottest_handler_empty(router, mock_storage, allowed_chat_id):
handler = get_handler(router, "command_hottest_handler")
message = AsyncMock()
message.chat.id = int(allowed_chat_id)
message.answer = AsyncMock()
command = CommandObject(prefix="/", command="hottest", args=None)
mock_storage.get_top_ranked.return_value = []
await handler(message=message, command=command)
message.answer.assert_called_once_with("No hot trends found yet.")
@pytest.mark.asyncio
async def test_access_middleware_allowed(allowed_chat_id):
middleware = AccessMiddleware(allowed_chat_id)

View File

@ -0,0 +1,168 @@
import uuid
import pytest
from unittest.mock import AsyncMock, MagicMock
from aiogram.types import Message, InlineKeyboardMarkup
from aiogram.filters import CommandObject
from datetime import datetime
from src.bot.handlers import get_router
from src.processor.dto import EnrichedNewsItemDTO
@pytest.fixture
def mock_storage():
return AsyncMock()
@pytest.fixture
def mock_processor():
processor = MagicMock()
processor.get_info.return_value = {"model": "test-model"}
return processor
@pytest.fixture
def allowed_chat_id():
return "123456789"
@pytest.fixture
def router(mock_storage, mock_processor, allowed_chat_id):
return get_router(mock_storage, mock_processor, allowed_chat_id)
def get_handler(router, callback_name):
for handler in router.message.handlers:
if handler.callback.__name__ == callback_name:
return handler.callback
raise ValueError(f"Handler {callback_name} not found")
@pytest.mark.asyncio
async def test_command_hottest_handler_success(router, mock_storage, allowed_chat_id):
"""
Test that /hottest command calls get_top_ranked and returns a list of items.
"""
# 1. Arrange
handler = get_handler(router, "command_hottest_handler")
message = AsyncMock()
message.chat = MagicMock()
message.chat.id = int(allowed_chat_id)
message.answer = AsyncMock()
mock_items = [
EnrichedNewsItemDTO(
title=f"Hot News {i}",
url=f"https://example.com/{i}",
content_text=f"Content {i}",
source="Source",
timestamp=datetime.now(),
relevance_score=10-i,
summary_ru=f"Сводка {i}",
anomalies_detected=[],
category="Tech"
) for i in range(3)
]
mock_storage.get_top_ranked.return_value = mock_items
# 2. Act
command = CommandObject(prefix='/', command='hottest', args=None)
await handler(message=message, command=command)
# 3. Assert
mock_storage.get_top_ranked.assert_called_once_with(limit=10, category=None)
message.answer.assert_called_once()
args, kwargs = message.answer.call_args
assert "Top 3 Hottest Trends:" in args[0]
assert "reply_markup" in kwargs
assert isinstance(kwargs["reply_markup"], InlineKeyboardMarkup)
# Check if all 3 items are in the markup
markup = kwargs["reply_markup"]
assert len(markup.inline_keyboard) == 3
# Check if icons and scores are present
button_text = markup.inline_keyboard[0][0].text
assert "🔥" in button_text
assert "[10/10]" in button_text
assert "Hot News 0" in button_text
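The button assertions fix the label format: fire icon, bracketed relevance score, then the title. A one-line formatter matching those expectations (the helper name is hypothetical; the real handler builds this inline or elsewhere):

```python
def format_trend_button(title: str, score: int) -> str:
    # Fire icon + bracketed relevance score + title, as the test expects.
    return f"🔥 [{score}/10] {title}"

print(format_trend_button("Hot News 0", 10))  # → 🔥 [10/10] Hot News 0
```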
@pytest.mark.asyncio
async def test_command_hottest_handler_empty(router, mock_storage, allowed_chat_id):
"""
Test that /hottest command handles empty results correctly.
"""
# 1. Arrange
handler = get_handler(router, "command_hottest_handler")
message = AsyncMock()
message.chat = MagicMock()
message.chat.id = int(allowed_chat_id)
message.answer = AsyncMock()
mock_storage.get_top_ranked.return_value = []
# 2. Act
command = CommandObject(prefix='/', command='hottest', args=None)
await handler(message=message, command=command)
# 3. Assert
mock_storage.get_top_ranked.assert_called_once_with(limit=10, category=None)
message.answer.assert_called_once_with("No hot trends found yet.")
@pytest.mark.asyncio
async def test_command_hottest_handler_custom_limit(router, mock_storage, allowed_chat_id):
"""
Test that /hottest command with custom limit correctly passes it to storage.
"""
# 1. Arrange
handler = get_handler(router, "command_hottest_handler")
message = AsyncMock()
message.chat = MagicMock()
message.chat.id = int(allowed_chat_id)
message.answer = AsyncMock()
mock_storage.get_top_ranked.return_value = []
# 2. Act
command = CommandObject(prefix='/', command='hottest', args='25')
await handler(message=message, command=command)
# 3. Assert
mock_storage.get_top_ranked.assert_called_once_with(limit=25, category=None)
@pytest.mark.asyncio
async def test_command_hottest_handler_max_limit(router, mock_storage, allowed_chat_id):
"""
Test that /hottest command enforces maximum limit.
"""
# 1. Arrange
handler = get_handler(router, "command_hottest_handler")
message = AsyncMock()
message.chat = MagicMock()
message.chat.id = int(allowed_chat_id)
message.answer = AsyncMock()
mock_storage.get_top_ranked.return_value = []
# 2. Act
command = CommandObject(prefix='/', command='hottest', args='1000')
await handler(message=message, command=command)
# 3. Assert
mock_storage.get_top_ranked.assert_called_once_with(limit=50, category=None)
@pytest.mark.asyncio
async def test_command_hottest_handler_invalid_limit(router, mock_storage, allowed_chat_id):
"""
Test that a non-numeric /hottest argument is treated as a category
filter while the limit falls back to the default.
"""
# 1. Arrange
handler = get_handler(router, "command_hottest_handler")
message = AsyncMock()
message.chat = MagicMock()
message.chat.id = int(allowed_chat_id)
message.answer = AsyncMock()
mock_storage.get_top_ranked.return_value = []
# 2. Act
command = CommandObject(prefix='/', command='hottest', args='invalid')
await handler(message=message, command=command)
# 3. Assert
mock_storage.get_top_ranked.assert_called_once_with(limit=10, category='invalid')
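Taken together, the four tests above pin down the `/hottest` argument parsing: a numeric argument becomes the limit (capped at 50), anything else is treated as a category filter, and no argument yields the defaults. A minimal parser satisfying exactly those cases (the function name and constants are illustrative):

```python
from typing import Optional, Tuple

DEFAULT_LIMIT = 10
MAX_LIMIT = 50

def parse_hottest_args(args: Optional[str]) -> Tuple[int, Optional[str]]:
    # No argument: default limit, no category filter.
    if not args:
        return DEFAULT_LIMIT, None
    # Numeric argument: custom limit, clamped to the maximum.
    if args.isdigit():
        return min(int(args), MAX_LIMIT), None
    # Anything else is interpreted as a category filter.
    return DEFAULT_LIMIT, args

print(parse_hottest_args(None))       # → (10, None)
print(parse_hottest_args("25"))       # → (25, None)
print(parse_hottest_args("1000"))     # → (50, None)
print(parse_hottest_args("invalid"))  # → (10, 'invalid')
```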

View File

@ -0,0 +1,216 @@
import pytest
import aiohttp
from unittest.mock import AsyncMock, patch, MagicMock
from datetime import datetime, timezone
from src.crawlers.scirate_crawler import SciRateCrawler
from src.crawlers.scholar_crawler import ScholarCrawler
from src.crawlers.factory import CrawlerFactory
from src.crawlers.dto import NewsItemDTO
@pytest.mark.asyncio
async def test_scirate_crawler_parse_html():
crawler = SciRateCrawler()
sample_html = """
<li class="paper-list-item">
<div class="title"><a href="/arxiv/2403.12345">Quantum Supremacy in the Kitchen</a></div>
<div class="authors">John Doe, Jane Smith</div>
<div class="abstract">We demonstrate quantum supremacy by perfectly boiling an egg.</div>
</li>
<div class="paper">
<div class="title"><a href="https://scirate.com/arxiv/2403.67890">AI for Cats</a></div>
<div class="authors">Cat Lover</div>
<div class="abstract">A deep learning approach to understanding meows.</div>
</div>
"""
items = crawler.parse_html(sample_html)
assert len(items) == 2
assert items[0].title == "Quantum Supremacy in the Kitchen"
assert "arxiv/2403.12345" in items[0].url
assert "John Doe, Jane Smith" in items[0].content_text
assert "boiling an egg" in items[0].content_text
assert items[0].source == "SciRate"
assert items[1].title == "AI for Cats"
assert items[1].url == "https://scirate.com/arxiv/2403.67890"
assert "Cat Lover" in items[1].content_text
assert "meows" in items[1].content_text
@pytest.mark.asyncio
async def test_scirate_crawler_fetch_latest():
crawler = SciRateCrawler()
sample_html = """
<li class="paper-list-item">
<div class="title"><a href="/arxiv/2403.12345">Quantum Supremacy</a></div>
</li>
"""
with patch("aiohttp.ClientSession.get") as mock_get:
mock_response = AsyncMock()
mock_response.status = 200
mock_response.text.return_value = sample_html
mock_get.return_value.__aenter__.return_value = mock_response
items = await crawler.fetch_latest()
assert len(items) == 1
assert items[0].title == "Quantum Supremacy"
@pytest.mark.asyncio
async def test_scirate_crawler_fetch_error():
crawler = SciRateCrawler()
with patch("aiohttp.ClientSession.get") as mock_get:
mock_response = AsyncMock()
mock_response.status = 404
mock_get.return_value.__aenter__.return_value = mock_response
items = await crawler.fetch_latest()
assert items == []
@pytest.mark.asyncio
async def test_scholar_crawler_fetch_latest():
crawler = ScholarCrawler(query="WebGPU", source="Scholar")
with patch("src.crawlers.scholar_crawler.async_playwright") as mock_playwright, \
patch("src.crawlers.scholar_crawler.Stealth") as mock_stealth_class:
mock_p = AsyncMock()
mock_playwright.return_value.__aenter__.return_value = mock_p
mock_browser = AsyncMock()
mock_p.chromium.launch.return_value = mock_browser
mock_context = AsyncMock()
mock_browser.new_context.return_value = mock_context
mock_page = AsyncMock()
mock_context.new_page.return_value = mock_page
mock_page.content.return_value = "<html><body>Results</body></html>"
# Mock Stealth instance and method
mock_stealth_instance = MagicMock()
mock_stealth_instance.apply_stealth_async = AsyncMock()
mock_stealth_class.return_value = mock_stealth_instance
# Mock result elements
mock_res = AsyncMock()
mock_title_el = AsyncMock()
mock_title_el.inner_text.return_value = "WebGPU Accelerated ML"
mock_title_el.get_attribute.return_value = "https://arxiv.org/abs/2403.abc"
mock_snippet_el = AsyncMock()
mock_snippet_el.inner_text.return_value = "This paper discusses WebGPU..."
mock_metadata_el = AsyncMock()
mock_metadata_el.inner_text.return_value = "J. Smith, 2024 - arxiv.org"
mock_citation_link = AsyncMock()
mock_citation_link.inner_text.return_value = "Cited by 15"
mock_res.query_selector.side_effect = lambda selector: {
".gs_rt a": mock_title_el,
".gs_rs": mock_snippet_el,
".gs_a": mock_metadata_el
}.get(selector)
mock_res.query_selector_all.return_value = [mock_citation_link]
mock_page.query_selector_all.return_value = [mock_res]
items = await crawler.fetch_latest()
assert len(items) == 1
assert items[0].title == "WebGPU Accelerated ML"
assert items[0].url == "https://arxiv.org/abs/2403.abc"
assert "15" in items[0].content_text
assert "J. Smith, 2024" in items[0].content_text
assert items[0].source == "Scholar: WebGPU"
mock_browser.close.assert_called_once()
@pytest.mark.asyncio
async def test_scholar_crawler_captcha_detection():
crawler = ScholarCrawler(query="WebGPU", source="Scholar")
with patch("src.crawlers.scholar_crawler.async_playwright") as mock_playwright, \
patch("src.crawlers.scholar_crawler.Stealth") as mock_stealth_class:
mock_p = AsyncMock()
mock_playwright.return_value.__aenter__.return_value = mock_p
mock_browser = AsyncMock()
mock_p.chromium.launch.return_value = mock_browser
mock_context = AsyncMock()
mock_browser.new_context.return_value = mock_context
mock_page = AsyncMock()
mock_context.new_page.return_value = mock_page
# Mock Stealth instance and method
mock_stealth_instance = MagicMock()
mock_stealth_instance.apply_stealth_async = AsyncMock()
mock_stealth_class.return_value = mock_stealth_instance
# Simulate CAPTCHA in content
mock_page.content.return_value = "<html><body>Please verify you are not a robot CAPTCHA</body></html>"
items = await crawler.fetch_latest()
assert items == []
mock_browser.close.assert_called_once()
@pytest.mark.asyncio
async def test_scholar_crawler_error_handling():
crawler = ScholarCrawler(query="WebGPU", source="Scholar")
with patch("src.crawlers.scholar_crawler.async_playwright") as mock_playwright, \
patch("src.crawlers.scholar_crawler.Stealth") as mock_stealth_class:
mock_p = AsyncMock()
mock_playwright.return_value.__aenter__.return_value = mock_p
mock_browser = AsyncMock()
mock_p.chromium.launch.return_value = mock_browser
mock_context = AsyncMock()
mock_browser.new_context.return_value = mock_context
mock_page = AsyncMock()
mock_context.new_page.return_value = mock_page
mock_stealth_instance = MagicMock()
mock_stealth_instance.apply_stealth_async = AsyncMock()
mock_stealth_class.return_value = mock_stealth_instance
# Simulate exception during goto
mock_page.goto.side_effect = Exception("Browser crash")
items = await crawler.fetch_latest()
assert items == []
mock_browser.close.assert_called_once()
def test_factory_registration():
# Test if SciRate and Scholar are registered in the factory
with patch("builtins.open", MagicMock()):
with patch("yaml.safe_load") as mock_yaml:
mock_yaml.return_value = {
'crawlers': [
{'type': 'scirate', 'url': 'https://scirate.com/', 'source': 'SciRate'},
{'type': 'scholar', 'url': 'https://scholar.google.com/', 'source': 'Scholar', 'query': 'AI'}
]
}
crawlers = CrawlerFactory.load_from_yaml("fake_path.yml")
assert len(crawlers) == 2
assert isinstance(crawlers[0], SciRateCrawler)
assert isinstance(crawlers[1], ScholarCrawler)
assert crawlers[1].query == 'AI'

View File

@ -0,0 +1,23 @@
import pytest
from datetime import datetime
from src.crawlers.cppconf_crawler import CppConfNextJsParser
from src.crawlers.dto import NewsItemDTO
@pytest.fixture
def cppconf_html():
with open("tests/fixtures/cppconf/talks.html", "r", encoding="utf-8") as f:
return f.read()
def test_cppconf_parser(cppconf_html):
parser = CppConfNextJsParser()
talks = parser.parse_talks(cppconf_html)
assert len(talks) > 0, "Should extract at least one talk"
first_talk = talks[0]
assert isinstance(first_talk, NewsItemDTO)
assert len(first_talk.title) > 0
assert first_talk.url.startswith("https://cppconf.ru/en/talks/")
assert len(first_talk.content_text) > 0
assert first_talk.source == "cppconf"
assert isinstance(first_talk.timestamp, datetime)

View File

@ -0,0 +1,134 @@
import pytest
import yaml
from unittest.mock import patch, mock_open
from src.crawlers.factory import CrawlerFactory
from src.crawlers.rss_crawler import RSSCrawler
from src.crawlers.playwright_crawler import PlaywrightCrawler
from src.crawlers.scirate_crawler import SciRateCrawler
from src.crawlers.scholar_crawler import ScholarCrawler
from src.crawlers.microsoft_research_crawler import MicrosoftResearchCrawler
from src.crawlers.static_crawler import StaticCrawler
from src.crawlers.skolkovo_crawler import SkolkovoCrawler
from src.crawlers.cppconf_crawler import CppConfCrawler
VALID_YAML = """
crawlers:
- type: rss
url: "https://example.com/rss"
source: "Example RSS"
- type: playwright
url: "https://example.com/playwright"
source: "Example Playwright"
selector: ".item"
- type: scirate
url: "https://scirate.com/"
source: "SciRate"
- type: scholar
query: "AI"
source: "Google Scholar"
- type: microsoft_research
url: "https://example.com/msr"
source: "Microsoft Research"
"""
INVALID_TYPE_YAML = """
crawlers:
- type: unknown
url: "https://example.com/unknown"
source: "Unknown"
- type: rss
url: "https://example.com/rss"
source: "Example RSS"
"""
MALFORMED_YAML = """
crawlers:
- type: rss
[ missing stuff ]
"""
MISSING_KEYS_YAML = """
crawlers:
- type: rss
# url is missing
source: "Missing URL"
- url: "https://example.com/no-type"
source: "Missing Type"
"""
def test_load_from_yaml_valid():
with patch("builtins.open", mock_open(read_data=VALID_YAML)):
crawlers = CrawlerFactory.load_from_yaml("dummy.yml")
assert len(crawlers) == 5
assert isinstance(crawlers[0], RSSCrawler)
assert isinstance(crawlers[1], PlaywrightCrawler)
assert isinstance(crawlers[2], SciRateCrawler)
assert isinstance(crawlers[3], ScholarCrawler)
assert isinstance(crawlers[4], MicrosoftResearchCrawler)
def test_load_from_yaml_unknown_type():
with patch("builtins.open", mock_open(read_data=INVALID_TYPE_YAML)):
with patch("src.crawlers.factory.logger") as mock_logger:
crawlers = CrawlerFactory.load_from_yaml("dummy.yml")
assert len(crawlers) == 1
assert isinstance(crawlers[0], RSSCrawler)
mock_logger.warning.assert_called_with("Unknown crawler type: unknown")
def test_load_from_yaml_malformed():
with patch("builtins.open", mock_open(read_data=MALFORMED_YAML)):
with patch("src.crawlers.factory.logger") as mock_logger:
crawlers = CrawlerFactory.load_from_yaml("dummy.yml")
assert crawlers == []
# Error log should be called due to yaml.ScannerError or similar
mock_logger.error.assert_called()
def test_load_from_yaml_missing_keys():
with patch("builtins.open", mock_open(read_data=MISSING_KEYS_YAML)):
with patch("src.crawlers.factory.logger") as mock_logger:
crawlers = CrawlerFactory.load_from_yaml("dummy.yml")
# First item missing url -> skipped with warning
# Second item missing type -> warning in else block
assert len(crawlers) == 0
# Check for warnings
warning_calls = [call.args[0] for call in mock_logger.warning.call_args_list]
assert any("Missing mandatory fields" in msg for msg in warning_calls)
assert any("Unknown crawler type: None" in msg for msg in warning_calls)
def test_load_from_yaml_file_not_found():
with patch("src.crawlers.factory.logger") as mock_logger:
# We don't need to patch open here, just call with non-existent file
crawlers = CrawlerFactory.load_from_yaml("non_existent_file_12345.yml")
assert crawlers == []
mock_logger.error.assert_called()
def test_load_from_yaml_empty_file():
with patch("builtins.open", mock_open(read_data="")):
with patch("src.crawlers.factory.logger") as mock_logger:
crawlers = CrawlerFactory.load_from_yaml("empty.yml")
assert crawlers == []
mock_logger.warning.assert_called_with("Invalid or empty configuration in empty.yml")
def test_integration_load_actual_config():
# This test verifies that the real src/crawlers.yml can be loaded without errors or warnings.
with patch("src.crawlers.factory.logger") as mock_logger:
crawlers = CrawlerFactory.load_from_yaml("src/crawlers.yml")
assert len(crawlers) > 0
mock_logger.warning.assert_not_called()
mock_logger.error.assert_not_called()
# Verify types and mandatory fields for all loaded crawlers
for crawler in crawlers:
assert isinstance(crawler, (RSSCrawler, PlaywrightCrawler, StaticCrawler, SkolkovoCrawler, CppConfCrawler, SciRateCrawler, ScholarCrawler, MicrosoftResearchCrawler))
if not isinstance(crawler, ScholarCrawler):
assert crawler.url.startswith("http")
assert crawler.source
if isinstance(crawler, PlaywrightCrawler):
# According to src/crawlers.yml, all playwright crawlers currently have selectors
assert crawler.selector
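The factory tests above pin the loading contract: every entry needs a `type` plus its mandatory fields, unknown types are skipped with a warning, and malformed or empty configs yield `[]`. A reduced sketch of the dispatch step, operating on an already-parsed config dict (the registry shape and `build_crawlers` name are assumptions; the real `CrawlerFactory` also handles file I/O and YAML errors):

```python
import logging

logger = logging.getLogger("factory")

class RSSCrawler:
    def __init__(self, url: str, source: str):
        self.url, self.source = url, source

# Registry mapping the config 'type' field to a constructor; the real
# factory covers rss/playwright/static/scirate/scholar/... (one shown).
REGISTRY = {"rss": RSSCrawler}

def build_crawlers(config: dict) -> list:
    crawlers = []
    for entry in (config or {}).get("crawlers", []):
        ctype = entry.get("type")
        builder = REGISTRY.get(ctype)
        if builder is None:
            # Covers both unregistered types and a missing 'type' key
            # (which logs "Unknown crawler type: None", as the test expects).
            logger.warning(f"Unknown crawler type: {ctype}")
            continue
        if "url" not in entry or "source" not in entry:
            logger.warning(f"Missing mandatory fields in entry: {entry}")
            continue
        crawlers.append(builder(url=entry["url"], source=entry["source"]))
    return crawlers

cfg = {"crawlers": [
    {"type": "rss", "url": "https://example.com/rss", "source": "Example RSS"},
    {"type": "unknown", "url": "https://example.com/x", "source": "X"},
    {"url": "https://example.com/no-type", "source": "Missing Type"},
]}
print(len(build_crawlers(cfg)))  # → 1
```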

View File

@ -0,0 +1,38 @@
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from datetime import datetime, timezone
from src.crawlers.microsoft_research_crawler import MicrosoftResearchCrawler
from src.crawlers.dto import NewsItemDTO
MOCK_MSR_RSS = """<?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0">
<channel>
<title>Microsoft Research</title>
<item>
<title>MSR Paper Title</title>
<link>https://www.microsoft.com/en-us/research/publication/msr-paper/</link>
<description>MSR Paper Description</description>
<pubDate>Mon, 10 Mar 2026 10:00:00 GMT</pubDate>
</item>
</channel>
</rss>
"""
@pytest.mark.asyncio
async def test_microsoft_research_crawler_fetch_latest():
crawler = MicrosoftResearchCrawler()
with patch("aiohttp.ClientSession.get") as mock_get:
mock_response = AsyncMock()
mock_response.text.return_value = MOCK_MSR_RSS
mock_response.status = 200
mock_response.raise_for_status = MagicMock()
mock_get.return_value.__aenter__.return_value = mock_response
items = await crawler.fetch_latest()
assert len(items) == 1
assert items[0].title == "MSR Paper Title"
assert items[0].url == "https://www.microsoft.com/en-us/research/publication/msr-paper/"
assert items[0].source == "Microsoft Research"
assert items[0].timestamp == datetime(2026, 3, 10, 10, 0, tzinfo=timezone.utc)
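The timestamp assertion implies the crawler parses RFC 822 `pubDate` strings into timezone-aware UTC datetimes; the Python standard library handles this directly:

```python
from email.utils import parsedate_to_datetime
from datetime import datetime, timezone

# RFC 822 date taken from the mocked RSS item above.
ts = parsedate_to_datetime("Mon, 10 Mar 2026 10:00:00 GMT")
print(ts == datetime(2026, 3, 10, 10, 0, tzinfo=timezone.utc))  # → True
```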

View File

@ -0,0 +1,27 @@
import pytest
import aiohttp
from src.crawlers.static_crawler import StaticCrawler
from src.crawlers.skolkovo_crawler import SkolkovoCrawler
from src.crawlers.dto import NewsItemDTO
@pytest.mark.asyncio
async def test_static_crawler_addmeto():
crawler = StaticCrawler(url="https://t.me/s/addmeto", source="Telegram: Addmeto", selector=".tgme_widget_message_text")
items = await crawler.fetch_latest()
assert len(items) > 0
assert items[0].source == "Telegram: Addmeto"
@pytest.mark.asyncio
async def test_static_crawler_rsf():
crawler = StaticCrawler(url="https://rscf.ru/en/news/", source="RSF", selector=".news-item")
items = await crawler.fetch_latest()
assert len(items) > 0
assert items[0].source == "RSF"
assert "rscf.ru" in items[0].url
@pytest.mark.asyncio
async def test_skolkovo_crawler():
crawler = SkolkovoCrawler(url="https://sk.ru/news/", source="Skolkovo")
items = await crawler.fetch_latest()
assert len(items) > 0
assert items[0].source == "Skolkovo"
assert "sk.ru" in items[0].url

View File

@ -0,0 +1,99 @@
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from src.crawlers.playwright_crawler import PlaywrightCrawler
from src.crawlers.dto import NewsItemDTO
@pytest.mark.asyncio
async def test_playwright_crawler_fetch_latest_with_selector():
url = "https://example.com/news"
source = "ExampleSource"
selector = ".news-item"
crawler = PlaywrightCrawler(url, source, selector)
with patch("src.crawlers.playwright_crawler.async_playwright") as mock_playwright:
# Mocking the async context manager chain
mock_p = AsyncMock()
mock_playwright.return_value.__aenter__.return_value = mock_p
mock_browser = AsyncMock()
mock_p.chromium.launch.return_value = mock_browser
mock_page = AsyncMock()
mock_browser.new_page.return_value = mock_page
# Setup mock elements
mock_element = AsyncMock()
mock_element.evaluate.return_value = False # Assume it's not an 'a' tag itself
mock_link = AsyncMock()
mock_link.inner_text.return_value = "Test News Title"
mock_link.get_attribute.return_value = "/news/1"
mock_element.query_selector.return_value = mock_link
mock_page.query_selector_all.return_value = [mock_element]
results = await crawler.fetch_latest()
assert len(results) == 1
assert results[0].title == "Test News Title"
assert results[0].url == "https://example.com/news/1"
assert results[0].source == source
mock_page.goto.assert_called_once_with(url, wait_until="networkidle", timeout=60000)
mock_browser.close.assert_called_once()
@pytest.mark.asyncio
async def test_playwright_crawler_fetch_latest_no_selector():
url = "https://example.com/blog"
source = "ExampleBlog"
crawler = PlaywrightCrawler(url, source)
with patch("src.crawlers.playwright_crawler.async_playwright") as mock_playwright:
mock_p = AsyncMock()
mock_playwright.return_value.__aenter__.return_value = mock_p
mock_browser = AsyncMock()
mock_p.chromium.launch.return_value = mock_browser
mock_page = AsyncMock()
mock_browser.new_page.return_value = mock_page
# Setup mock elements for fallback (h2)
mock_h2 = AsyncMock()
mock_h2.inner_text.return_value = "Headline Title"
mock_page.query_selector_all.return_value = [mock_h2]
results = await crawler.fetch_latest()
assert len(results) == 1
assert results[0].title == "Headline Title"
assert results[0].url == url
assert results[0].source == source
@pytest.mark.asyncio
async def test_playwright_crawler_fetch_latest_error():
url = "https://example.com/error"
source = "ErrorSource"
crawler = PlaywrightCrawler(url, source)
with patch("src.crawlers.playwright_crawler.async_playwright") as mock_playwright:
mock_p = AsyncMock()
mock_playwright.return_value.__aenter__.return_value = mock_p
mock_browser = AsyncMock()
mock_p.chromium.launch.return_value = mock_browser
mock_page = AsyncMock()
mock_browser.new_page.return_value = mock_page
# Simulate an error in page.goto
mock_page.goto.side_effect = Exception("Crawl failed")
results = await crawler.fetch_latest()
assert results == []
mock_browser.close.assert_called_once()

View File

@ -0,0 +1,115 @@
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from datetime import datetime, timezone
from src.crawlers.scholar_crawler import ScholarCrawler
from src.crawlers.dto import NewsItemDTO
@pytest.mark.asyncio
async def test_scholar_crawler_fetch_latest():
query = "Large Language Models"
source = "Google Scholar"
crawler = ScholarCrawler(query=query, source=source)
with patch("src.crawlers.scholar_crawler.async_playwright") as mock_playwright, \
patch("src.crawlers.scholar_crawler.Stealth") as mock_stealth_class:
mock_stealth = MagicMock()
mock_stealth.apply_stealth_async = AsyncMock()
mock_stealth_class.return_value = mock_stealth
mock_p = AsyncMock()
mock_playwright.return_value.__aenter__.return_value = mock_p
mock_browser = AsyncMock()
mock_p.chromium.launch.return_value = mock_browser
mock_context = AsyncMock()
mock_browser.new_context.return_value = mock_context
mock_page = AsyncMock()
mock_context.new_page.return_value = mock_page
# Mock content to avoid CAPTCHA detection in crawler
mock_page.content.return_value = "<html><body>Results</body></html>"
# Setup mock results
mock_res = AsyncMock()
# Title element
mock_title_el = AsyncMock()
mock_title_el.inner_text.return_value = "LLM Paper Title"
mock_title_el.get_attribute.return_value = "https://arxiv.org/abs/2401.00001"
mock_res.query_selector.side_effect = lambda selector: {
".gs_rt a": mock_title_el,
".gs_rs": AsyncMock(inner_text=AsyncMock(return_value="This is a snippet")),
".gs_a": AsyncMock(inner_text=AsyncMock(return_value="Authors et al.")),
}.get(selector)
# Citations
mock_citation_link = AsyncMock()
mock_citation_link.inner_text.return_value = "Cited by 123"
mock_res.query_selector_all.return_value = [mock_citation_link]
mock_page.query_selector_all.return_value = [mock_res]
items = await crawler.fetch_latest()
assert len(items) == 1
assert items[0].title == "LLM Paper Title"
@pytest.mark.asyncio
async def test_scholar_crawler_no_title():
crawler = ScholarCrawler()
with patch("src.crawlers.scholar_crawler.async_playwright") as mock_playwright:
mock_p = AsyncMock()
mock_playwright.return_value.__aenter__.return_value = mock_p
mock_browser = AsyncMock()
mock_p.chromium.launch.return_value = mock_browser
mock_context = AsyncMock()
mock_browser.new_context.return_value = mock_context
mock_page = AsyncMock()
mock_context.new_page.return_value = mock_page
mock_page.content.return_value = "<html><body>Results</body></html>"
# Result item without title link
mock_res = AsyncMock()
mock_res.query_selector.return_value = None
mock_page.query_selector_all.return_value = [mock_res]
items = await crawler.fetch_latest()
assert len(items) == 0
@pytest.mark.asyncio
async def test_scholar_crawler_exception():
crawler = ScholarCrawler()
with patch("src.crawlers.scholar_crawler.async_playwright") as mock_playwright:
mock_p = AsyncMock()
mock_playwright.return_value.__aenter__.return_value = mock_p
mock_browser = AsyncMock()
mock_p.chromium.launch.return_value = mock_browser
# Force exception
mock_browser.new_context.side_effect = Exception("Browser error")
items = await crawler.fetch_latest()
assert items == []
@pytest.mark.asyncio
async def test_scholar_crawler_captcha():
crawler = ScholarCrawler()
with patch("src.crawlers.scholar_crawler.async_playwright") as mock_playwright:
mock_p = AsyncMock()
mock_playwright.return_value.__aenter__.return_value = mock_p
mock_browser = AsyncMock()
mock_p.chromium.launch.return_value = mock_browser
mock_context = AsyncMock()
mock_browser.new_context.return_value = mock_context
mock_page = AsyncMock()
mock_context.new_page.return_value = mock_page
# Simulate CAPTCHA
mock_page.content.return_value = "<html><body>Please solve this CAPTCHA</body></html>"
items = await crawler.fetch_latest()
assert items == []
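Both CAPTCHA tests feed page content containing challenge text and expect an empty result, which suggests a substring heuristic over the rendered HTML. A hedged sketch of such a check (the function name and exact marker list are assumptions about the crawler's internals):

```python
def looks_like_captcha(html: str) -> bool:
    # Case-insensitive substring check for common challenge markers.
    lowered = html.lower()
    return any(marker in lowered for marker in ("captcha", "not a robot"))

print(looks_like_captcha("<html><body>Please solve this CAPTCHA</body></html>"))  # → True
print(looks_like_captcha("<html><body>Results</body></html>"))                    # → False
```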

View File

@ -0,0 +1,90 @@
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from datetime import datetime, timezone
from src.crawlers.scirate_crawler import SciRateCrawler
from src.crawlers.dto import NewsItemDTO
MOCK_SCIRATE_HTML = """
<html>
<body>
<li class="paper-list-item">
<div class="title">
<a href="/arxiv/2403.12345">Attention is Really All You Need</a>
</div>
<div class="authors">Vaswani et al.</div>
<div class="abstract">This paper presents a new architecture...</div>
</li>
<div class="paper">
<div class="title">
<a href="https://example.com/paper2">Another Paper</a>
</div>
<div class="authors">Doe and Smith</div>
<div class="abstract">Abstract of another paper.</div>
</div>
</body>
</html>
"""
@pytest.mark.asyncio
async def test_scirate_crawler_fetch_latest():
url = "https://scirate.com/"
source = "SciRate"
crawler = SciRateCrawler(url, source)
# HTML with multiple items, one missing title, one with list-like link
mock_html = """
<html>
<body>
<li class="paper-list-item">
<div class="title"><a href="/arxiv/1">Paper 1</a></div>
</li>
<li class="paper-list-item">
<div class="title">No link here</div>
</li>
<li class="paper-list-item">
<div class="title"><a href="/arxiv/3">Paper 3</a></div>
</li>
</body>
</html>
"""
with patch("aiohttp.ClientSession.get") as mock_get:
mock_response = AsyncMock()
mock_response.text.return_value = mock_html
mock_response.status = 200
mock_get.return_value.__aenter__.return_value = mock_response
# We also want to exercise the 'isinstance(link, list)' branch.
# BS4 only returns a list for multi-valued attributes (e.g. class),
# never for href, so we mock Tag.get to force that code path.
with patch("bs4.element.Tag.get", side_effect=[["/arxiv/list"], "/arxiv/3"]):
items = await crawler.fetch_latest()
assert len(items) == 2
assert items[0].url == "https://scirate.com/arxiv/list"
assert items[1].url == "https://scirate.com/arxiv/3"
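The assertion on `/arxiv/list` works because the crawler joins relative hrefs against the base URL and unwraps list-valued attributes first. A sketch of that normalization (the helper name is an assumption):

```python
from urllib.parse import urljoin

def normalize_href(base_url: str, href) -> str:
    # BeautifulSoup returns a list for multi-valued attributes; the test
    # forces that shape by mocking Tag.get, so both cases are handled.
    if isinstance(href, list):
        href = href[0]
    return urljoin(base_url, href)
```
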
@pytest.mark.asyncio
async def test_scirate_crawler_exception():
crawler = SciRateCrawler()
with patch("aiohttp.ClientSession.get") as mock_get:
mock_response = AsyncMock()
mock_response.text.return_value = "<html></html>"
mock_response.status = 200
mock_get.return_value.__aenter__.return_value = mock_response
# Force an exception in parse_html
with patch.object(SciRateCrawler, 'parse_html', side_effect=Exception("Parsing failed")):
items = await crawler.fetch_latest()
assert items == []
@pytest.mark.asyncio
async def test_scirate_crawler_error():
crawler = SciRateCrawler()
with patch("aiohttp.ClientSession.get") as mock_get:
mock_response = AsyncMock()
mock_response.status = 500
mock_get.return_value.__aenter__.return_value = mock_response
items = await crawler.fetch_latest()
assert items == []

1
tests/fixtures/cppconf/talks.html vendored Normal file

File diff suppressed because one or more lines are too long


@@ -106,6 +106,28 @@ async def test_ollama_provider_analyze_markdown_json(sample_news_item):
assert result.anomalies_detected == []
assert result.category == "Browsers"
@pytest.mark.asyncio
async def test_ollama_provider_academic_content():
os.environ['OLLAMA_API_URL'] = 'http://localhost:11434/api/generate'
academic_item = NewsItemDTO(
title="Attention Is All You Need",
url="https://arxiv.org/abs/1706.03762",
content_text="The dominant sequence transduction models...",
source="ArXiv",
timestamp=datetime.now()
)
mock_response_json = {
"response": '{"relevance_score": 10, "summary_ru": "Революционная архитектура Transformer.", "anomalies_detected": ["SOTA"], "category": "Academic/SOTA"}'
}
provider = OllamaProvider()
with patch('aiohttp.ClientSession', return_value=create_mock_session(mock_response_json)):
result = await provider.analyze(academic_item)
assert result.relevance_score == 10
assert result.category == "Academic/SOTA"
assert "Transformer" in result.summary_ru
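The mock above mirrors the shape of Ollama's `/api/generate` reply: the raw model output arrives as a JSON string inside the `response` field, which the provider then parses a second time. A sketch of that double decode (helper name assumed, error handling elided):

```python
import json

def parse_ollama_payload(payload: dict) -> dict:
    # /api/generate wraps the model text in "response"; since the prompt
    # asks the model for a JSON object, that text is itself parsed as JSON.
    return json.loads(payload["response"])
```
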
def test_ollama_provider_get_info():
os.environ['OLLAMA_API_URL'] = 'http://test-url:11434'
os.environ['OLLAMA_MODEL'] = 'test-model'


@@ -1,261 +1,285 @@
import pytest
import pytest_asyncio
import asyncio
import uuid
from datetime import datetime, timezone
import chromadb
from chromadb.config import Settings
from unittest.mock import MagicMock, patch
from typing import Dict, Any
from src.processor.dto import EnrichedNewsItemDTO
from src.storage.chroma_store import ChromaStore
@pytest_asyncio.fixture
async def chroma_store():
# Use EphemeralClient for in-memory testing
client = chromadb.EphemeralClient(Settings(allow_reset=True))
client.reset()
store = ChromaStore(client=client, collection_name="test_collection")
yield store
client.reset()
@pytest.fixture
def mock_client():
return MagicMock()
@pytest.fixture
def mock_collection():
return MagicMock()
@pytest.fixture
def chroma_store(mock_client, mock_collection):
mock_client.get_or_create_collection.return_value = mock_collection
return ChromaStore(client=mock_client, collection_name="test_collection")
@pytest.mark.asyncio
async def test_store_and_search(chroma_store: ChromaStore):
# 1. Arrange
item1 = EnrichedNewsItemDTO(
title="Apple announces new M4 chip",
url="https://example.com/apple-m4",
content_text="Apple has announced its newest M4 chip for next generation Macs. This processor brings massive AI improvements.",
source="TechNews",
timestamp=datetime(2023, 11, 1, 12, 0, tzinfo=timezone.utc),
relevance_score=9,
summary_ru="Apple анонсировала новый чип M4.",
anomalies_detected=["NPU acceleration"],
category="Competitors"
)
item2 = EnrichedNewsItemDTO(
title="Local bakery makes giant bread",
url="https://example.com/giant-bread",
content_text="A bakery in town just baked the world's largest loaf of bread, weighing over 1000 pounds.",
source="LocalNews",
timestamp=datetime(2023, 11, 2, 10, 0, tzinfo=timezone.utc),
relevance_score=2,
summary_ru="Местная пекарня испекла гигантский хлеб.",
anomalies_detected=[],
category="Other"
)
item3 = EnrichedNewsItemDTO(
title="NVIDIA reveals RTX 5090 with WebGPU support",
url="https://example.com/nvidia-rtx-5090",
content_text="NVIDIA's new RTX 5090 GPU fully accelerates WebGPU workloads for advanced edge AI applications.",
source="GPUWeekly",
timestamp=datetime(2023, 11, 3, 14, 0, tzinfo=timezone.utc),
relevance_score=10,
summary_ru="NVIDIA представила RTX 5090 с поддержкой WebGPU.",
anomalies_detected=["WebGPU", "Edge AI"],
category="Edge AI"
)
# 2. Act
await chroma_store.store(item1)
await chroma_store.store(item2)
await chroma_store.store(item3)
# Search for AI and chip related news
search_results = await chroma_store.search("AI processor and GPU", limit=2)
# 3. Assert
assert len(search_results) == 2
# Expected: The Apple M4 chip and NVIDIA RTX 5090 are highly relevant to AI/GPU
titles = [res.title for res in search_results]
assert "NVIDIA reveals RTX 5090 with WebGPU support" in titles
assert "Apple announces new M4 chip" in titles
assert "Local bakery makes giant bread" not in titles
# Check if properties are correctly restored for one of the items
for res in search_results:
if "NVIDIA" in res.title:
assert res.relevance_score == 10
assert "WebGPU" in res.anomalies_detected
assert "Edge AI" in res.anomalies_detected
assert "NVIDIA's new RTX 5090" in res.content_text
assert res.source == "GPUWeekly"
assert res.category == "Edge AI"
@pytest.mark.asyncio
async def test_search_empty_store(chroma_store: ChromaStore):
results = await chroma_store.search("test query", limit=5)
assert len(results) == 0
@pytest.mark.asyncio
async def test_store_upsert(chroma_store: ChromaStore):
item1 = EnrichedNewsItemDTO(
title="Apple announces new M4 chip",
url="https://example.com/apple-m4",
content_text="Apple has announced its newest M4 chip for next generation Macs.",
source="TechNews",
timestamp=datetime(2023, 11, 1, 12, 0, tzinfo=timezone.utc),
relevance_score=9,
summary_ru="Apple анонсировала новый чип M4.",
anomalies_detected=["NPU acceleration"],
category="Competitors"
)
# Store first time
await chroma_store.store(item1)
results = await chroma_store.search("Apple", limit=5)
assert len(results) == 1
assert results[0].relevance_score == 9
# Modify item and store again (same URL, should upsert)
item1_updated = item1.model_copy()
item1_updated.relevance_score = 10
item1_updated.summary_ru = "Apple анонсировала чип M4. Обновлено."
await chroma_store.store(item1_updated)
results_updated = await chroma_store.search("Apple", limit=5)
# Should still be 1 item, but updated
assert len(results_updated) == 1
assert results_updated[0].relevance_score == 10
assert results_updated[0].summary_ru == "Apple анонсировала чип M4. Обновлено."
@pytest.mark.asyncio
async def test_exists(chroma_store: ChromaStore):
url = "https://example.com/unique-news-123"
# Check that it doesn't exist initially
assert not await chroma_store.exists(url)
async def test_store(chroma_store, mock_collection):
# Arrange
item = EnrichedNewsItemDTO(
title="Test Title",
url=url,
content_text="Test content",
source="TestSource",
timestamp=datetime(2023, 11, 1, 12, 0, tzinfo=timezone.utc),
relevance_score=5,
url="https://example.com/test",
content_text="Test Content",
source="Test Source",
timestamp=datetime(2023, 1, 1, tzinfo=timezone.utc),
relevance_score=8,
summary_ru="Тест",
anomalies_detected=[],
category="Other"
category="Tech",
anomalies_detected=["A1", "A2"]
)
doc_id = str(uuid.uuid5(uuid.NAMESPACE_URL, item.url))
# Act
await chroma_store.store(item)
# Check that it exists now
assert await chroma_store.exists(url)
# Assert
mock_collection.upsert.assert_called_once()
args, kwargs = mock_collection.upsert.call_args
assert kwargs['ids'] == [doc_id]
assert kwargs['documents'] == ["Test Content"]
assert kwargs['metadatas'][0]['title'] == "Test Title"
assert kwargs['metadatas'][0]['category'] == "Tech"
assert kwargs['metadatas'][0]['anomalies_detected'] == "A1,A2"
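Two conventions the `upsert` assertions rely on: document ids are derived deterministically from the URL (so storing the same URL twice updates in place), and list metadata is flattened to a comma-joined string because Chroma metadata values must be scalars. A sketch of both helpers (names are assumptions):

```python
import uuid

def doc_id_for(url: str) -> str:
    # uuid5 is deterministic: the same URL always yields the same id,
    # which turns collection.upsert into an update on repeat stores.
    return str(uuid.uuid5(uuid.NAMESPACE_URL, url))

def flatten_list(values: list) -> str:
    # Chroma metadata values must be str/int/float/bool, not lists.
    return ",".join(values)
```
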
@pytest.mark.asyncio
async def test_get_by_id(chroma_store: ChromaStore):
# 1. Arrange
url = "https://example.com/get-by-id-test"
async def test_get_by_id_found(chroma_store, mock_collection):
# Arrange
item_id = "some-id"
mock_collection.get.return_value = {
"metadatas": [{
"title": "Title",
"url": "https://url.com",
"source": "Source",
"timestamp": "2023-01-01T00:00:00",
"relevance_score": 5.0,
"summary_ru": "Сводка",
"category": "Cat",
"anomalies_detected": "A1"
}],
"documents": ["Content"]
}
# Act
result = await chroma_store.get_by_id(item_id)
# Assert
assert result is not None
assert result.title == "Title"
assert result.content_text == "Content"
assert result.anomalies_detected == ["A1"]
mock_collection.get.assert_called_once_with(ids=[item_id])
@pytest.mark.asyncio
async def test_get_by_id_not_found(chroma_store, mock_collection):
# Arrange
mock_collection.get.return_value = {"metadatas": [], "documents": []}
# Act
result = await chroma_store.get_by_id("none")
# Assert
assert result is None
@pytest.mark.asyncio
async def test_exists(chroma_store, mock_collection):
# Arrange
url = "https://example.com"
doc_id = str(uuid.uuid5(uuid.NAMESPACE_URL, url))
item = EnrichedNewsItemDTO(
title="ID Test Title",
url=url,
content_text="ID Test Content",
source="IDTestSource",
timestamp=datetime(2023, 11, 1, 12, 0, tzinfo=timezone.utc),
relevance_score=7,
summary_ru="Тест по ID",
anomalies_detected=["TestAnomaly"],
category="Testing"
)
# 2. Act
await chroma_store.store(item)
# Try to retrieve by ID
retrieved_item = await chroma_store.get_by_id(doc_id)
# Try to retrieve non-existent ID
none_item = await chroma_store.get_by_id("non-existent-id")
# 3. Assert
assert retrieved_item is not None
assert retrieved_item.title == "ID Test Title"
assert retrieved_item.url == url
assert retrieved_item.relevance_score == 7
assert "TestAnomaly" in retrieved_item.anomalies_detected
assert retrieved_item.category == "Testing"
assert none_item is None
mock_collection.get.return_value = {"ids": [doc_id]}
# Act
exists = await chroma_store.exists(url)
# Assert
assert exists is True
mock_collection.get.assert_called_once_with(ids=[doc_id])
@pytest.mark.asyncio
async def test_get_stats(chroma_store: ChromaStore):
# 1. Arrange
item1 = EnrichedNewsItemDTO(
title="Title 1",
url="https://example.com/1",
content_text="Content 1",
source="Source 1",
timestamp=datetime.now(timezone.utc),
relevance_score=5,
summary_ru="Сводка 1",
anomalies_detected=[],
category="Tech"
)
item2 = EnrichedNewsItemDTO(
title="Title 2",
url="https://example.com/2",
content_text="Content 2",
source="Source 2",
timestamp=datetime.now(timezone.utc),
relevance_score=5,
summary_ru="Сводка 2",
anomalies_detected=[],
category="Tech"
)
item3 = EnrichedNewsItemDTO(
title="Title 3",
url="https://example.com/3",
content_text="Content 3",
source="Source 3",
timestamp=datetime.now(timezone.utc),
relevance_score=5,
summary_ru="Сводка 3",
anomalies_detected=[],
category="Science"
)
async def test_get_stats(chroma_store, mock_collection):
# Arrange
mock_collection.get.return_value = {
"metadatas": [
{"category": "Tech"},
{"category": "Tech"},
{"category": "Science"},
None,
{"other": "data"}
]
}
# 2. Act
await chroma_store.store(item1)
await chroma_store.store(item2)
await chroma_store.store(item3)
# Act
stats = await chroma_store.get_stats()
# 3. Assert
assert stats["total_count"] == 3
# Assert
assert stats["total_count"] == 5
assert stats["category_Tech"] == 2
assert stats["category_Science"] == 1
assert stats["category_Uncategorized"] == 1  # the dict lacking a 'category' key; the None entry is skipped
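A sketch of the counting the assertions imply: every entry counts toward the total, non-dict metadata (the `None`) is skipped for category counts, and dicts without a `category` key fall into `Uncategorized`. The function name is an assumption:

```python
def compute_stats(metadatas: list) -> dict:
    stats = {"total_count": len(metadatas)}
    for meta in metadatas:
        if not isinstance(meta, dict):
            continue  # malformed entries only count toward the total
        key = f"category_{meta.get('category', 'Uncategorized')}"
        stats[key] = stats.get(key, 0) + 1
    return stats
```
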
@pytest.mark.asyncio
async def test_search_sorting(chroma_store: ChromaStore):
async def test_get_latest(chroma_store, mock_collection):
# Arrange
items = [
EnrichedNewsItemDTO(
title=f"Title {i}",
url=f"https://example.com/{i}",
content_text=f"Content {i}",
source="Source",
timestamp=datetime.now(timezone.utc),
relevance_score=i,
summary_ru=f"Сводка {i}",
anomalies_detected=[],
category="Tech"
) for i in range(1, 6) # Scores 1 to 5
]
for item in items:
await chroma_store.store(item)
mock_collection.get.return_value = {
"metadatas": [
{"title": "Old", "timestamp": "2023-01-01T00:00:00", "url": "u1", "relevance_score": 1},
{"title": "New", "timestamp": "2023-01-02T00:00:00", "url": "u2", "relevance_score": 1},
],
"documents": ["doc1", "doc2"]
}
# Act
results = await chroma_store.search("Content", limit=10)
results = await chroma_store.get_latest(limit=10, category="Tech")
# Assert
assert len(results) == 5
# Should be sorted 5, 4, 3, 2, 1
scores = [r.relevance_score for r in results]
assert scores == [5, 4, 3, 2, 1]
assert len(results) == 2
assert results[0].title == "New"
assert results[1].title == "Old"
mock_collection.get.assert_called_once_with(
include=["metadatas", "documents"],
where={"category": "Tech"}
)
@pytest.mark.asyncio
async def test_get_top_ranked(chroma_store, mock_collection):
# Arrange
mock_collection.get.return_value = {
"metadatas": [
{"title": "Low", "timestamp": "2023-01-01T00:00:00", "url": "u1", "relevance_score": 2},
{"title": "High", "timestamp": "2023-01-01T00:00:00", "url": "u2", "relevance_score": 10},
],
"documents": ["doc1", "doc2"]
}
# Act
results = await chroma_store.get_top_ranked(limit=1, category="Tech")
# Assert
assert len(results) == 1
assert results[0].title == "High"
mock_collection.get.assert_called_once_with(
include=["metadatas", "documents"],
where={"category": "Tech"}
)
@pytest.mark.asyncio
async def test_search_hybrid_exact_match_fills_limit(chroma_store, mock_collection):
# Arrange
query = "Apple"
mock_collection.get.return_value = {
"metadatas": [
{"title": "Apple M4", "url": "u1", "timestamp": "2023-01-01T00:00:00", "relevance_score": 10},
{"title": "Apple Vision", "url": "u2", "timestamp": "2023-01-01T00:00:00", "relevance_score": 9},
],
"documents": ["doc1", "doc2"]
}
# Act
results = await chroma_store.search(query, limit=2)
# Assert
assert len(results) == 2
assert results[0].title == "Apple M4"
assert results[1].title == "Apple Vision"
mock_collection.get.assert_called_once()
mock_collection.query.assert_not_called()
@pytest.mark.asyncio
async def test_search_hybrid_falls_back_to_semantic(chroma_store, mock_collection):
# Arrange
query = "Apple"
# Exact match finds 1 item
mock_collection.get.return_value = {
"metadatas": [{"title": "Apple M4", "url": "u1", "timestamp": "2023-01-01T00:00:00", "relevance_score": 10}],
"documents": ["doc1"]
}
# Semantic match finds more items, including the same one
mock_collection.query.return_value = {
"metadatas": [[
{"title": "Apple M4", "url": "u1", "timestamp": "2023-01-01T00:00:00", "relevance_score": 10},
{"title": "M3 Chip", "url": "u2", "timestamp": "2023-01-01T00:00:00", "relevance_score": 8},
]],
"documents": [["doc1", "doc2"]],
"distances": [[0.1, 0.5]]
}
# Act
results = await chroma_store.search(query, limit=2)
# Assert
assert len(results) == 2
assert results[0].title == "Apple M4"
assert results[1].title == "M3 Chip"
assert mock_collection.get.called
assert mock_collection.query.called
@pytest.mark.asyncio
async def test_search_with_category_and_threshold(chroma_store, mock_collection):
# Arrange
query = "AI"
mock_collection.get.return_value = {"metadatas": [], "documents": []}
mock_collection.query.return_value = {
"metadatas": [[
{"title": "Good match", "url": "u1", "timestamp": "2023-01-01T00:00:00", "relevance_score": 10},
{"title": "Bad match", "url": "u2", "timestamp": "2023-01-01T00:00:00", "relevance_score": 5},
]],
"documents": [["doc1", "doc2"]],
"distances": [[0.2, 0.8]]
}
# Act
results = await chroma_store.search(query, limit=5, category="Tech", threshold=0.5)
# Assert
assert len(results) == 1
assert results[0].title == "Good match"
mock_collection.get.assert_called_with(
where_document={"$contains": "AI"},
where={"category": "Tech"},
include=["metadatas", "documents"]
)
mock_collection.query.assert_called_with(
query_texts=["AI"],
n_results=5,
where={"category": "Tech"}
)
@pytest.mark.asyncio
async def test_search_exception_handling(chroma_store, mock_collection):
# Arrange
mock_collection.get.side_effect = Exception("Get failed")
mock_collection.query.side_effect = Exception("Query failed")
# Act
results = await chroma_store.search("query")
# Assert
assert results == [] # Should not crash
@pytest.mark.asyncio
async def test_search_empty_query(chroma_store, mock_collection):
# Arrange
mock_collection.get.return_value = {"metadatas": [], "documents": []}
mock_collection.query.return_value = {"metadatas": [[]], "documents": [[]], "distances": [[]]}
# Act
await chroma_store.search("")
# Assert
mock_collection.get.assert_called_with(
where_document=None,
where=None,
include=["metadatas", "documents"]
)
mock_collection.query.assert_called_with(
query_texts=["*"],
n_results=5,
where=None
)
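Taken together, these tests pin down the two-phase flow from ADR 001: Phase 1 runs an exact-match `collection.get` (with `where_document={"$contains": ...}`); only when it leaves the limit unfilled does Phase 2 fall back to semantic `collection.query`, de-duplicating by URL and dropping hits whose distance exceeds the threshold. A minimal sketch of that control flow over plain metadata dicts (the signature is inferred from the tests, not the real method):

```python
def hybrid_search(collection, query, limit=5, category=None, threshold=None):
    where = {"category": category} if category else None
    where_doc = {"$contains": query} if query else None
    try:
        # Phase 1: exact substring match
        exact = collection.get(where_document=where_doc, where=where,
                               include=["metadatas", "documents"])
        hits = list(exact.get("metadatas") or [])
        if len(hits) >= limit:
            return hits[:limit]          # Phase 1 satisfied the request
        # Phase 2: semantic fallback
        seen = {m["url"] for m in hits}
        sem = collection.query(query_texts=[query or "*"],
                               n_results=limit, where=where)
        for meta, dist in zip(sem["metadatas"][0], sem["distances"][0]):
            if threshold is not None and dist > threshold:
                continue                 # too dissimilar, drop
            if meta["url"] not in seen:  # de-duplicate Phase 1 overlap
                hits.append(meta)
                seen.add(meta["url"])
        return hits[:limit]
    except Exception:
        return []                        # search must never crash callers
```
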


@@ -0,0 +1,120 @@
import pytest
import pytest_asyncio
from unittest.mock import MagicMock
from datetime import datetime, timezone
from src.storage.chroma_store import ChromaStore
from src.processor.dto import EnrichedNewsItemDTO
@pytest.fixture
def mock_chroma_client():
client = MagicMock()
collection = MagicMock()
client.get_or_create_collection.return_value = collection
return client, collection
@pytest_asyncio.fixture
async def chroma_store(mock_chroma_client):
client, collection = mock_chroma_client
store = ChromaStore(client=client, collection_name="test_collection")
return store
@pytest.mark.asyncio
async def test_search_with_category_filter(chroma_store, mock_chroma_client):
client, collection = mock_chroma_client
# Mock return value for collection.query
collection.query.return_value = {
"ids": [["id1"]],
"metadatas": [[{
"title": "AI in Robotics",
"url": "https://example.com/robotics",
"source": "Tech",
"timestamp": datetime.now(timezone.utc).isoformat(),
"relevance_score": 8,
"summary_ru": "AI в робототехнике",
"category": "Robotics",
"anomalies_detected": ""
}]],
"documents": [["Full content here"]],
"distances": [[0.1]]
}
# We want to test that 'category' is passed as a 'where' clause to ChromaDB
# Note: We need to update the search method signature in the next step
results = await chroma_store.search(query="AI", limit=5, category="Robotics")
# Assert collection.query was called with correct 'where' filter
args, kwargs = collection.query.call_args
assert kwargs["where"] == {"category": "Robotics"}
assert len(results) == 1
assert results[0].category == "Robotics"
@pytest.mark.asyncio
async def test_search_with_relevance_threshold(chroma_store, mock_chroma_client):
client, collection = mock_chroma_client
# Mock return value: one relevant (low distance), one irrelevant (high distance)
collection.query.return_value = {
"ids": [["id-rel", "id-irrel"]],
"metadatas": [[
{
"title": "Relevant News",
"url": "url1",
"source": "s",
"timestamp": datetime.now(timezone.utc).isoformat(),
"relevance_score": 9,
"summary_ru": "Р",
"category": "C",
"anomalies_detected": ""
},
{
"title": "Irrelevant News",
"url": "url2",
"source": "s",
"timestamp": datetime.now(timezone.utc).isoformat(),
"relevance_score": 1,
"summary_ru": "И",
"category": "C",
"anomalies_detected": ""
}
]],
"documents": [["doc1", "doc2"]],
"distances": [[0.2, 0.8]] # Lower distance means more similar
}
# threshold=0.5 means distances <= 0.5 are kept
results = await chroma_store.search(query="test", limit=10, threshold=0.5)
assert len(results) == 1
assert results[0].title == "Relevant News"
@pytest.mark.asyncio
async def test_get_latest_semantic_threshold(chroma_store, mock_chroma_client):
"""
Test that /latest uses semantic search if a category is provided,
but also respects the threshold even for plain searches.
"""
client, collection = mock_chroma_client
collection.query.return_value = {
"ids": [["id1"]],
"metadatas": [[{
"title": "Latest News",
"url": "url",
"source": "s",
"timestamp": datetime.now(timezone.utc).isoformat(),
"relevance_score": 5,
"summary_ru": "L",
"category": "Tech",
"anomalies_detected": ""
}]],
"documents": [["doc"]],
"distances": [[0.05]]
}
# If category is provided, we should use category filter
results = await chroma_store.search(query="", limit=10, category="Tech")
args, kwargs = collection.query.call_args
assert kwargs["where"] == {"category": "Tech"}
assert len(results) == 1


@@ -0,0 +1,95 @@
import pytest
import pytest_asyncio
from datetime import datetime, timezone
import chromadb
from chromadb.config import Settings
from src.processor.dto import EnrichedNewsItemDTO
from src.storage.chroma_store import ChromaStore
@pytest_asyncio.fixture
async def chroma_store():
# Use EphemeralClient for in-memory testing
client = chromadb.EphemeralClient(Settings(allow_reset=True))
client.reset()
store = ChromaStore(client=client, collection_name="test_top_ranked_collection")
yield store
client.reset()
@pytest.mark.asyncio
async def test_get_top_ranked_sorting(chroma_store: ChromaStore):
"""
Test that get_top_ranked returns items sorted by relevance_score in descending order.
"""
# 1. Arrange - create items with various relevance scores
items = [
EnrichedNewsItemDTO(
title=f"News {score}",
url=f"https://example.com/{score}",
content_text=f"Content for news with score {score}",
source="Source",
timestamp=datetime.now(timezone.utc),
relevance_score=score,
summary_ru=f"Сводка {score}",
anomalies_detected=[],
category="Tech"
) for score in [5, 10, 2, 8, 1]
]
for item in items:
await chroma_store.store(item)
# 2. Act
results = await chroma_store.get_top_ranked(limit=10)
# 3. Assert
assert len(results) == 5
scores = [r.relevance_score for r in results]
# Should be [10, 8, 5, 2, 1]
assert scores == [10, 8, 5, 2, 1]
assert results[0].title == "News 10"
assert results[-1].title == "News 1"
@pytest.mark.asyncio
async def test_get_top_ranked_limit(chroma_store: ChromaStore):
"""
Test that get_top_ranked respects the limit parameter.
"""
# 1. Arrange
items = [
EnrichedNewsItemDTO(
title=f"News {i}",
url=f"https://example.com/{i}",
content_text=f"Content {i}",
source="Source",
timestamp=datetime.now(timezone.utc),
relevance_score=i,
summary_ru=f"Сводка {i}",
anomalies_detected=[],
category="Tech"
) for i in range(1, 11) # 10 items
]
for item in items:
await chroma_store.store(item)
# 2. Act
limit_5 = await chroma_store.get_top_ranked(limit=5)
limit_2 = await chroma_store.get_top_ranked(limit=2)
# 3. Assert
assert len(limit_5) == 5
assert len(limit_2) == 2
assert limit_5[0].relevance_score == 10
assert limit_5[4].relevance_score == 6
@pytest.mark.asyncio
async def test_get_top_ranked_empty_store(chroma_store: ChromaStore):
"""
Test that get_top_ranked returns an empty list if store is empty.
"""
# 1. Act
results = await chroma_store.get_top_ranked(limit=10)
# 2. Assert
assert results == []
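The sorting contract these three tests describe reduces to a single step: order by `relevance_score` descending, then truncate to `limit`. A sketch over plain dicts (the real method maps Chroma metadata back into `EnrichedNewsItemDTO`s):

```python
def top_ranked(records: list, limit: int) -> list:
    # Highest relevance first; an empty store naturally yields [].
    ranked = sorted(records, key=lambda r: r["relevance_score"], reverse=True)
    return ranked[:limit]
```
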


@@ -0,0 +1,68 @@
import pytest
import chromadb
from unittest.mock import AsyncMock, patch
from src.crawlers.cppconf_crawler import CppConfCrawler
from src.processor.ollama_provider import OllamaProvider
from src.storage.chroma_store import ChromaStore
@pytest.fixture
def cppconf_html():
with open("tests/fixtures/cppconf/talks.html", "r", encoding="utf-8") as f:
return f.read()
@pytest.mark.asyncio
async def test_cppconf_e2e_pipeline(cppconf_html):
# 1. Mock Crawler fetch
crawler = CppConfCrawler(url="https://cppconf.ru/en/talks/", source="C++ Russia")
with patch("aiohttp.ClientSession.get") as mock_get:
mock_response = AsyncMock()
mock_response.status = 200
mock_response.text.return_value = cppconf_html
mock_get.return_value.__aenter__.return_value = mock_response
talks = await crawler.fetch_latest()
assert len(talks) > 0
talk = talks[0]
assert talk.source == "C++ Russia"
assert "https://cppconf.ru/en/talks/" in talk.url
# 2. Mock AI Processor
provider = OllamaProvider()
mock_llm_response = {
"relevance_score": 9,
"summary_ru": "Этот доклад обсуждает новые фичи C++26 и их влияние на производительность. Показаны примеры использования концептов и корутин.",
"anomalies_detected": ["Сравнение производительности с Rust"],
"category": "C++ Trends"
}
with patch("aiohttp.ClientSession.post") as mock_post:
mock_llm_post_response = AsyncMock()
mock_llm_post_response.raise_for_status = AsyncMock()
import json
mock_llm_post_response.json.return_value = {"response": json.dumps(mock_llm_response)}
mock_post.return_value.__aenter__.return_value = mock_llm_post_response
enriched_talk = await provider.analyze(talk)
assert enriched_talk.relevance_score == 9
assert "Rust" in enriched_talk.anomalies_detected[0]
assert enriched_talk.category == "C++ Trends"
# 3. Vector DB Store
client = chromadb.Client()
store = ChromaStore(client=client, collection_name="test_cppconf_collection")
await store.store(enriched_talk)
# Verify it exists
exists = await store.exists(enriched_talk.url)
assert exists is True
# Search
results = await store.search("C++26 features", limit=1)
assert len(results) == 1
assert results[0].relevance_score == 9
assert results[0].url == enriched_talk.url

109
tests/test_crawlers.py Normal file

@@ -0,0 +1,109 @@
import pytest
import os
from unittest.mock import MagicMock, AsyncMock, patch
from src.crawlers.factory import CrawlerFactory
from src.crawlers.rss_crawler import RSSCrawler
from src.crawlers.playwright_crawler import PlaywrightCrawler
from src.crawlers.dto import NewsItemDTO
def test_crawler_factory_load_real_file():
# Ensure the file exists
assert os.path.exists("src/crawlers.yml")
crawlers = CrawlerFactory.load_from_yaml("src/crawlers.yml")
assert len(crawlers) > 0
# Check if we have both types
types = [type(c) for c in crawlers]
assert RSSCrawler in types
assert PlaywrightCrawler in types
@pytest.mark.asyncio
async def test_rss_crawler_fetch_latest():
rss_url = "https://example.com/rss"
source = "Test Source"
crawler = RSSCrawler(rss_url, source)
mock_xml = """<?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0">
<channel>
<title>RSS Title</title>
<item>
<title>Test News</title>
<link>https://example.com/news1</link>
<description>Test Description</description>
<pubDate>Mon, 01 Jan 2024 00:00:00 +0000</pubDate>
</item>
</channel>
</rss>
"""
with patch("aiohttp.ClientSession.get") as mock_get:
mock_response = AsyncMock()
mock_response.status = 200
mock_response.text.return_value = mock_xml
mock_response.raise_for_status = MagicMock()
mock_get.return_value.__aenter__.return_value = mock_response
items = await crawler.fetch_latest()
assert len(items) == 1
assert items[0].title == "Test News"
assert items[0].url == "https://example.com/news1"
assert items[0].source == source
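For reference, the parsing this mocked feed exercises can be sketched with the stdlib alone (the real `RSSCrawler` may well use `feedparser` instead; this is an assumption, and date parsing is elided):

```python
import xml.etree.ElementTree as ET

def parse_rss(xml_text: str, source: str) -> list:
    # Walk every <item> in the feed and lift out the fields the
    # NewsItemDTO needs.
    items = []
    for item in ET.fromstring(xml_text).iter("item"):
        items.append({
            "title": item.findtext("title"),
            "url": item.findtext("link"),
            "description": item.findtext("description"),
            "source": source,
        })
    return items
```
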
@pytest.mark.asyncio
async def test_playwright_crawler_fetch_latest():
url = "https://example.com/news"
source = "Test Playwright"
selector = ".news-item"
crawler = PlaywrightCrawler(url, source, selector)
with patch("src.crawlers.playwright_crawler.async_playwright") as mock_playwright:
mock_p = AsyncMock()
mock_playwright.return_value.__aenter__.return_value = mock_p
mock_browser = AsyncMock()
mock_p.chromium.launch.return_value = mock_browser
mock_page = AsyncMock()
mock_browser.new_page.return_value = mock_page
mock_element = AsyncMock()
mock_element.evaluate.return_value = False # Not an 'a' tag
mock_link = AsyncMock()
mock_link.inner_text.return_value = "Test News"
mock_link.get_attribute.return_value = "/news1"
mock_element.query_selector.return_value = mock_link
mock_page.query_selector_all.return_value = [mock_element]
items = await crawler.fetch_latest()
assert len(items) == 1
assert items[0].title == "Test News"
assert items[0].url == "https://example.com/news1"
assert items[0].source == source
def test_crawler_factory_invalid_config(tmp_path):
config_file = tmp_path / "invalid_crawlers.yml"
config_file.write_text("""
crawlers:
- type: unknown
url: "https://example.com"
source: "Unknown"
- type: rss
url: "https://example.com"
# missing source
- not_a_dict
""")
crawlers = CrawlerFactory.load_from_yaml(str(config_file))
assert len(crawlers) == 0
def test_crawler_factory_empty_file(tmp_path):
config_file = tmp_path / "empty.yml"
config_file.write_text("")
crawlers = CrawlerFactory.load_from_yaml(str(config_file))
assert len(crawlers) == 0
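The invalid-config test expects every malformed entry to be skipped silently rather than abort the load. A sketch of that filtering over an already-parsed YAML dict (the registry shape and helper name are assumptions; the real factory also handles file I/O and YAML parsing):

```python
def build_crawlers(config, registry):
    # config: parsed YAML (may be None for an empty file);
    # registry: maps the "type" field to a crawler constructor.
    crawlers = []
    for entry in (config or {}).get("crawlers") or []:
        if not isinstance(entry, dict):
            continue                      # e.g. a bare "- not_a_dict" item
        cls = registry.get(entry.get("type"))
        if cls is None:                   # unknown crawler type
            continue
        if "url" not in entry or "source" not in entry:
            continue                      # required keys missing
        crawlers.append(cls(entry["url"], entry["source"]))
    return crawlers
```
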

7
update_chroma_store.py Normal file

@@ -0,0 +1,7 @@
import re
with open("src/storage/chroma_store.py", "r") as f:
content = f.read()
# I will rewrite the class completely because there are many changes to make.