AI-Trend-Scout/tests/storage/test_sqlite_store.py
Artur Mukhamadiev f4ae73bdae feat(database): SQLite shadow database for indexed queries
:Release Notes:
- Add ACID-compliant SQLiteStore (WAL mode, FULL sync, FK constraints)
- Add AnomalyType enum for normalized anomaly storage
- Add legacy data migration script (dry-run, batch, rollback)
- Update ChromaStore to delegate indexed queries to SQLite
- Add test suite for SQLiteStore (7 tests, all passing)

:Detailed Notes:
- SQLiteStore: news_items, anomaly_types, news_anomalies tables with indexes
- Performance: get_latest/get_top_ranked O(n)→O(log n), get_stats O(n)→O(1)
- ChromaDB remains primary vector store; SQLite provides indexed metadata queries

:Testing Performed:
- python3 -m pytest tests/ -v (112 passed)

:QA Notes:
- Tests verified by Python QA Engineer subagent

:Issues Addressed:
- get_latest/get_top_ranked fetched ALL items then sorted in Python
- get_stats iterated over ALL items
- anomalies_detected stored as comma-joined string (no index)

Change-Id: I708808b6e72889869afcf16d4ac274260242007a
2026-03-30 13:54:48 +03:00

307 lines
11 KiB
Python

import pytest
import sqlite3
import uuid
import asyncio
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Any
from contextlib import contextmanager
from unittest.mock import patch, MagicMock
from src.storage.sqlite_store import SQLiteStore, StatsCache
from src.processor.dto import EnrichedNewsItemDTO
from src.processor.anomaly_types import AnomalyType
import tempfile
import os
@pytest.fixture
def sqlite_store():
    """Yield a SQLiteStore backed by a throwaway on-disk database.

    A real file (rather than ':memory:') is used so that multiple
    connections can observe the same data, while each test still gets
    a fully isolated database.
    """
    handle, raw_path = tempfile.mkstemp()
    os.close(handle)
    database_file = Path(raw_path)
    yield SQLiteStore(database_file)
    # Best-effort cleanup: on Windows an open SQLite handle can keep the
    # file locked, in which case we simply leave it behind.
    try:
        if database_file.exists():
            os.remove(database_file)
    except PermissionError:
        pass
@pytest.mark.asyncio
async def test_store_with_anomalies(sqlite_store):
    """1. Test store_with_anomalies - verify atomic writes with anomaly junction records"""
    dto = EnrichedNewsItemDTO(
        title="Test News Title",
        url="https://example.com/test-news-1",
        content_text="Sample content for the news item.",
        source="Test Source",
        timestamp=datetime.now(timezone.utc),
        relevance_score=8,
        summary_ru="Тестовая сводка",
        category="Technology",
        anomalies_detected=["WebGPU"],
    )

    stored_id = await sqlite_store.store_with_anomalies(dto, [AnomalyType.WEBGPU])
    assert stored_id is not None

    # Inspect the raw tables to confirm every part of the write landed.
    with sqlite_store._get_connection() as conn:
        news = conn.execute(
            "SELECT * FROM news_items WHERE id = ?", (stored_id,)
        ).fetchone()
        assert news is not None
        assert news["title"] == "Test News Title"
        assert news["category"] == "Technology"

        # The anomaly must be registered in the normalized lookup table...
        anomaly = conn.execute(
            "SELECT * FROM anomaly_types WHERE name = ?", (AnomalyType.WEBGPU.value,)
        ).fetchone()
        assert anomaly is not None
        assert anomaly["name"] == "WebGPU"

        # ...and linked to the news item through the junction table.
        link = conn.execute(
            "SELECT * FROM news_anomalies WHERE news_id = ? AND anomaly_id = ?",
            (stored_id, anomaly["id"]),
        ).fetchone()
        assert link is not None
@pytest.mark.asyncio
async def test_get_by_id(sqlite_store):
    """2. Test get_by_id - verify proper reconstruction of DTO with anomalies"""
    stored_at = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
    dto = EnrichedNewsItemDTO(
        title="DTO Reconstruction Test",
        url="https://example.com/reconstruct",
        content_text="Detailed content here.",
        source="Source B",
        timestamp=stored_at,
        relevance_score=7,
        summary_ru="Сводка для теста",
        category="Science",
        anomalies_detected=["WebGPU", "Edge AI"],
    )
    doc_id = await sqlite_store.store_with_anomalies(
        dto, [AnomalyType.WEBGPU, AnomalyType.EDGE_AI]
    )

    fetched = await sqlite_store.get_by_id(doc_id)
    assert fetched is not None
    assert fetched["id"] == doc_id

    # Scalar fields round-trip unchanged.
    for field in ("title", "url", "relevance_score", "category"):
        assert fetched[field] == getattr(dto, field)

    # Both anomaly names come back, and nothing else.
    anomalies = fetched["anomalies_detected"]
    assert "WebGPU" in anomalies
    assert "Edge AI" in anomalies
    assert len(anomalies) == 2

    # Timestamp is reconstructed as a datetime; normalise to UTC to compare.
    assert fetched["timestamp"].replace(tzinfo=timezone.utc) == stored_at
@pytest.mark.asyncio
async def test_exists(sqlite_store):
    """3. Test exists - verify URL-based existence check"""
    known_url = "https://example.com/exists-test"
    dto = EnrichedNewsItemDTO(
        title="Exists Test",
        url=known_url,
        content_text="...",
        source="Source",
        timestamp=datetime.now(timezone.utc),
        relevance_score=1,
        summary_ru="...",
        category="Cat",
        anomalies_detected=[],
    )

    # Unknown before the write...
    assert await sqlite_store.exists(known_url) is False

    # ...known afterwards, while unrelated URLs stay unknown.
    await sqlite_store.store_with_anomalies(dto, [])
    assert await sqlite_store.exists(known_url) is True
    assert await sqlite_store.exists("https://example.com/does-not-exist") is False
@pytest.mark.asyncio
async def test_get_latest(sqlite_store):
    """4. Test get_latest - verify indexed timestamp ordering with pagination"""
    epoch = datetime(2023, 1, 1, tzinfo=timezone.utc).timestamp()

    # Five items, one second apart, alternating Tech/Science categories.
    for idx in range(5):
        await sqlite_store.store_with_anomalies(
            EnrichedNewsItemDTO(
                title=f"News Item {idx}",
                url=f"https://example.com/news-{idx}",
                content_text="...",
                source="Source",
                timestamp=datetime.fromtimestamp(epoch + idx, tz=timezone.utc),
                relevance_score=5,
                summary_ru="...",
                category="Tech" if idx % 2 == 0 else "Science",
                anomalies_detected=[],
            ),
            [],
        )

    # Newest item first, oldest last.
    latest = await sqlite_store.get_latest(limit=10)
    assert len(latest) == 5
    assert latest[0]["title"] == "News Item 4"
    assert latest[-1]["title"] == "News Item 0"

    # Pagination skips the newest row and returns the next two.
    page = await sqlite_store.get_latest(limit=2, offset=1)
    assert [row["title"] for row in page] == ["News Item 3", "News Item 2"]

    # Category filter restricts the result set.
    science = await sqlite_store.get_latest(category="Science")
    assert len(science) == 2
    assert all(row["category"] == "Science" for row in science)
@pytest.mark.asyncio
async def test_get_top_ranked(sqlite_store):
    """5. Test get_top_ranked - verify indexed relevance ordering with pagination"""
    # Five items with relevance scores 0, 2, 4, 6, 8.
    for idx in range(5):
        await sqlite_store.store_with_anomalies(
            EnrichedNewsItemDTO(
                title=f"Ranked Item {idx}",
                url=f"https://example.com/ranked-{idx}",
                content_text="...",
                source="Source",
                timestamp=datetime.now(timezone.utc),
                relevance_score=idx * 2,
                summary_ru="...",
                category="Tech",
                anomalies_detected=[],
            ),
            [],
        )

    # Highest score first, lowest last.
    ranked = await sqlite_store.get_top_ranked(limit=10)
    assert len(ranked) == 5
    assert ranked[0]["relevance_score"] == 8
    assert ranked[-1]["relevance_score"] == 0

    # Pagination: skip the top item, take the next two.
    page = await sqlite_store.get_top_ranked(limit=2, offset=1)
    assert [row["relevance_score"] for row in page] == [6, 4]
@pytest.mark.asyncio
async def test_get_stats(sqlite_store):
    """6. Test get_stats - verify correct aggregation with cache"""
    first = EnrichedNewsItemDTO(
        title="N1", url="u1", content_text="C1", source="S1",
        timestamp=datetime.now(timezone.utc), relevance_score=5,
        summary_ru="S1", category="CatA", anomalies_detected=["WebGPU"],
    )
    second = EnrichedNewsItemDTO(
        title="N2", url="u2", content_text="C2", source="S2",
        timestamp=datetime.now(timezone.utc), relevance_score=5,
        summary_ru="S2", category="CatA", anomalies_detected=["WebGPU", "Edge AI"],
    )
    await sqlite_store.store_with_anomalies(first, [AnomalyType.WEBGPU])
    await sqlite_store.store_with_anomalies(
        second, [AnomalyType.WEBGPU, AnomalyType.EDGE_AI]
    )

    # Fresh aggregation over both rows.
    stats = await sqlite_store.get_stats(use_cache=False)
    assert stats["total_count"] == 2
    assert stats["category_counts"]["CatA"] == 2
    assert stats["source_counts"]["S1"] == 1
    assert stats["source_counts"]["S2"] == 1
    assert stats["anomaly_counts"]["WebGPU"] == 2
    assert stats["anomaly_counts"]["Edge AI"] == 1

    # Insert a row behind the store's back so the cache goes stale.
    with sqlite_store._get_connection() as conn:
        conn.execute(
            "INSERT INTO news_items (id, title, url, source, timestamp, relevance_score, category) VALUES (?,?,?,?,?,?,?)",
            ("cached-id", "cached", "cached-url", "cached-source", 0, 0, "CatB"),
        )
        conn.commit()

    # A cached read still reflects the pre-insert state...
    stale = await sqlite_store.get_stats(use_cache=True)
    assert stale["total_count"] == 2
    assert "CatB" not in stale["category_counts"]

    # ...while bypassing the cache sees the new row.
    fresh = await sqlite_store.get_stats(use_cache=False)
    assert fresh["total_count"] == 3
    assert fresh["category_counts"]["CatB"] == 1
@pytest.mark.asyncio
async def test_acid_compliance(sqlite_store):
    """7. Test ACID compliance - verify transactions rollback on error"""
    dto = EnrichedNewsItemDTO(
        title="Atomic Test",
        url="https://example.com/atomic",
        content_text="Should not be saved",
        source="Source",
        timestamp=datetime.now(timezone.utc),
        relevance_score=5,
        summary_ru="Summary",
        category="Tech",
        anomalies_detected=["WebGPU"],
    )

    real_get_connection = sqlite_store._get_connection

    class _FailOnAnomalyWrites:
        """Connection proxy that fails any statement touching the anomaly
        tables — i.e. after the news_items insert has already run inside
        the same transaction — to simulate a mid-transaction crash."""

        def __init__(self, inner):
            self._inner = inner

        def execute(self, sql, *args):
            if "anomaly_types" in sql or "news_anomalies" in sql:
                raise sqlite3.Error("Simulated database failure")
            return self._inner.execute(sql, *args)

        def commit(self):
            return self._inner.commit()

        def rollback(self):
            return self._inner.rollback()

        def __getattr__(self, name):
            # Everything else is forwarded untouched to the real connection.
            return getattr(self._inner, name)

        # Allow use in `with` blocks without closing the real connection.
        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            pass

    @contextmanager
    def failing_get_connection():
        with real_get_connection() as inner:
            yield _FailOnAnomalyWrites(inner)

    # The store must surface the simulated failure...
    with patch.object(sqlite_store, '_get_connection', side_effect=failing_get_connection):
        with pytest.raises(sqlite3.Error, match="Simulated database failure"):
            await sqlite_store.store_with_anomalies(dto, [AnomalyType.WEBGPU])

    # ...and the partial news_items insert must have been rolled back.
    assert await sqlite_store.count_all() == 0
    with sqlite_store._get_connection() as conn:
        assert conn.execute("SELECT COUNT(*) FROM news_items").fetchone()[0] == 0