Compare commits
No commits in common. "a49df98191562fd2e320fb3952501f3a336a0b46" and "ef3faec7f89fc061745c3f22835c8138420ed1da" have entirely different histories.
a49df98191
...
ef3faec7f8
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -1,5 +1,4 @@
|
||||
from aiogram import Bot, Dispatcher
|
||||
from aiohttp_socks import ProxyConnector
|
||||
from aiogram.client.default import DefaultBotProperties
|
||||
from src.bot.handlers import get_router
|
||||
from src.storage.base import IVectorStore
|
||||
@ -9,10 +8,7 @@ def setup_bot(token: str, storage: IVectorStore, processor: ILLMProvider, allowe
|
||||
"""
|
||||
Setup the aiogram Bot and Dispatcher with handlers.
|
||||
"""
|
||||
connector = ProxyConnector.from_url("socks5://127.0.0.1:1080", rdns=True)
|
||||
bot = Bot(token=token,
|
||||
default=DefaultBotProperties(parse_mode="HTML"),
|
||||
connector=connector)
|
||||
bot = Bot(token=token, default=DefaultBotProperties(parse_mode="HTML"))
|
||||
dp = Dispatcher()
|
||||
dp.include_router(get_router(storage, processor, allowed_chat_id))
|
||||
return bot, dp
|
||||
|
||||
@ -1,43 +0,0 @@
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class AnomalyType(str, Enum):
|
||||
WEBGPU = "WebGPU"
|
||||
NPU_ACCELERATION = "NPU acceleration"
|
||||
EDGE_AI = "Edge AI"
|
||||
QUANTUM_COMPUTING = "Quantum computing"
|
||||
NEUROMORPHIC = "Neuromorphic computing"
|
||||
SPATIAL_COMPUTING = "Spatial computing"
|
||||
UNKNOWN = "Unknown"
|
||||
|
||||
@classmethod
|
||||
def from_string(cls, value: str) -> "AnomalyType":
|
||||
normalized = value.strip().lower()
|
||||
for member in cls:
|
||||
if member.value.lower() == normalized:
|
||||
return member
|
||||
for member in cls:
|
||||
if normalized in member.value.lower():
|
||||
return member
|
||||
return cls.UNKNOWN
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
descriptions = {
|
||||
"WebGPU": "WebGPU graphics API or GPU compute",
|
||||
"NPU acceleration": "Neural Processing Unit hardware",
|
||||
"Edge AI": "Edge computing with AI",
|
||||
"Quantum computing": "Quantum computing technology",
|
||||
"Neuromorphic computing": "Neuromorphic processor architecture",
|
||||
"Spatial computing": "Spatial computing and AR/VR",
|
||||
"Unknown": "Unrecognized anomaly type",
|
||||
}
|
||||
return descriptions.get(self.value, "")
|
||||
|
||||
|
||||
def normalize_anomaly_list(anomalies: list[str]) -> list[AnomalyType]:
|
||||
return [AnomalyType.from_string(a) for a in anomalies]
|
||||
|
||||
|
||||
def is_valid_anomaly(value: str) -> bool:
|
||||
return AnomalyType.from_string(value) != AnomalyType.UNKNOWN
|
||||
@ -1,12 +1,8 @@
|
||||
import uuid
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Dict
|
||||
from typing import List, Optional, Mapping, Any
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from src.storage.sqlite_store import SQLiteStore
|
||||
from datetime import datetime, timezone
|
||||
from datetime import datetime
|
||||
|
||||
import chromadb
|
||||
from chromadb.api import ClientAPI
|
||||
@ -17,15 +13,9 @@ from src.processor.dto import EnrichedNewsItemDTO
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class ChromaStore(IVectorStore):
|
||||
def __init__(
|
||||
self,
|
||||
client: ClientAPI,
|
||||
collection_name: str = "news_collection",
|
||||
sqlite_store: Optional["SQLiteStore"] = None
|
||||
):
|
||||
def __init__(self, client: ClientAPI, collection_name: str = "news_collection"):
|
||||
self.client = client
|
||||
self.collection = self.client.get_or_create_collection(name=collection_name)
|
||||
self.sqlite_store = sqlite_store
|
||||
|
||||
async def store(self, item: EnrichedNewsItemDTO) -> None:
|
||||
# Create a deterministic UUID based on the URL
|
||||
@ -148,45 +138,12 @@ class ChromaStore(IVectorStore):
|
||||
anomalies_detected=anomalies
|
||||
)
|
||||
|
||||
def _dict_to_dto(self, item_dict: Dict[str, Any]) -> EnrichedNewsItemDTO:
|
||||
"""Convert a dict (from SQLite row) back to EnrichedNewsItemDTO."""
|
||||
anomalies = item_dict.get("anomalies_detected", []) or []
|
||||
if isinstance(anomalies, str):
|
||||
anomalies = [a.strip() for a in anomalies.split(",") if a.strip()]
|
||||
|
||||
timestamp = item_dict.get("timestamp")
|
||||
if isinstance(timestamp, (int, float)):
|
||||
timestamp = datetime.fromtimestamp(timestamp, tz=timezone.utc)
|
||||
elif isinstance(timestamp, str):
|
||||
timestamp = datetime.fromisoformat(timestamp)
|
||||
else:
|
||||
timestamp = datetime.now(timezone.utc)
|
||||
|
||||
return EnrichedNewsItemDTO(
|
||||
title=str(item_dict.get("title", "")),
|
||||
url=str(item_dict.get("url", "")),
|
||||
content_text=str(item_dict.get("content_text", "")),
|
||||
source=str(item_dict.get("source", "")),
|
||||
timestamp=timestamp,
|
||||
relevance_score=int(float(str(item_dict.get("relevance_score", 0)))),
|
||||
summary_ru=str(item_dict.get("summary_ru", "")),
|
||||
category=str(item_dict.get("category", "")),
|
||||
anomalies_detected=anomalies if isinstance(anomalies, list) else []
|
||||
)
|
||||
|
||||
async def exists(self, url: str) -> bool:
|
||||
doc_id = str(uuid.uuid5(uuid.NAMESPACE_URL, url))
|
||||
result = await asyncio.to_thread(self.collection.get, ids=[doc_id])
|
||||
return len(result.get("ids", [])) > 0
|
||||
|
||||
async def get_stats(self) -> dict[str, int]:
|
||||
if self.sqlite_store is not None:
|
||||
stats = await self.sqlite_store.get_stats(use_cache=True)
|
||||
return {
|
||||
"total_count": stats["total_count"],
|
||||
**{f"category_{k}": v for k, v in stats["category_counts"].items()}
|
||||
}
|
||||
|
||||
results = await asyncio.to_thread(self.collection.get, include=["metadatas"])
|
||||
metadatas = results.get("metadatas")
|
||||
if metadatas is None:
|
||||
@ -206,10 +163,6 @@ class ChromaStore(IVectorStore):
|
||||
return stats
|
||||
|
||||
async def get_latest(self, limit: int = 10, category: Optional[str] = None) -> List[EnrichedNewsItemDTO]:
|
||||
if self.sqlite_store is not None:
|
||||
items_dict = await self.sqlite_store.get_latest(limit=limit, category=category)
|
||||
return [self._dict_to_dto(item) for item in items_dict]
|
||||
|
||||
where: Any = {"category": category} if category else None
|
||||
results = await asyncio.to_thread(
|
||||
self.collection.get,
|
||||
@ -233,10 +186,6 @@ class ChromaStore(IVectorStore):
|
||||
return items[:limit]
|
||||
|
||||
async def get_top_ranked(self, limit: int = 10, category: Optional[str] = None) -> List[EnrichedNewsItemDTO]:
|
||||
if self.sqlite_store is not None:
|
||||
items_dict = await self.sqlite_store.get_top_ranked(limit=limit, category=category)
|
||||
return [self._dict_to_dto(item) for item in items_dict]
|
||||
|
||||
where: Any = {"category": category} if category else None
|
||||
results = await asyncio.to_thread(
|
||||
self.collection.get,
|
||||
|
||||
@ -1,305 +0,0 @@
|
||||
import sqlite3
|
||||
import json
|
||||
import asyncio
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Dict, Any
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime, timezone
|
||||
from dataclasses import dataclass
|
||||
|
||||
from src.processor.dto import EnrichedNewsItemDTO
|
||||
from src.processor.anomaly_types import AnomalyType, normalize_anomaly_list
|
||||
|
||||
|
||||
@dataclass
|
||||
class StatsCache:
|
||||
total_count: int = 0
|
||||
category_counts: Optional[Dict[str, int]] = None
|
||||
source_counts: Optional[Dict[str, int]] = None
|
||||
anomaly_counts: Optional[Dict[str, int]] = None
|
||||
last_updated: Optional[datetime] = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.category_counts is None:
|
||||
self.category_counts = {}
|
||||
if self.source_counts is None:
|
||||
self.source_counts = {}
|
||||
if self.anomaly_counts is None:
|
||||
self.anomaly_counts = {}
|
||||
|
||||
|
||||
class SQLiteStore:
|
||||
def __init__(self, db_path: Path):
|
||||
self.db_path = db_path
|
||||
self._init_schema()
|
||||
self._stats_cache: Optional[StatsCache] = None
|
||||
|
||||
def _init_schema(self):
|
||||
with self._get_connection() as conn:
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA synchronous=FULL")
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS news_items (
|
||||
id TEXT PRIMARY KEY,
|
||||
title TEXT NOT NULL,
|
||||
url TEXT UNIQUE NOT NULL,
|
||||
source TEXT NOT NULL,
|
||||
timestamp INTEGER NOT NULL,
|
||||
relevance_score INTEGER NOT NULL CHECK (relevance_score >= 0 AND relevance_score <= 10),
|
||||
summary_ru TEXT,
|
||||
category TEXT NOT NULL DEFAULT '',
|
||||
content_text TEXT,
|
||||
created_at INTEGER DEFAULT (unixepoch())
|
||||
)
|
||||
""")
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_news_timestamp ON news_items(timestamp DESC)")
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_news_relevance ON news_items(relevance_score DESC)")
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_news_category ON news_items(category)")
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_news_source ON news_items(source)")
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS anomaly_types (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT UNIQUE NOT NULL,
|
||||
description TEXT
|
||||
)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS news_anomalies (
|
||||
news_id TEXT NOT NULL,
|
||||
anomaly_id TEXT NOT NULL,
|
||||
detected_at INTEGER DEFAULT (unixepoch()),
|
||||
PRIMARY KEY (news_id, anomaly_id),
|
||||
FOREIGN KEY (news_id) REFERENCES news_items(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY (anomaly_id) REFERENCES anomaly_types(id)
|
||||
)
|
||||
""")
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_news_anomalies_news ON news_anomalies(news_id)")
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_news_anomalies_anomaly ON news_anomalies(anomaly_id)")
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS stats_cache (
|
||||
key TEXT PRIMARY KEY DEFAULT 'main',
|
||||
total_count INTEGER DEFAULT 0,
|
||||
category_counts TEXT DEFAULT '{}',
|
||||
source_counts TEXT DEFAULT '{}',
|
||||
anomaly_counts TEXT DEFAULT '{}',
|
||||
last_updated INTEGER
|
||||
)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS schema_version (
|
||||
version INTEGER PRIMARY KEY,
|
||||
applied_at INTEGER DEFAULT (unixepoch())
|
||||
)
|
||||
""")
|
||||
conn.commit()
|
||||
|
||||
@contextmanager
|
||||
def _get_connection(self):
|
||||
conn = sqlite3.connect(self.db_path, timeout=30.0)
|
||||
conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
yield conn
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def _row_to_dict(self, row: sqlite3.Row) -> Dict[str, Any]:
|
||||
return dict(row)
|
||||
|
||||
async def store_with_anomalies(self, item: EnrichedNewsItemDTO, anomaly_types: List[AnomalyType]) -> str:
|
||||
doc_id = str(uuid.uuid5(uuid.NAMESPACE_URL, item.url))
|
||||
timestamp_int = int(item.timestamp.timestamp())
|
||||
|
||||
with self._get_connection() as conn:
|
||||
try:
|
||||
conn.execute("BEGIN IMMEDIATE")
|
||||
|
||||
conn.execute("""
|
||||
INSERT OR REPLACE INTO news_items
|
||||
(id, title, url, source, timestamp, relevance_score, summary_ru, category, content_text)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
doc_id,
|
||||
item.title,
|
||||
item.url,
|
||||
item.source,
|
||||
timestamp_int,
|
||||
item.relevance_score,
|
||||
item.summary_ru,
|
||||
item.category,
|
||||
item.content_text
|
||||
))
|
||||
|
||||
for anomaly in anomaly_types:
|
||||
anomaly_id = str(uuid.uuid5(uuid.NAMESPACE_URL, anomaly.value))
|
||||
conn.execute("""
|
||||
INSERT OR IGNORE INTO anomaly_types (id, name, description)
|
||||
VALUES (?, ?, ?)
|
||||
""", (anomaly_id, anomaly.value, anomaly.description))
|
||||
|
||||
conn.execute("""
|
||||
INSERT OR IGNORE INTO news_anomalies (news_id, anomaly_id)
|
||||
VALUES (?, ?)
|
||||
""", (doc_id, anomaly_id))
|
||||
|
||||
conn.execute("COMMIT")
|
||||
except Exception:
|
||||
conn.execute("ROLLBACK")
|
||||
raise
|
||||
|
||||
self._invalidate_cache()
|
||||
return doc_id
|
||||
|
||||
async def get_by_id(self, item_id: str) -> Optional[Dict[str, Any]]:
|
||||
with self._get_connection() as conn:
|
||||
row = conn.execute("""
|
||||
SELECT * FROM news_items WHERE id = ?
|
||||
""", (item_id,)).fetchone()
|
||||
|
||||
if not row:
|
||||
return None
|
||||
|
||||
result = self._row_to_dict(row)
|
||||
result["timestamp"] = datetime.fromtimestamp(result["timestamp"], tz=timezone.utc)
|
||||
|
||||
anomaly_rows = conn.execute("""
|
||||
SELECT a.name FROM news_anomalies na
|
||||
JOIN anomaly_types a ON na.anomaly_id = a.id
|
||||
WHERE na.news_id = ?
|
||||
""", (item_id,)).fetchall()
|
||||
result["anomalies_detected"] = [r["name"] for r in anomaly_rows]
|
||||
|
||||
return result
|
||||
|
||||
async def exists(self, url: str) -> bool:
|
||||
doc_id = str(uuid.uuid5(uuid.NAMESPACE_URL, url))
|
||||
with self._get_connection() as conn:
|
||||
row = conn.execute("SELECT 1 FROM news_items WHERE id = ?", (doc_id,)).fetchone()
|
||||
return row is not None
|
||||
|
||||
async def get_latest(self, limit: int = 10, category: Optional[str] = None, offset: int = 0) -> List[Dict[str, Any]]:
|
||||
with self._get_connection() as conn:
|
||||
if category:
|
||||
rows = conn.execute("""
|
||||
SELECT * FROM news_items
|
||||
WHERE category = ?
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT ? OFFSET ?
|
||||
""", (category, limit, offset)).fetchall()
|
||||
else:
|
||||
rows = conn.execute("""
|
||||
SELECT * FROM news_items
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT ? OFFSET ?
|
||||
""", (limit, offset)).fetchall()
|
||||
|
||||
results = []
|
||||
for row in rows:
|
||||
item = self._row_to_dict(row)
|
||||
item["timestamp"] = datetime.fromtimestamp(item["timestamp"], tz=timezone.utc)
|
||||
results.append(item)
|
||||
return results
|
||||
|
||||
async def get_top_ranked(self, limit: int = 10, category: Optional[str] = None, offset: int = 0) -> List[Dict[str, Any]]:
|
||||
with self._get_connection() as conn:
|
||||
if category:
|
||||
rows = conn.execute("""
|
||||
SELECT * FROM news_items
|
||||
WHERE category = ?
|
||||
ORDER BY relevance_score DESC, timestamp DESC
|
||||
LIMIT ? OFFSET ?
|
||||
""", (category, limit, offset)).fetchall()
|
||||
else:
|
||||
rows = conn.execute("""
|
||||
SELECT * FROM news_items
|
||||
ORDER BY relevance_score DESC, timestamp DESC
|
||||
LIMIT ? OFFSET ?
|
||||
""", (limit, offset)).fetchall()
|
||||
|
||||
results = []
|
||||
for row in rows:
|
||||
item = self._row_to_dict(row)
|
||||
item["timestamp"] = datetime.fromtimestamp(item["timestamp"], tz=timezone.utc)
|
||||
results.append(item)
|
||||
return results
|
||||
|
||||
async def get_stats(self, use_cache: bool = True) -> Dict[str, Any]:
|
||||
if use_cache and self._stats_cache is not None:
|
||||
return {
|
||||
"total_count": self._stats_cache.total_count,
|
||||
"category_counts": self._stats_cache.category_counts,
|
||||
"source_counts": self._stats_cache.source_counts,
|
||||
"anomaly_counts": self._stats_cache.anomaly_counts,
|
||||
"last_updated": self._stats_cache.last_updated
|
||||
}
|
||||
|
||||
with self._get_connection() as conn:
|
||||
total = conn.execute("SELECT COUNT(*) FROM news_items").fetchone()[0]
|
||||
|
||||
category_rows = conn.execute("""
|
||||
SELECT category, COUNT(*) as count FROM news_items GROUP BY category
|
||||
""").fetchall()
|
||||
category_counts = {r["category"]: r["count"] for r in category_rows}
|
||||
|
||||
source_rows = conn.execute("""
|
||||
SELECT source, COUNT(*) as count FROM news_items GROUP BY source
|
||||
""").fetchall()
|
||||
source_counts = {r["source"]: r["count"] for r in source_rows}
|
||||
|
||||
anomaly_rows = conn.execute("""
|
||||
SELECT a.name, COUNT(*) as count
|
||||
FROM news_anomalies na
|
||||
JOIN anomaly_types a ON na.anomaly_id = a.id
|
||||
GROUP BY a.name
|
||||
""").fetchall()
|
||||
anomaly_counts = {r["name"]: r["count"] for r in anomaly_rows}
|
||||
|
||||
result = {
|
||||
"total_count": total,
|
||||
"category_counts": category_counts,
|
||||
"source_counts": source_counts,
|
||||
"anomaly_counts": anomaly_counts,
|
||||
"last_updated": datetime.now(timezone.utc)
|
||||
}
|
||||
|
||||
self._stats_cache = StatsCache(
|
||||
total_count=total,
|
||||
category_counts=category_counts,
|
||||
source_counts=source_counts,
|
||||
anomaly_counts=anomaly_counts,
|
||||
last_updated=datetime.now(timezone.utc)
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
def _invalidate_cache(self):
|
||||
self._stats_cache = None
|
||||
|
||||
async def invalidate_cache(self):
|
||||
self._invalidate_cache()
|
||||
|
||||
async def count_all(self) -> int:
|
||||
with self._get_connection() as conn:
|
||||
return conn.execute("SELECT COUNT(*) FROM news_items").fetchone()[0]
|
||||
|
||||
async def get_all_items_for_migration(self, batch_size: int = 100) -> List[Dict[str, Any]]:
|
||||
with self._get_connection() as conn:
|
||||
rows = conn.execute("""
|
||||
SELECT * FROM news_items ORDER BY created_at
|
||||
""").fetchall()
|
||||
|
||||
results = []
|
||||
for row in rows:
|
||||
item = self._row_to_dict(row)
|
||||
item["timestamp"] = datetime.fromtimestamp(item["timestamp"], tz=timezone.utc)
|
||||
|
||||
anomaly_rows = conn.execute("""
|
||||
SELECT a.name FROM news_anomalies na
|
||||
JOIN anomaly_types a ON na.anomaly_id = a.id
|
||||
WHERE na.news_id = ?
|
||||
""", (item["id"],)).fetchall()
|
||||
item["anomalies_detected"] = [r["name"] for r in anomaly_rows]
|
||||
results.append(item)
|
||||
|
||||
return results
|
||||
@ -10,7 +10,6 @@ from src.crawlers.microsoft_research_crawler import MicrosoftResearchCrawler
|
||||
from src.crawlers.static_crawler import StaticCrawler
|
||||
from src.crawlers.skolkovo_crawler import SkolkovoCrawler
|
||||
from src.crawlers.cppconf_crawler import CppConfCrawler
|
||||
from src.crawlers.github_crawler import GitHubTrendingCrawler
|
||||
|
||||
VALID_YAML = """
|
||||
crawlers:
|
||||
@ -126,7 +125,7 @@ def test_integration_load_actual_config():
|
||||
|
||||
# Verify types and mandatory fields for all loaded crawlers
|
||||
for crawler in crawlers:
|
||||
assert isinstance(crawler, (RSSCrawler, PlaywrightCrawler, StaticCrawler, SkolkovoCrawler, CppConfCrawler, SciRateCrawler, ScholarCrawler, MicrosoftResearchCrawler, GitHubTrendingCrawler))
|
||||
assert isinstance(crawler, (RSSCrawler, PlaywrightCrawler, StaticCrawler, SkolkovoCrawler, CppConfCrawler, SciRateCrawler, ScholarCrawler, MicrosoftResearchCrawler))
|
||||
if not isinstance(crawler, ScholarCrawler):
|
||||
assert crawler.url.startswith("http")
|
||||
assert crawler.source
|
||||
|
||||
@ -1,69 +1,27 @@
|
||||
import pytest
|
||||
import aiohttp
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import patch, MagicMock, AsyncMock
|
||||
from src.crawlers.static_crawler import StaticCrawler
|
||||
from src.crawlers.skolkovo_crawler import SkolkovoCrawler
|
||||
from src.crawlers.dto import NewsItemDTO
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_static_crawler_addmeto():
|
||||
html_content = """
|
||||
<div class="tgme_widget_message_text">
|
||||
<a href="https://t.me/addmeto/123">Message Link</a>
|
||||
Some content text about AI.
|
||||
</div>
|
||||
"""
|
||||
with patch("aiohttp.ClientSession.get") as mock_get:
|
||||
mock_response = MagicMock()
|
||||
mock_response.status = 200
|
||||
mock_response.text = AsyncMock(return_value=html_content)
|
||||
mock_response.__aenter__.return_value = mock_response
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
crawler = StaticCrawler(url="https://t.me/s/addmeto", source="Telegram: Addmeto", selector=".tgme_widget_message_text")
|
||||
items = await crawler.fetch_latest()
|
||||
assert len(items) > 0
|
||||
assert items[0].source == "Telegram: Addmeto"
|
||||
assert "t.me/addmeto/123" in items[0].url
|
||||
|
||||
crawler = StaticCrawler(url="https://t.me/s/addmeto", source="Telegram: Addmeto", selector=".tgme_widget_message_text")
|
||||
items = await crawler.fetch_latest()
|
||||
assert len(items) > 0
|
||||
assert items[0].source == "Telegram: Addmeto"
|
||||
@pytest.mark.asyncio
|
||||
async def test_static_crawler_rsf():
|
||||
html_content = """
|
||||
<div class="news-item">
|
||||
<a href="/en/news/123">RSF News Title</a>
|
||||
Description of news.
|
||||
</div>
|
||||
"""
|
||||
with patch("aiohttp.ClientSession.get") as mock_get:
|
||||
mock_response = MagicMock()
|
||||
mock_response.status = 200
|
||||
mock_response.text = AsyncMock(return_value=html_content)
|
||||
mock_response.__aenter__.return_value = mock_response
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
crawler = StaticCrawler(url="https://rscf.ru/en/news/", source="RSF", selector=".news-item")
|
||||
items = await crawler.fetch_latest()
|
||||
assert len(items) > 0
|
||||
assert items[0].source == "RSF"
|
||||
assert "rscf.ru/en/news/123" in items[0].url
|
||||
crawler = StaticCrawler(url="https://rscf.ru/en/news/", source="RSF", selector=".news-item")
|
||||
items = await crawler.fetch_latest()
|
||||
assert len(items) > 0
|
||||
assert items[0].source == "RSF"
|
||||
assert "rscf.ru" in items[0].url
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skolkovo_crawler():
|
||||
html_content = """
|
||||
<div class="news-list">
|
||||
<div class="item">
|
||||
<a href="/news/123">Skolkovo News</a>
|
||||
</div>
|
||||
</div>
|
||||
"""
|
||||
with patch("src.crawlers.playwright_crawler.PlaywrightCrawler.fetch_latest") as mock_fetch:
|
||||
mock_fetch.return_value = [
|
||||
NewsItemDTO(title="Skolkovo News", url="https://sk.ru/news/123", content_text="Text", source="Skolkovo", timestamp=datetime.now(timezone.utc))
|
||||
]
|
||||
crawler = SkolkovoCrawler(url="https://sk.ru/news/", source="Skolkovo")
|
||||
items = await crawler.fetch_latest()
|
||||
assert len(items) > 0
|
||||
assert items[0].source == "Skolkovo"
|
||||
assert "sk.ru" in items[0].url
|
||||
|
||||
crawler = SkolkovoCrawler(url="https://sk.ru/news/", source="Skolkovo")
|
||||
items = await crawler.fetch_latest()
|
||||
assert len(items) > 0
|
||||
assert items[0].source == "Skolkovo"
|
||||
assert "sk.ru" in items[0].url
|
||||
|
||||
@ -2,7 +2,7 @@ import pytest
|
||||
import asyncio
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import MagicMock, patch, AsyncMock
|
||||
from unittest.mock import MagicMock, patch
|
||||
from typing import Dict, Any
|
||||
|
||||
from src.processor.dto import EnrichedNewsItemDTO
|
||||
@ -280,248 +280,3 @@ async def test_search_empty_query(chroma_store, mock_collection):
|
||||
n_results=5,
|
||||
where=None
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for SQLiteStore integration
|
||||
# =============================================================================
|
||||
|
||||
@pytest.fixture
|
||||
def mock_sqlite_store():
|
||||
return AsyncMock()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def chroma_store_with_sqlite(mock_client, mock_collection, mock_sqlite_store):
|
||||
mock_client.get_or_create_collection.return_value = mock_collection
|
||||
return ChromaStore(
|
||||
client=mock_client,
|
||||
collection_name="test_collection",
|
||||
sqlite_store=mock_sqlite_store
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_latest_delegates_to_sqlite_store(chroma_store_with_sqlite, mock_sqlite_store):
|
||||
# Arrange
|
||||
ts = datetime(2024, 1, 15, tzinfo=timezone.utc)
|
||||
mock_sqlite_store.get_latest.return_value = [
|
||||
{"title": "Latest1", "url": "u1", "content_text": "c1", "source": "s1",
|
||||
"timestamp": ts, "relevance_score": 8, "summary_ru": "sum1",
|
||||
"category": "Tech", "anomalies_detected": ["A1"]},
|
||||
{"title": "Latest2", "url": "u2", "content_text": "c2", "source": "s2",
|
||||
"timestamp": ts, "relevance_score": 7, "summary_ru": "sum2",
|
||||
"category": "Tech", "anomalies_detected": []},
|
||||
]
|
||||
|
||||
# Act
|
||||
results = await chroma_store_with_sqlite.get_latest(limit=10, category="Tech")
|
||||
|
||||
# Assert
|
||||
mock_sqlite_store.get_latest.assert_called_once_with(limit=10, category="Tech")
|
||||
assert len(results) == 2
|
||||
assert results[0].title == "Latest1"
|
||||
assert results[0].relevance_score == 8
|
||||
assert results[0].anomalies_detected == ["A1"]
|
||||
assert results[1].title == "Latest2"
|
||||
assert results[1].anomalies_detected == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_latest_fallback_when_no_sqlite_store(chroma_store, mock_collection):
|
||||
# Arrange
|
||||
mock_collection.get.return_value = {
|
||||
"metadatas": [
|
||||
{"title": "Chroma Latest", "timestamp": "2024-01-01T00:00:00", "url": "u1",
|
||||
"relevance_score": 5, "source": "src", "category": "Tech"},
|
||||
],
|
||||
"documents": ["content"]
|
||||
}
|
||||
|
||||
# Act
|
||||
results = await chroma_store.get_latest(limit=10)
|
||||
|
||||
# Assert
|
||||
assert len(results) == 1
|
||||
assert results[0].title == "Chroma Latest"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_top_ranked_delegates_to_sqlite_store(chroma_store_with_sqlite, mock_sqlite_store):
|
||||
# Arrange
|
||||
ts = datetime(2024, 1, 15, tzinfo=timezone.utc)
|
||||
mock_sqlite_store.get_top_ranked.return_value = [
|
||||
{"title": "Top1", "url": "u1", "content_text": "c1", "source": "s1",
|
||||
"timestamp": ts, "relevance_score": 10, "summary_ru": "sum1",
|
||||
"category": "Tech", "anomalies_detected": []},
|
||||
{"title": "Top2", "url": "u2", "content_text": "c2", "source": "s2",
|
||||
"timestamp": ts, "relevance_score": 9, "summary_ru": "sum2",
|
||||
"category": "Tech", "anomalies_detected": ["A2"]},
|
||||
]
|
||||
|
||||
# Act
|
||||
results = await chroma_store_with_sqlite.get_top_ranked(limit=5)
|
||||
|
||||
# Assert
|
||||
mock_sqlite_store.get_top_ranked.assert_called_once_with(limit=5, category=None)
|
||||
assert len(results) == 2
|
||||
assert results[0].title == "Top1"
|
||||
assert results[0].relevance_score == 10
|
||||
assert results[1].title == "Top2"
|
||||
assert results[1].anomalies_detected == ["A2"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_top_ranked_fallback_when_no_sqlite_store(chroma_store, mock_collection):
|
||||
# Arrange
|
||||
mock_collection.get.return_value = {
|
||||
"metadatas": [
|
||||
{"title": "Chroma Top", "timestamp": "2024-01-01T00:00:00", "url": "u1",
|
||||
"relevance_score": 10, "source": "src", "category": "Tech"},
|
||||
],
|
||||
"documents": ["content"]
|
||||
}
|
||||
|
||||
# Act
|
||||
results = await chroma_store.get_top_ranked(limit=10)
|
||||
|
||||
# Assert
|
||||
assert len(results) == 1
|
||||
assert results[0].title == "Chroma Top"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_stats_delegates_to_sqlite_store(chroma_store_with_sqlite, mock_sqlite_store):
|
||||
# Arrange
|
||||
mock_sqlite_store.get_stats.return_value = {
|
||||
"total_count": 100,
|
||||
"category_counts": {"Tech": 60, "Science": 40},
|
||||
"source_counts": {"src1": 70, "src2": 30},
|
||||
"anomaly_counts": {"A1": 15, "A2": 5},
|
||||
"last_updated": datetime(2024, 1, 15, tzinfo=timezone.utc)
|
||||
}
|
||||
|
||||
# Act
|
||||
stats = await chroma_store_with_sqlite.get_stats()
|
||||
|
||||
# Assert
|
||||
mock_sqlite_store.get_stats.assert_called_once_with(use_cache=True)
|
||||
assert stats["total_count"] == 100
|
||||
assert stats["category_Tech"] == 60
|
||||
assert stats["category_Science"] == 40
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_stats_fallback_when_no_sqlite_store(chroma_store, mock_collection):
|
||||
# Arrange
|
||||
mock_collection.get.return_value = {
|
||||
"metadatas": [
|
||||
{"category": "Tech"},
|
||||
{"category": "Tech"},
|
||||
{"category": "Science"},
|
||||
]
|
||||
}
|
||||
|
||||
# Act
|
||||
stats = await chroma_store.get_stats()
|
||||
|
||||
# Assert
|
||||
assert stats["total_count"] == 3
|
||||
assert stats["category_Tech"] == 2
|
||||
assert stats["category_Science"] == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dict_to_dto_handles_integer_timestamp():
|
||||
# Arrange
|
||||
store = ChromaStore(client=MagicMock(), collection_name="test")
|
||||
item_dict = {
|
||||
"title": "Test",
|
||||
"url": "http://test.com",
|
||||
"content_text": "Content",
|
||||
"source": "Source",
|
||||
"timestamp": 1705312800, # Unix timestamp as int
|
||||
"relevance_score": 7,
|
||||
"summary_ru": "Summary",
|
||||
"category": "Tech",
|
||||
"anomalies_detected": ["A1", "A2"]
|
||||
}
|
||||
|
||||
# Act
|
||||
dto = store._dict_to_dto(item_dict)
|
||||
|
||||
# Assert
|
||||
assert dto.timestamp.year == 2024
|
||||
assert dto.timestamp.month == 1
|
||||
assert dto.anomalies_detected == ["A1", "A2"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dict_to_dto_handles_string_timestamp():
|
||||
# Arrange
|
||||
store = ChromaStore(client=MagicMock(), collection_name="test")
|
||||
item_dict = {
|
||||
"title": "Test",
|
||||
"url": "http://test.com",
|
||||
"content_text": "Content",
|
||||
"source": "Source",
|
||||
"timestamp": "2024-01-15T12:00:00",
|
||||
"relevance_score": 7,
|
||||
"summary_ru": "Summary",
|
||||
"category": "Tech",
|
||||
"anomalies_detected": []
|
||||
}
|
||||
|
||||
# Act
|
||||
dto = store._dict_to_dto(item_dict)
|
||||
|
||||
# Assert
|
||||
assert dto.timestamp.year == 2024
|
||||
assert dto.timestamp.month == 1
|
||||
assert dto.timestamp.day == 15
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dict_to_dto_handles_string_anomalies():
|
||||
# Arrange
|
||||
store = ChromaStore(client=MagicMock(), collection_name="test")
|
||||
item_dict = {
|
||||
"title": "Test",
|
||||
"url": "http://test.com",
|
||||
"content_text": "Content",
|
||||
"source": "Source",
|
||||
"timestamp": datetime(2024, 1, 15, tzinfo=timezone.utc),
|
||||
"relevance_score": 7,
|
||||
"summary_ru": "Summary",
|
||||
"category": "Tech",
|
||||
"anomalies_detected": "A1,A2,A3" # String instead of list
|
||||
}
|
||||
|
||||
# Act
|
||||
dto = store._dict_to_dto(item_dict)
|
||||
|
||||
# Assert
|
||||
assert dto.anomalies_detected == ["A1", "A2", "A3"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dict_to_dto_handles_empty_anomalies():
|
||||
# Arrange
|
||||
store = ChromaStore(client=MagicMock(), collection_name="test")
|
||||
item_dict = {
|
||||
"title": "Test",
|
||||
"url": "http://test.com",
|
||||
"content_text": "Content",
|
||||
"source": "Source",
|
||||
"timestamp": datetime(2024, 1, 15, tzinfo=timezone.utc),
|
||||
"relevance_score": 7,
|
||||
"summary_ru": "Summary",
|
||||
"category": "Tech",
|
||||
"anomalies_detected": None
|
||||
}
|
||||
|
||||
# Act
|
||||
dto = store._dict_to_dto(item_dict)
|
||||
|
||||
# Assert
|
||||
assert dto.anomalies_detected == []
|
||||
|
||||
@ -1,306 +0,0 @@
|
||||
import pytest
|
||||
import sqlite3
|
||||
import uuid
|
||||
import asyncio
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any
|
||||
from contextlib import contextmanager
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from src.storage.sqlite_store import SQLiteStore, StatsCache
|
||||
from src.processor.dto import EnrichedNewsItemDTO
|
||||
from src.processor.anomaly_types import AnomalyType
|
||||
|
||||
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
@pytest.fixture
def sqlite_store():
    """Yield a SQLiteStore backed by a throwaway on-disk database.

    A real file (rather than :memory:) lets multiple connections see the
    same data, while each test still starts from a fresh database.
    """
    handle, raw_path = tempfile.mkstemp()
    os.close(handle)
    database = Path(raw_path)
    yield SQLiteStore(database)
    # Best-effort cleanup: on Windows the file may still be held open.
    try:
        if database.exists():
            os.remove(database)
    except PermissionError:
        pass
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_store_with_anomalies(sqlite_store):
    """Storing an item writes the news row, the anomaly type, and the
    junction record linking the two, as one atomic operation."""
    news = EnrichedNewsItemDTO(
        title="Test News Title",
        url="https://example.com/test-news-1",
        content_text="Sample content for the news item.",
        source="Test Source",
        timestamp=datetime.now(timezone.utc),
        relevance_score=8,
        summary_ru="Тестовая сводка",
        category="Technology",
        anomalies_detected=["WebGPU"],
    )

    doc_id = await sqlite_store.store_with_anomalies(news, [AnomalyType.WEBGPU])
    assert doc_id is not None

    with sqlite_store._get_connection() as db:
        # The main news row landed with the expected fields.
        stored = db.execute("SELECT * FROM news_items WHERE id = ?", (doc_id,)).fetchone()
        assert stored is not None
        assert stored["title"] == "Test News Title"
        assert stored["category"] == "Technology"

        # The anomaly type row was created (or found) by name.
        atype = db.execute(
            "SELECT * FROM anomaly_types WHERE name = ?", (AnomalyType.WEBGPU.value,)
        ).fetchone()
        assert atype is not None
        assert atype["name"] == "WebGPU"

        # The junction row ties the news item to its anomaly type.
        link = db.execute(
            "SELECT * FROM news_anomalies WHERE news_id = ? AND anomaly_id = ?",
            (doc_id, atype["id"])
        ).fetchone()
        assert link is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_get_by_id(sqlite_store):
    """get_by_id reconstructs the stored record, including the anomaly
    name list and a datetime timestamp."""
    when = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
    news = EnrichedNewsItemDTO(
        title="DTO Reconstruction Test",
        url="https://example.com/reconstruct",
        content_text="Detailed content here.",
        source="Source B",
        timestamp=when,
        relevance_score=7,
        summary_ru="Сводка для теста",
        category="Science",
        anomalies_detected=["WebGPU", "Edge AI"],
    )
    doc_id = await sqlite_store.store_with_anomalies(
        news, [AnomalyType.WEBGPU, AnomalyType.EDGE_AI]
    )

    fetched = await sqlite_store.get_by_id(doc_id)

    assert fetched is not None
    assert fetched["id"] == doc_id
    assert fetched["title"] == news.title
    assert fetched["url"] == news.url
    assert fetched["relevance_score"] == news.relevance_score
    assert fetched["category"] == news.category

    # Both anomaly names round-trip, and nothing extra appears.
    assert "WebGPU" in fetched["anomalies_detected"]
    assert "Edge AI" in fetched["anomalies_detected"]
    assert len(fetched["anomalies_detected"]) == 2

    # Timestamp comes back as a datetime; normalize the tz before comparing.
    assert fetched["timestamp"].replace(tzinfo=timezone.utc) == when
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_exists(sqlite_store):
    """exists() answers purely by URL, before and after storage."""
    target_url = "https://example.com/exists-test"
    news = EnrichedNewsItemDTO(
        title="Exists Test",
        url=target_url,
        content_text="...",
        source="Source",
        timestamp=datetime.now(timezone.utc),
        relevance_score=1,
        summary_ru="...",
        category="Cat",
        anomalies_detected=[],
    )

    # Unknown until stored.
    assert await sqlite_store.exists(target_url) is False

    await sqlite_store.store_with_anomalies(news, [])
    assert await sqlite_store.exists(target_url) is True

    # An unrelated URL is still reported as absent.
    assert await sqlite_store.exists("https://example.com/does-not-exist") is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_get_latest(sqlite_store):
    """get_latest orders by timestamp descending and honors limit/offset
    pagination plus the optional category filter."""
    origin = datetime(2023, 1, 1, tzinfo=timezone.utc).timestamp()

    # Five items, one second apart, alternating categories.
    for idx in range(5):
        await sqlite_store.store_with_anomalies(
            EnrichedNewsItemDTO(
                title=f"News Item {idx}",
                url=f"https://example.com/news-{idx}",
                content_text="...",
                source="Source",
                timestamp=datetime.fromtimestamp(origin + idx, tz=timezone.utc),
                relevance_score=5,
                summary_ru="...",
                category="Science" if idx % 2 else "Tech",
                anomalies_detected=[],
            ),
            [],
        )

    # Newest first.
    latest = await sqlite_store.get_latest(limit=10)
    assert len(latest) == 5
    assert latest[0]["title"] == "News Item 4"
    assert latest[-1]["title"] == "News Item 0"

    # Pagination window: skip one, take two.
    window = await sqlite_store.get_latest(limit=2, offset=1)
    assert len(window) == 2
    assert window[0]["title"] == "News Item 3"
    assert window[1]["title"] == "News Item 2"

    # Category filter only returns matching rows.
    science = await sqlite_store.get_latest(category="Science")
    assert len(science) == 2
    assert all(row["category"] == "Science" for row in science)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_get_top_ranked(sqlite_store):
    """get_top_ranked orders by relevance descending with limit/offset."""
    for idx in range(5):
        await sqlite_store.store_with_anomalies(
            EnrichedNewsItemDTO(
                title=f"Ranked Item {idx}",
                url=f"https://example.com/ranked-{idx}",
                content_text="...",
                source="Source",
                timestamp=datetime.now(timezone.utc),
                relevance_score=idx * 2,  # scores 0, 2, 4, 6, 8
                summary_ru="...",
                category="Tech",
                anomalies_detected=[],
            ),
            [],
        )

    # Highest score first.
    ranked = await sqlite_store.get_top_ranked(limit=10)
    assert len(ranked) == 5
    assert ranked[0]["relevance_score"] == 8
    assert ranked[-1]["relevance_score"] == 0

    # Pagination window: skip the top row, take the next two.
    window = await sqlite_store.get_top_ranked(limit=2, offset=1)
    assert len(window) == 2
    assert window[0]["relevance_score"] == 6
    assert window[1]["relevance_score"] == 4
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_get_stats(sqlite_store):
    """get_stats aggregates counts correctly, and the cache only refreshes
    when use_cache=False."""
    first = EnrichedNewsItemDTO(
        title="N1", url="u1", content_text="C1", source="S1",
        timestamp=datetime.now(timezone.utc), relevance_score=5,
        summary_ru="S1", category="CatA", anomalies_detected=["WebGPU"]
    )
    second = EnrichedNewsItemDTO(
        title="N2", url="u2", content_text="C2", source="S2",
        timestamp=datetime.now(timezone.utc), relevance_score=5,
        summary_ru="S2", category="CatA", anomalies_detected=["WebGPU", "Edge AI"]
    )
    await sqlite_store.store_with_anomalies(first, [AnomalyType.WEBGPU])
    await sqlite_store.store_with_anomalies(second, [AnomalyType.WEBGPU, AnomalyType.EDGE_AI])

    # Fresh aggregation over both rows.
    stats = await sqlite_store.get_stats(use_cache=False)
    assert stats["total_count"] == 2
    assert stats["category_counts"]["CatA"] == 2
    assert stats["source_counts"]["S1"] == 1
    assert stats["source_counts"]["S2"] == 1
    assert stats["anomaly_counts"]["WebGPU"] == 2
    assert stats["anomaly_counts"]["Edge AI"] == 1

    # Slip a row in behind the store's back so the cache goes stale.
    with sqlite_store._get_connection() as conn:
        conn.execute(
            "INSERT INTO news_items (id, title, url, source, timestamp, relevance_score, category) VALUES (?,?,?,?,?,?,?)",
            ("cached-id", "cached", "cached-url", "cached-source", 0, 0, "CatB")
        )
        conn.commit()

    # A cached read must not observe the new row...
    stale = await sqlite_store.get_stats(use_cache=True)
    assert stale["total_count"] == 2
    assert "CatB" not in stale["category_counts"]

    # ...while an uncached read does.
    fresh = await sqlite_store.get_stats(use_cache=False)
    assert fresh["total_count"] == 3
    assert fresh["category_counts"]["CatB"] == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_acid_compliance(sqlite_store):
    """7. Test ACID compliance - verify transactions rollback on error"""
    # This item should never become visible: the write must roll back.
    item = EnrichedNewsItemDTO(
        title="Atomic Test",
        url="https://example.com/atomic",
        content_text="Should not be saved",
        source="Source",
        timestamp=datetime.now(timezone.utc),
        relevance_score=5,
        summary_ru="Summary",
        category="Tech",
        anomalies_detected=["WebGPU"]
    )

    # Use a mock to simulate an error mid-transaction
    # We want it to fail after news_items is inserted but before anomalies are finished

    # Keep a handle on the real context manager so the proxy can delegate to it.
    original_get_conn = sqlite_store._get_connection

    @contextmanager
    def mocked_get_connection():
        with original_get_conn() as conn:
            # We can't patch conn.execute directly, so we'll wrap the connection
            # but still allow it to behave like a connection for the rest.

            # Since SQLiteStore uses conn.execute directly on the yielded connection,
            # we can return a proxy object.

            class ConnProxy:
                def __init__(self, real_conn):
                    self.real_conn = real_conn
                def execute(self, sql, *args):
                    # Fail only on the anomaly-related statements, so the
                    # earlier news_items INSERT has already run inside the
                    # same (still-open) transaction when the error fires.
                    if "anomaly_types" in sql or "news_anomalies" in sql:
                        raise sqlite3.Error("Simulated database failure")
                    return self.real_conn.execute(sql, *args)
                def commit(self): return self.real_conn.commit()
                def rollback(self): return self.real_conn.rollback()
                # Any other attribute (row_factory, cursor, ...) falls through
                # to the real connection unchanged.
                def __getattr__(self, name): return getattr(self.real_conn, name)
                # The connection is used in context blocks
                def __enter__(self): return self
                def __exit__(self, *args): pass

            yield ConnProxy(conn)

    # Patch the _get_connection method of the store
    with patch.object(sqlite_store, '_get_connection', side_effect=mocked_get_connection):
        with pytest.raises(sqlite3.Error, match="Simulated database failure"):
            await sqlite_store.store_with_anomalies(item, [AnomalyType.WEBGPU])

    # If ACID works, the news_items table should still be empty
    assert await sqlite_store.count_all() == 0

    # Double-check at the raw SQL level, not just through the store API.
    with sqlite_store._get_connection() as conn:
        count = conn.execute("SELECT COUNT(*) FROM news_items").fetchone()[0]
        assert count == 0
|
||||
Loading…
x
Reference in New Issue
Block a user