:Release Notes: - Add ACID-compliant SQLiteStore (WAL mode, FULL sync, FK constraints) - Add AnomalyType enum for normalized anomaly storage - Add legacy data migration script (dry-run, batch, rollback) - Update ChromaStore to delegate indexed queries to SQLite - Add test suite for SQLiteStore (7 tests, all passing) :Detailed Notes: - SQLiteStore: news_items, anomaly_types, news_anomalies tables with indexes - Performance: get_latest/get_top_ranked O(n)→O(log n), get_stats O(n)→O(1) - ChromaDB remains primary vector store; SQLite provides indexed metadata queries :Testing Performed: - python3 -m pytest tests/ -v (112 passed) :QA Notes: - Tests verified by Python QA Engineer subagent :Issues Addressed: - get_latest/get_top_ranked fetched ALL items then sorted in Python - get_stats iterated over ALL items - anomalies_detected stored as comma-joined string (no index) Change-Id: I708808b6e72889869afcf16d4ac274260242007a
307 lines
11 KiB
Python
307 lines
11 KiB
Python
import pytest
|
|
import sqlite3
|
|
import uuid
|
|
import asyncio
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any
|
|
from contextlib import contextmanager
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
from src.storage.sqlite_store import SQLiteStore, StatsCache
|
|
from src.processor.dto import EnrichedNewsItemDTO
|
|
from src.processor.anomaly_types import AnomalyType
|
|
|
|
|
|
import tempfile
|
|
import os
|
|
|
|
@pytest.fixture
def sqlite_store():
    """Yield a SQLiteStore backed by a throwaway on-disk database.

    A real file (rather than ``:memory:``) lets several connections see the
    same data, while each test still gets its own isolated database.
    """
    handle, raw_path = tempfile.mkstemp()
    os.close(handle)
    database_path = Path(raw_path)

    yield SQLiteStore(database_path)

    # Best-effort cleanup; on Windows the file can still be locked.
    if database_path.exists():
        try:
            os.remove(database_path)
        except PermissionError:
            pass
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_store_with_anomalies(sqlite_store):
    """1. Atomic write lands a news row, an anomaly-type row, and a junction row."""
    news = EnrichedNewsItemDTO(
        title="Test News Title",
        url="https://example.com/test-news-1",
        content_text="Sample content for the news item.",
        source="Test Source",
        timestamp=datetime.now(timezone.utc),
        relevance_score=8,
        summary_ru="Тестовая сводка",
        category="Technology",
        anomalies_detected=["WebGPU"],
    )

    stored_id = await sqlite_store.store_with_anomalies(news, [AnomalyType.WEBGPU])
    assert stored_id is not None

    # Inspect the three tables directly to confirm the write was persisted.
    with sqlite_store._get_connection() as conn:
        news_row = conn.execute(
            "SELECT * FROM news_items WHERE id = ?", (stored_id,)
        ).fetchone()
        assert news_row is not None
        assert news_row["title"] == "Test News Title"
        assert news_row["category"] == "Technology"

        # The anomaly type must have been upserted into anomaly_types.
        type_row = conn.execute(
            "SELECT * FROM anomaly_types WHERE name = ?", (AnomalyType.WEBGPU.value,)
        ).fetchone()
        assert type_row is not None
        assert type_row["name"] == "WebGPU"

        # And the junction table must link the news item to that type.
        link_row = conn.execute(
            "SELECT * FROM news_anomalies WHERE news_id = ? AND anomaly_id = ?",
            (stored_id, type_row["id"]),
        ).fetchone()
        assert link_row is not None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_by_id(sqlite_store):
    """2. A stored item round-trips through get_by_id with anomalies intact."""
    when = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
    dto = EnrichedNewsItemDTO(
        title="DTO Reconstruction Test",
        url="https://example.com/reconstruct",
        content_text="Detailed content here.",
        source="Source B",
        timestamp=when,
        relevance_score=7,
        summary_ru="Сводка для теста",
        category="Science",
        anomalies_detected=["WebGPU", "Edge AI"],
    )
    stored_id = await sqlite_store.store_with_anomalies(
        dto, [AnomalyType.WEBGPU, AnomalyType.EDGE_AI]
    )

    fetched = await sqlite_store.get_by_id(stored_id)
    assert fetched is not None
    assert fetched["id"] == stored_id
    for field in ("title", "url", "relevance_score", "category"):
        assert fetched[field] == getattr(dto, field)

    names = fetched["anomalies_detected"]
    assert len(names) == 2
    assert "WebGPU" in names
    assert "Edge AI" in names

    # Timestamp must come back as a datetime; normalize tzinfo before comparing.
    assert fetched["timestamp"].replace(tzinfo=timezone.utc) == when
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_exists(sqlite_store):
    """3. URL-based existence check flips from False to True after storing."""
    target_url = "https://example.com/exists-test"
    dto = EnrichedNewsItemDTO(
        title="Exists Test",
        url=target_url,
        content_text="...",
        source="Source",
        timestamp=datetime.now(timezone.utc),
        relevance_score=1,
        summary_ru="...",
        category="Cat",
        anomalies_detected=[],
    )

    # Unknown before the write.
    assert await sqlite_store.exists(target_url) is False

    # Known after the write.
    await sqlite_store.store_with_anomalies(dto, [])
    assert await sqlite_store.exists(target_url) is True

    # An unrelated URL must still be unknown.
    assert await sqlite_store.exists("https://example.com/does-not-exist") is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_latest(sqlite_store):
    """4. Descending-timestamp ordering, pagination, and category filtering."""
    start = datetime(2023, 1, 1, tzinfo=timezone.utc).timestamp()

    # Five items, one second apart; categories alternate Tech/Science.
    for seconds in range(5):
        await sqlite_store.store_with_anomalies(
            EnrichedNewsItemDTO(
                title=f"News Item {seconds}",
                url=f"https://example.com/news-{seconds}",
                content_text="...",
                source="Source",
                timestamp=datetime.fromtimestamp(start + seconds, tz=timezone.utc),
                relevance_score=5,
                summary_ru="...",
                category="Tech" if seconds % 2 == 0 else "Science",
                anomalies_detected=[],
            ),
            [],
        )

    # Newest first.
    newest = await sqlite_store.get_latest(limit=10)
    assert len(newest) == 5
    assert newest[0]["title"] == "News Item 4"
    assert newest[-1]["title"] == "News Item 0"

    # Pagination skips the newest item and takes the next two.
    window = await sqlite_store.get_latest(limit=2, offset=1)
    assert len(window) == 2
    assert window[0]["title"] == "News Item 3"
    assert window[1]["title"] == "News Item 2"

    # Category filter returns only the two Science items.
    science_rows = await sqlite_store.get_latest(category="Science")
    assert len(science_rows) == 2
    assert all(row["category"] == "Science" for row in science_rows)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_top_ranked(sqlite_store):
    """5. Descending relevance-score ordering with pagination."""
    for idx in range(5):
        dto = EnrichedNewsItemDTO(
            title=f"Ranked Item {idx}",
            url=f"https://example.com/ranked-{idx}",
            content_text="...",
            source="Source",
            timestamp=datetime.now(timezone.utc),
            relevance_score=idx * 2,  # scores 0, 2, 4, 6, 8
            summary_ru="...",
            category="Tech",
            anomalies_detected=[],
        )
        await sqlite_store.store_with_anomalies(dto, [])

    # Highest score first, lowest last.
    ranked = await sqlite_store.get_top_ranked(limit=10)
    assert len(ranked) == 5
    assert ranked[0]["relevance_score"] == 8
    assert ranked[-1]["relevance_score"] == 0

    # Skip the top item, take the next two.
    page = await sqlite_store.get_top_ranked(limit=2, offset=1)
    assert [row["relevance_score"] for row in page] == [6, 4]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_stats(sqlite_store):
    """6. Aggregated stats are correct, served from cache, and refreshable."""
    first = EnrichedNewsItemDTO(
        title="N1", url="u1", content_text="C1", source="S1",
        timestamp=datetime.now(timezone.utc), relevance_score=5,
        summary_ru="S1", category="CatA", anomalies_detected=["WebGPU"],
    )
    second = EnrichedNewsItemDTO(
        title="N2", url="u2", content_text="C2", source="S2",
        timestamp=datetime.now(timezone.utc), relevance_score=5,
        summary_ru="S2", category="CatA", anomalies_detected=["WebGPU", "Edge AI"],
    )
    await sqlite_store.store_with_anomalies(first, [AnomalyType.WEBGPU])
    await sqlite_store.store_with_anomalies(
        second, [AnomalyType.WEBGPU, AnomalyType.EDGE_AI]
    )

    # Fresh aggregation over both items.
    fresh = await sqlite_store.get_stats(use_cache=False)
    assert fresh["total_count"] == 2
    assert fresh["category_counts"]["CatA"] == 2
    assert fresh["source_counts"]["S1"] == 1
    assert fresh["source_counts"]["S2"] == 1
    assert fresh["anomaly_counts"]["WebGPU"] == 2
    assert fresh["anomaly_counts"]["Edge AI"] == 1

    # Sneak a row into the DB behind the cache's back.
    with sqlite_store._get_connection() as conn:
        conn.execute(
            "INSERT INTO news_items (id, title, url, source, timestamp, relevance_score, category) VALUES (?,?,?,?,?,?,?)",
            ("cached-id", "cached", "cached-url", "cached-source", 0, 0, "CatB"),
        )
        conn.commit()

    # Cached stats must not reflect the sneaked-in row.
    stale = await sqlite_store.get_stats(use_cache=True)
    assert stale["total_count"] == 2
    assert "CatB" not in stale["category_counts"]

    # Bypassing the cache must pick it up.
    refreshed = await sqlite_store.get_stats(use_cache=False)
    assert refreshed["total_count"] == 3
    assert refreshed["category_counts"]["CatB"] == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_acid_compliance(sqlite_store):
    """7. Test ACID compliance - verify transactions rollback on error.

    Wraps the store's connection in a proxy that fails any statement touching
    the anomaly tables, so the news_items INSERT succeeds mid-transaction but
    the anomaly writes raise — the whole transaction must then roll back and
    leave news_items empty.
    """
    item = EnrichedNewsItemDTO(
        title="Atomic Test",
        url="https://example.com/atomic",
        content_text="Should not be saved",
        source="Source",
        timestamp=datetime.now(timezone.utc),
        relevance_score=5,
        summary_ru="Summary",
        category="Tech",
        anomalies_detected=["WebGPU"]
    )

    # Use a mock to simulate an error mid-transaction.
    # We want it to fail after news_items is inserted but before anomalies are finished.

    # Keep a handle to the real method before it is patched below.
    original_get_conn = sqlite_store._get_connection

    @contextmanager
    def mocked_get_connection():
        with original_get_conn() as conn:
            # We can't patch conn.execute directly, so we'll wrap the connection
            # but still allow it to behave like a connection for the rest.

            # Since SQLiteStore uses conn.execute directly on the yielded connection,
            # we can return a proxy object.

            class ConnProxy:
                # Thin wrapper: fails anomaly-related statements, delegates
                # everything else (attributes included) to the real connection.
                def __init__(self, real_conn):
                    self.real_conn = real_conn
                def execute(self, sql, *args):
                    # Substring match on the SQL text selects which statements
                    # fail; the earlier news_items INSERT passes through.
                    if "anomaly_types" in sql or "news_anomalies" in sql:
                        raise sqlite3.Error("Simulated database failure")
                    return self.real_conn.execute(sql, *args)
                def commit(self): return self.real_conn.commit()
                def rollback(self): return self.real_conn.rollback()
                def __getattr__(self, name): return getattr(self.real_conn, name)
                # The connection is used in context blocks
                def __enter__(self): return self
                def __exit__(self, *args): pass

            yield ConnProxy(conn)

    # Patch the _get_connection method of the store; side_effect makes each
    # call produce a fresh proxied context manager.
    with patch.object(sqlite_store, '_get_connection', side_effect=mocked_get_connection):
        with pytest.raises(sqlite3.Error, match="Simulated database failure"):
            await sqlite_store.store_with_anomalies(item, [AnomalyType.WEBGPU])

    # If ACID works, the news_items table should still be empty
    assert await sqlite_store.count_all() == 0

    # Double-check at the SQL level with an unpatched connection.
    with sqlite_store._get_connection() as conn:
        count = conn.execute("SELECT COUNT(*) FROM news_items").fetchone()[0]
        assert count == 0
|