import pytest
import sqlite3
import uuid
import asyncio
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Any
from contextlib import contextmanager
from unittest.mock import patch, MagicMock

from src.storage.sqlite_store import SQLiteStore, StatsCache
from src.processor.dto import EnrichedNewsItemDTO
from src.processor.anomaly_types import AnomalyType

import tempfile
import os


@pytest.fixture
def sqlite_store():
    """Yield an SQLiteStore backed by a fresh temporary database file.

    A real file (rather than ':memory:') is used so multiple connections
    can see the same data while each test stays isolated. The file is
    removed after the test; a PermissionError on removal (e.g. Windows
    still holding the handle) is deliberately ignored.
    """
    fd, path = tempfile.mkstemp()
    os.close(fd)  # mkstemp opens the fd; close it so sqlite owns the file
    db_path = Path(path)
    store = SQLiteStore(db_path)
    yield store
    if db_path.exists():
        try:
            os.remove(db_path)
        except PermissionError:
            # Best-effort cleanup: leave the file for OS temp cleanup.
            pass


@pytest.mark.asyncio
async def test_store_with_anomalies(sqlite_store):
    """1. Test store_with_anomalies - verify atomic writes with anomaly junction records"""
    item = EnrichedNewsItemDTO(
        title="Test News Title",
        url="https://example.com/test-news-1",
        content_text="Sample content for the news item.",
        source="Test Source",
        timestamp=datetime.now(timezone.utc),
        relevance_score=8,
        summary_ru="Тестовая сводка",
        category="Technology",
        anomalies_detected=["WebGPU"],
    )
    anomalies = [AnomalyType.WEBGPU]

    doc_id = await sqlite_store.store_with_anomalies(item, anomalies)
    assert doc_id is not None

    # Verify records in database
    with sqlite_store._get_connection() as conn:
        news_row = conn.execute(
            "SELECT * FROM news_items WHERE id = ?", (doc_id,)
        ).fetchone()
        assert news_row is not None
        assert news_row["title"] == "Test News Title"
        assert news_row["category"] == "Technology"

        # Check anomaly_types table
        anomaly_type_row = conn.execute(
            "SELECT * FROM anomaly_types WHERE name = ?",
            (AnomalyType.WEBGPU.value,),
        ).fetchone()
        assert anomaly_type_row is not None
        assert anomaly_type_row["name"] == "WebGPU"

        # Check news_anomalies junction table
        junction_row = conn.execute(
            "SELECT * FROM news_anomalies WHERE news_id = ? AND anomaly_id = ?",
            (doc_id, anomaly_type_row["id"]),
        ).fetchone()
        assert junction_row is not None


@pytest.mark.asyncio
async def test_get_by_id(sqlite_store):
    """2. Test get_by_id - verify proper reconstruction of DTO with anomalies"""
    item_timestamp = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
    item = EnrichedNewsItemDTO(
        title="DTO Reconstruction Test",
        url="https://example.com/reconstruct",
        content_text="Detailed content here.",
        source="Source B",
        timestamp=item_timestamp,
        relevance_score=7,
        summary_ru="Сводка для теста",
        category="Science",
        anomalies_detected=["WebGPU", "Edge AI"],
    )
    anomalies = [AnomalyType.WEBGPU, AnomalyType.EDGE_AI]

    doc_id = await sqlite_store.store_with_anomalies(item, anomalies)
    result = await sqlite_store.get_by_id(doc_id)

    assert result is not None
    assert result["id"] == doc_id
    assert result["title"] == item.title
    assert result["url"] == item.url
    assert result["relevance_score"] == item.relevance_score
    assert result["category"] == item.category
    assert "WebGPU" in result["anomalies_detected"]
    assert "Edge AI" in result["anomalies_detected"]
    assert len(result["anomalies_detected"]) == 2

    # Check timestamp reconstruction
    # result["timestamp"] is expected to be a datetime object
    assert result["timestamp"].replace(tzinfo=timezone.utc) == item_timestamp


@pytest.mark.asyncio
async def test_exists(sqlite_store):
    """3. Test exists - verify URL-based existence check"""
    url = "https://example.com/exists-test"
    item = EnrichedNewsItemDTO(
        title="Exists Test",
        url=url,
        content_text="...",
        source="Source",
        timestamp=datetime.now(timezone.utc),
        relevance_score=1,
        summary_ru="...",
        category="Cat",
        anomalies_detected=[],
    )

    # Before storing
    assert await sqlite_store.exists(url) is False

    # After storing
    await sqlite_store.store_with_anomalies(item, [])
    assert await sqlite_store.exists(url) is True

    # Other URL
    assert await sqlite_store.exists("https://example.com/does-not-exist") is False


@pytest.mark.asyncio
async def test_get_latest(sqlite_store):
    """4. Test get_latest - verify indexed timestamp ordering with pagination"""
    base_time = datetime(2023, 1, 1, tzinfo=timezone.utc).timestamp()

    # Store 5 items with increasing timestamps
    for i in range(5):
        item = EnrichedNewsItemDTO(
            title=f"News Item {i}",
            url=f"https://example.com/news-{i}",
            content_text="...",
            source="Source",
            timestamp=datetime.fromtimestamp(base_time + i, tz=timezone.utc),
            relevance_score=5,
            summary_ru="...",
            category="Tech" if i % 2 == 0 else "Science",
            anomalies_detected=[],
        )
        await sqlite_store.store_with_anomalies(item, [])

    # Latest items (descending timestamp)
    latest = await sqlite_store.get_latest(limit=10)
    assert len(latest) == 5
    assert latest[0]["title"] == "News Item 4"
    assert latest[-1]["title"] == "News Item 0"

    # Pagination: limit 2, offset 1
    paged = await sqlite_store.get_latest(limit=2, offset=1)
    assert len(paged) == 2
    assert paged[0]["title"] == "News Item 3"
    assert paged[1]["title"] == "News Item 2"

    # Category filter
    science_only = await sqlite_store.get_latest(category="Science")
    assert len(science_only) == 2
    for item in science_only:
        assert item["category"] == "Science"


@pytest.mark.asyncio
async def test_get_top_ranked(sqlite_store):
    """5. Test get_top_ranked - verify indexed relevance ordering with pagination"""
    for i in range(5):
        item = EnrichedNewsItemDTO(
            title=f"Ranked Item {i}",
            url=f"https://example.com/ranked-{i}",
            content_text="...",
            source="Source",
            timestamp=datetime.now(timezone.utc),
            relevance_score=i * 2,  # Scores: 0, 2, 4, 6, 8
            summary_ru="...",
            category="Tech",
            anomalies_detected=[],
        )
        await sqlite_store.store_with_anomalies(item, [])

    # Top ranked items (descending score)
    top = await sqlite_store.get_top_ranked(limit=10)
    assert len(top) == 5
    assert top[0]["relevance_score"] == 8
    assert top[-1]["relevance_score"] == 0

    # Pagination: limit 2, offset 1
    paged = await sqlite_store.get_top_ranked(limit=2, offset=1)
    assert len(paged) == 2
    assert paged[0]["relevance_score"] == 6
    assert paged[1]["relevance_score"] == 4


@pytest.mark.asyncio
async def test_get_stats(sqlite_store):
    """6. Test get_stats - verify correct aggregation with cache"""
    item1 = EnrichedNewsItemDTO(
        title="N1",
        url="u1",
        content_text="C1",
        source="S1",
        timestamp=datetime.now(timezone.utc),
        relevance_score=5,
        summary_ru="S1",
        category="CatA",
        anomalies_detected=["WebGPU"],
    )
    item2 = EnrichedNewsItemDTO(
        title="N2",
        url="u2",
        content_text="C2",
        source="S2",
        timestamp=datetime.now(timezone.utc),
        relevance_score=5,
        summary_ru="S2",
        category="CatA",
        anomalies_detected=["WebGPU", "Edge AI"],
    )
    await sqlite_store.store_with_anomalies(item1, [AnomalyType.WEBGPU])
    await sqlite_store.store_with_anomalies(
        item2, [AnomalyType.WEBGPU, AnomalyType.EDGE_AI]
    )

    # Initial stats
    stats = await sqlite_store.get_stats(use_cache=False)
    assert stats["total_count"] == 2
    assert stats["category_counts"]["CatA"] == 2
    assert stats["source_counts"]["S1"] == 1
    assert stats["source_counts"]["S2"] == 1
    assert stats["anomaly_counts"]["WebGPU"] == 2
    assert stats["anomaly_counts"]["Edge AI"] == 1

    # Verify cache works: manually add item to DB without invalidating cache
    with sqlite_store._get_connection() as conn:
        conn.execute(
            "INSERT INTO news_items (id, title, url, source, timestamp, "
            "relevance_score, category) VALUES (?,?,?,?,?,?,?)",
            ("cached-id", "cached", "cached-url", "cached-source", 0, 0, "CatB"),
        )
        conn.commit()

    # Should still return cached stats if use_cache=True
    cached_stats = await sqlite_store.get_stats(use_cache=True)
    assert cached_stats["total_count"] == 2
    assert "CatB" not in cached_stats["category_counts"]

    # Should return new stats if use_cache=False
    new_stats = await sqlite_store.get_stats(use_cache=False)
    assert new_stats["total_count"] == 3
    assert new_stats["category_counts"]["CatB"] == 1


@pytest.mark.asyncio
async def test_acid_compliance(sqlite_store):
    """7. Test ACID compliance - verify transactions rollback on error"""
    item = EnrichedNewsItemDTO(
        title="Atomic Test",
        url="https://example.com/atomic",
        content_text="Should not be saved",
        source="Source",
        timestamp=datetime.now(timezone.utc),
        relevance_score=5,
        summary_ru="Summary",
        category="Tech",
        anomalies_detected=["WebGPU"],
    )

    # Use a mock to simulate an error mid-transaction
    # We want it to fail after news_items is inserted but before anomalies are finished
    original_get_conn = sqlite_store._get_connection

    @contextmanager
    def mocked_get_connection():
        with original_get_conn() as conn:
            # We can't patch conn.execute directly, so we'll wrap the connection
            # but still allow it to behave like a connection for the rest.
            # Since SQLiteStore uses conn.execute directly on the yielded connection,
            # we can return a proxy object.
            class ConnProxy:
                def __init__(self, real_conn):
                    self.real_conn = real_conn

                def execute(self, sql, *args):
                    # Fail only on the anomaly-related statements so the
                    # news_items INSERT has already happened inside the
                    # transaction when the error fires.
                    if "anomaly_types" in sql or "news_anomalies" in sql:
                        raise sqlite3.Error("Simulated database failure")
                    return self.real_conn.execute(sql, *args)

                def commit(self):
                    return self.real_conn.commit()

                def rollback(self):
                    return self.real_conn.rollback()

                def __getattr__(self, name):
                    # Delegate anything else (row_factory, close, ...) to
                    # the real connection.
                    return getattr(self.real_conn, name)

                # The connection is used in context blocks
                def __enter__(self):
                    return self

                def __exit__(self, *args):
                    pass

            yield ConnProxy(conn)

    # Patch the _get_connection method of the store
    with patch.object(
        sqlite_store, "_get_connection", side_effect=mocked_get_connection
    ):
        with pytest.raises(sqlite3.Error, match="Simulated database failure"):
            await sqlite_store.store_with_anomalies(item, [AnomalyType.WEBGPU])

    # If ACID works, the news_items table should still be empty
    assert await sqlite_store.count_all() == 0
    with sqlite_store._get_connection() as conn:
        count = conn.execute("SELECT COUNT(*) FROM news_items").fetchone()[0]
        assert count == 0