AI-Trend-Scout/scripts/migrate_legacy_data.py
Artur Mukhamadiev f4ae73bdae feat(database): SQLite shadow database for indexed queries
:Release Notes:
- Add ACID-compliant SQLiteStore (WAL mode, FULL sync, FK constraints)
- Add AnomalyType enum for normalized anomaly storage
- Add legacy data migration script (dry-run, batch, rollback)
- Update ChromaStore to delegate indexed queries to SQLite
- Add test suite for SQLiteStore (7 tests, all passing)

:Detailed Notes:
- SQLiteStore: news_items, anomaly_types, news_anomalies tables with indexes
- Performance: get_latest/get_top_ranked O(n)→O(log n), get_stats O(n)→O(1)
- ChromaDB remains primary vector store; SQLite provides indexed metadata queries

:Testing Performed:
- python3 -m pytest tests/ -v (112 passed)

:QA Notes:
- Tests verified by Python QA Engineer subagent

:Issues Addressed:
- get_latest/get_top_ranked fetched ALL items then sorted in Python
- get_stats iterated over ALL items
- anomalies_detected stored as comma-joined string (no index)

Change-Id: I708808b6e72889869afcf16d4ac274260242007a
2026-03-30 13:54:48 +03:00

1012 lines
35 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Migration Script: ChromaDB Legacy Data → SQLite Normalized Schema
This script migrates news items from ChromaDB (legacy format with comma-joined anomalies)
to SQLite (normalized schema with proper AnomalyType enum).
Features:
- Dry-run mode for validation without persistence
- Batch processing for large datasets
- Rollback capability with transaction management (ACID)
- Progress logging and error tracking
- Migration validation and report generation
Usage:
python scripts/migrate_legacy_data.py # Full migration
python scripts/migrate_legacy_data.py --dry-run # Validate without persisting
python scripts/migrate_legacy_data.py --batch-size 50 # Custom batch size
python scripts/migrate_legacy_data.py --rollback # Rollback last migration
"""
import argparse
import asyncio
import logging
import os
import sys
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import sqlite3
import json
# Add project root to path for imports
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
import chromadb
from chromadb.config import Settings
from src.storage.chroma_store import ChromaStore
from src.storage.sqlite_store import SQLiteStore
from src.processor.anomaly_types import AnomalyType, normalize_anomaly_list
from src.processor.dto import EnrichedNewsItemDTO
# ============================================================================
# Configuration & Constants
# ============================================================================
DEFAULT_BATCH_SIZE = 100
DEFAULT_CHROMA_PATH = "./chroma_db"
DEFAULT_SQLITE_PATH = "./data/migration_shadow.db"
DEFAULT_LOG_LEVEL = "INFO"
MIGRATION_STATE_FILE = "./data/migration_state.json"


class MigrationStatus(Enum):
    """Lifecycle states of a migration run."""
    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    ROLLED_BACK = "rolled_back"


@dataclass
class MigrationConfig:
    """CLI-driven configuration for a migration or rollback run."""
    chroma_path: str = DEFAULT_CHROMA_PATH  # source ChromaDB directory
    sqlite_path: str = DEFAULT_SQLITE_PATH  # target SQLite shadow database
    batch_size: int = DEFAULT_BATCH_SIZE  # items migrated per batch
    dry_run: bool = False  # validate only, never persist to SQLite
    rollback: bool = False  # undo the previous migration instead of migrating
    log_level: str = DEFAULT_LOG_LEVEL


@dataclass
class MigrationState:
    """Progress snapshot persisted to disk for recovery and rollback."""
    status: MigrationStatus = MigrationStatus.NOT_STARTED
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    total_items: int = 0
    processed_items: int = 0
    successful_items: int = 0
    failed_items: int = 0
    error_log: List[Dict[str, Any]] = field(default_factory=list)
    source_stats: Dict[str, Any] = field(default_factory=dict)
    target_stats: Dict[str, Any] = field(default_factory=dict)
    anomaly_transformations: Dict[str, int] = field(default_factory=dict)
    batch_results: List[Dict[str, Any]] = field(default_factory=list)

    # Every field except `status` round-trips through JSON unchanged;
    # `status` needs enum <-> string conversion. (Unannotated, so it is a
    # class attribute, not a dataclass field.)
    _PLAIN_FIELDS = (
        "started_at", "completed_at", "total_items", "processed_items",
        "successful_items", "failed_items", "error_log", "source_stats",
        "target_stats", "anomaly_transformations", "batch_results",
    )

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict ("status" first, as its value)."""
        payload: Dict[str, Any] = {"status": self.status.value}
        for name in self._PLAIN_FIELDS:
            payload[name] = getattr(self, name)
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MigrationState":
        """Rebuild a state from to_dict() output; absent keys keep defaults."""
        state = cls()
        state.status = MigrationStatus(data.get("status", "not_started"))
        for name in cls._PLAIN_FIELDS:
            # `getattr(state, name)` is the freshly-constructed default, so
            # missing keys behave exactly like the dataclass defaults.
            setattr(state, name, data.get(name, getattr(state, name)))
        return state
@dataclass
class MigrationReport:
    """Audit-trail record describing one complete migration run."""
    success: bool
    dry_run: bool
    started_at: datetime
    completed_at: datetime
    duration_seconds: float
    source_stats: Dict[str, Any]
    target_stats: Dict[str, Any]
    validation_results: Dict[str, Any]
    anomaly_transformation_summary: Dict[str, Any]
    error_summary: List[Dict[str, Any]]
    batch_summary: List[Dict[str, Any]]

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for JSON output; datetimes become ISO-8601 strings."""
        timestamps = {
            "started_at": self.started_at.isoformat(),
            "completed_at": self.completed_at.isoformat(),
        }
        return {
            "success": self.success,
            "dry_run": self.dry_run,
            **timestamps,
            "duration_seconds": self.duration_seconds,
            "source_stats": self.source_stats,
            "target_stats": self.target_stats,
            "validation_results": self.validation_results,
            "anomaly_transformation_summary": self.anomaly_transformation_summary,
            "error_summary": self.error_summary,
            "batch_summary": self.batch_summary,
        }
# ============================================================================
# Logging Setup
# ============================================================================
def setup_logging(level: str) -> logging.Logger:
    """Configure and return the shared "migration" logger.

    Console output goes to stdout at *level*; ERROR and above are
    additionally appended to ``logs/migration.log`` under the project root.

    Args:
        level: Logging level name (e.g. "INFO"); unrecognized names fall
            back to INFO.

    Returns:
        The configured ``logging.Logger`` named "migration".
    """
    log_level = getattr(logging, level.upper(), logging.INFO)
    formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )
    # Console handler: everything at the requested level.
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(formatter)
    # File handler: errors only, for post-mortem inspection.
    log_dir = PROJECT_ROOT / "logs"
    log_dir.mkdir(exist_ok=True)
    file_handler = logging.FileHandler(log_dir / "migration.log")
    file_handler.setLevel(logging.ERROR)
    file_handler.setFormatter(formatter)
    logger = logging.getLogger("migration")
    logger.setLevel(log_level)
    # BUGFIX: `logging.getLogger` returns the same logger object on every
    # call, and the original code appended two fresh handlers each time —
    # a second setup_logging() call duplicated every log line. Remove any
    # stale handlers before attaching the new pair.
    for stale in list(logger.handlers):
        logger.removeHandler(stale)
        stale.close()
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    return logger
# ============================================================================
# State Persistence
# ============================================================================
class StateManager:
    """Persists migration state to disk for recovery and rollback."""

    def __init__(self, state_file: Path, logger: logging.Logger):
        self.state_file = state_file
        self.logger = logger
        self._ensure_state_dir()

    def _ensure_state_dir(self) -> None:
        # Create parent directories eagerly so save_state never fails on a
        # missing directory.
        self.state_file.parent.mkdir(parents=True, exist_ok=True)

    def save_state(self, state: "MigrationState") -> None:
        """Write the state as pretty-printed JSON; re-raises on failure."""
        try:
            serialized = json.dumps(state.to_dict(), indent=2, default=str)
            self.state_file.write_text(serialized)
            self.logger.debug(f"State saved: {state.status.value}")
        except Exception as e:
            self.logger.error(f"Failed to save state: {e}")
            raise

    def load_state(self) -> Optional["MigrationState"]:
        """Return the previously saved state, or None if absent or corrupt."""
        if not self.state_file.exists():
            return None
        try:
            raw = json.loads(self.state_file.read_text())
            return MigrationState.from_dict(raw)
        except Exception as e:
            self.logger.warning(f"Could not load state file: {e}")
            return None

    def clear_state(self) -> None:
        """Delete the state file (after a successful migration or rollback)."""
        if self.state_file.exists():
            self.state_file.unlink()
            self.logger.debug("State file cleared")
# ============================================================================
# Core Migration Logic
# ============================================================================
class LegacyDataMigrator:
    """
    Migrates data from ChromaDB (legacy) to SQLite (normalized schema).

    Key transformations:
    - Comma-joined anomalies → Normalized AnomalyType enum
    - Single metadata table → Normalized tables (news_items, anomaly_types,
      news_anomalies)
    """

    def __init__(
        self,
        chroma_store: ChromaStore,
        sqlite_store: SQLiteStore,
        batch_size: int,
        dry_run: bool,
        logger: logging.Logger
    ):
        self.chroma_store = chroma_store
        self.sqlite_store = sqlite_store
        self.batch_size = batch_size
        self.dry_run = dry_run
        self.logger = logger
        self.state = MigrationState()
        # Reporting counters: "raw → normalized" occurrence map, plus a tally
        # of values that fell back to AnomalyType.UNKNOWN.
        self.anomaly_seen: Dict[str, int] = {}
        self.anomaly_unknown: int = 0

    async def get_source_stats(self) -> Dict[str, Any]:
        """Gather statistics from the ChromaDB source.

        NOTE(review): "anomaly_unknown_count" reports the counter at call
        time; it is only incremented by _transform_anomalies, so when this
        runs before any batch is processed (step 1) it is always 0.
        """
        stats = await self.chroma_store.get_stats()
        # Count raw anomaly strings straight from the source items.
        anomaly_counts: Dict[str, int] = {}
        all_items = await self._fetch_all_from_chroma()
        for item in all_items:
            for anomaly in item.anomalies_detected:
                anomaly_counts[anomaly] = anomaly_counts.get(anomaly, 0) + 1
        return {
            "total_count": stats.get("total_count", 0),
            "category_counts": {k: v for k, v in stats.items() if k.startswith("category_")},
            "anomaly_counts": anomaly_counts,
            "anomaly_unknown_count": self.anomaly_unknown
        }

    async def get_target_stats(self) -> Dict[str, Any]:
        """Gather statistics from the SQLite target (bypassing its cache)."""
        stats = await self.sqlite_store.get_stats(use_cache=False)
        return {
            "total_count": stats.get("total_count", 0),
            "category_counts": stats.get("category_counts", {}),
            "source_counts": stats.get("source_counts", {}),
            "anomaly_counts": stats.get("anomaly_counts", {})
        }

    async def _fetch_all_from_chroma(self) -> List[EnrichedNewsItemDTO]:
        """Fetch all items from ChromaDB.

        ChromaDB has no direct "get_all", so we request one very large page
        via get_latest (assumes the collection holds fewer than 10000 items
        — TODO confirm) and fall back to raw collection access on failure.
        """
        items = []
        try:
            all_items = await self.chroma_store.get_latest(limit=10000)
            items.extend(all_items)
        except Exception as e:
            self.logger.warning(f"get_latest failed: {e}, trying alternative fetch")
            # Fallback: fetch via raw collection access
            items = await self._raw_fetch_all()
        return items

    async def _raw_fetch_all(self) -> List[EnrichedNewsItemDTO]:
        """Raw fetch all items directly from the ChromaDB collection."""
        # collection.get is blocking, so run it off the event loop.
        # (Uses the module-level asyncio import; the former function-local
        # `import asyncio` was redundant.)
        results = await asyncio.to_thread(
            self.chroma_store.collection.get,
            include=["metadatas", "documents"]
        )
        metadatas = results.get("metadatas") or []
        documents = results.get("documents") or []
        items = []
        for meta, doc in zip(metadatas, documents):
            if meta:
                try:
                    dto = self.chroma_store._reconstruct_dto(meta, doc)
                    items.append(dto)
                except Exception as e:
                    # Best-effort: skip items that can't be reconstructed.
                    self.logger.warning(f"Failed to reconstruct DTO: {e}")
        return items

    def _transform_anomalies(self, anomalies: List[str]) -> List[AnomalyType]:
        """
        Transform comma-joined anomaly strings to normalized AnomalyType enum.

        This is the core transformation logic that normalizes legacy data.
        It also records a "raw → normalized" occurrence map and counts values
        that fall back to AnomalyType.UNKNOWN, for the migration report.
        """
        normalized = normalize_anomaly_list(anomalies)
        for anomaly_str in anomalies:
            normalized_type = AnomalyType.from_string(anomaly_str)
            # BUGFIX: insert an explicit separator — the raw and normalized
            # values were previously concatenated (e.g. "spikeprice_spike"),
            # which made the transformation map in reports unreadable.
            key = f"{anomaly_str.strip()} → {normalized_type.value}"
            self.anomaly_seen[key] = self.anomaly_seen.get(key, 0) + 1
            if normalized_type == AnomalyType.UNKNOWN:
                self.anomaly_unknown += 1
        return normalized

    async def migrate_batch(
        self,
        items: List[EnrichedNewsItemDTO],
        batch_num: int
    ) -> Tuple[int, int, List[Dict[str, Any]]]:
        """
        Migrate a single batch of items.

        In dry-run mode items are transformed but not persisted; they still
        count as successes so the dry-run report mirrors a live run.

        Returns:
            Tuple of (success_count, failure_count, errors)
        """
        success_count = 0
        failure_count = 0
        errors = []
        for item in items:
            try:
                # Transform anomalies from legacy format
                anomaly_types = self._transform_anomalies(item.anomalies_detected)
                if not self.dry_run:
                    # Store in SQLite with normalized anomalies
                    await self.sqlite_store.store_with_anomalies(item, anomaly_types)
                success_count += 1
                self.logger.debug(
                    f"Batch {batch_num}: Migrated item '{item.title[:50]}...' "
                    f"with {len(anomaly_types)} anomalies"
                )
            except Exception as e:
                failure_count += 1
                error_record = {
                    "batch": batch_num,
                    "url": item.url,
                    "title": item.title[:100] if item.title else "N/A",
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                }
                errors.append(error_record)
                self.logger.error(
                    f"Batch {batch_num}: Failed to migrate '{item.url}': {e}"
                )
        return success_count, failure_count, errors

    async def execute(self) -> MigrationState:
        """
        Execute the full migration process.

        Steps:
        1. Gather source statistics
        2. Fetch all items from ChromaDB
        3. Process in batches
        4. Validate counts
        5. Update state

        Returns:
            The final MigrationState (also kept on self.state).

        Raises:
            Any exception from the underlying stores; the state is marked
            FAILED and the error recorded before re-raising.
        """
        self.state.status = MigrationStatus.IN_PROGRESS
        self.state.started_at = datetime.now().isoformat()
        self.logger.info("=" * 70)
        self.logger.info("LEGACY DATA MIGRATION: ChromaDB → SQLite")
        self.logger.info("=" * 70)
        self.logger.info(f"Dry-run mode: {self.dry_run}")
        self.logger.info(f"Batch size: {self.batch_size}")
        self.logger.info(f"Source: {self.chroma_store.collection.name}")
        self.logger.info("-" * 70)
        try:
            # Step 1: Gather source statistics
            self.logger.info("Step 1/5: Gathering source statistics...")
            self.state.source_stats = await self.get_source_stats()
            self.state.total_items = self.state.source_stats.get("total_count", 0)
            self.logger.info(
                f"  Source items: {self.state.total_items}, "
                f"Anomalies: {len(self.state.source_stats.get('anomaly_counts', {}))}"
            )
            # Step 2: Fetch all items from ChromaDB
            self.logger.info("Step 2/5: Fetching items from ChromaDB...")
            all_items = await self._fetch_all_from_chroma()
            self.logger.info(f"  Fetched {len(all_items)} items from ChromaDB")
            # Step 3: Process in batches
            self.logger.info("Step 3/5: Processing batches...")
            total_success = 0
            total_failures = 0
            all_errors = []
            batch_results = []
            for i in range(0, len(all_items), self.batch_size):
                batch_num = (i // self.batch_size) + 1
                batch = all_items[i:i + self.batch_size]
                self.logger.info(
                    f"  Processing batch {batch_num} "
                    f"(items {i+1}-{min(i+self.batch_size, len(all_items))})"
                )
                success, failures, errors = await self.migrate_batch(batch, batch_num)
                total_success += success
                total_failures += failures
                all_errors.extend(errors)
                # Keep the shared state current after every batch so an
                # interrupted run can be inspected via the state file.
                self.state.processed_items = i + len(batch)
                self.state.successful_items = total_success
                self.state.failed_items = total_failures
                batch_results.append({
                    "batch_num": batch_num,
                    "size": len(batch),
                    "success": success,
                    "failures": failures
                })
                self.logger.info(
                    f"  Batch {batch_num} complete: "
                    f"{success} success, {failures} failures"
                )
            self.state.batch_results = batch_results
            self.state.error_log = all_errors
            # Step 4: Validate
            self.logger.info("Step 4/5: Validating migration...")
            self.state.target_stats = await self.get_target_stats()
            if not self.dry_run:
                source_count = self.state.source_stats.get("total_count", 0)
                target_count = self.state.target_stats.get("total_count", 0)
                if source_count != target_count:
                    self.logger.warning(
                        f"Count mismatch: Source={source_count}, Target={target_count}"
                    )
                else:
                    self.logger.info(f"  Count validation passed: {target_count} items")
            else:
                self.logger.info("  Dry-run: Skipping target validation")
            # Track anomaly transformations for the final report.
            self.state.anomaly_transformations = {
                "transformation_map": self.anomaly_seen,
                "unknown_count": self.anomaly_unknown,
                "unique_transformations": len(self.anomaly_seen)
            }
            # Step 5: Finalize
            self.state.status = MigrationStatus.COMPLETED
            self.state.completed_at = datetime.now().isoformat()
            self.logger.info("-" * 70)
            self.logger.info("MIGRATION COMPLETE")
            self.logger.info(f"  Total processed: {self.state.processed_items}")
            self.logger.info(f"  Successful: {self.state.successful_items}")
            self.logger.info(f"  Failed: {self.state.failed_items}")
            self.logger.info("=" * 70)
        except Exception as e:
            self.state.status = MigrationStatus.FAILED
            self.state.completed_at = datetime.now().isoformat()
            self.state.error_log.append({
                "phase": "execution",
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            })
            self.logger.exception("Migration failed with error")
            raise
        return self.state
class MigrationRollback:
    """
    Handles rollback of a migration with ACID guarantees.

    All deletions run inside a single IMMEDIATE transaction, so the rollback
    is all-or-nothing: on any error the uncommitted changes are discarded
    when the connection closes.
    """

    def __init__(self, sqlite_path: Path, logger: logging.Logger):
        self.sqlite_path = sqlite_path
        self.logger = logger

    async def execute(self) -> bool:
        """Execute rollback of migration data.

        Returns:
            True when there was nothing to do or the deletion committed;
            False when the rollback failed (details in the log).
        """
        self.logger.info("Starting rollback procedure...")
        if not self.sqlite_path.exists():
            self.logger.warning("SQLite database does not exist, nothing to rollback")
            return True
        conn = None
        try:
            # BUGFIX: `with sqlite3.connect(...)` only manages the implicit
            # transaction — it never closes the connection, leaking the file
            # handle. Open explicitly and close in `finally` instead.
            conn = sqlite3.connect(self.sqlite_path)
            cursor = conn.execute("SELECT COUNT(*) FROM news_items")
            count = cursor.fetchone()[0]
            if count == 0:
                self.logger.info("No data to rollback")
                return True
            self.logger.info(f"Found {count} items to remove")
            # Delete all migrated data within one transaction.
            conn.execute("BEGIN IMMEDIATE")
            # Delete children before parents to satisfy FK constraints.
            conn.execute("DELETE FROM news_anomalies")
            conn.execute("DELETE FROM anomaly_types")
            conn.execute("DELETE FROM news_items")
            conn.execute("DELETE FROM stats_cache")
            conn.execute("COMMIT")
            self.logger.info("Rollback completed successfully")
            return True
        except Exception as e:
            self.logger.exception(f"Rollback failed: {e}")
            return False
        finally:
            if conn is not None:
                conn.close()
class MigrationValidator:
    """Runs post-migration consistency checks and builds the final report."""

    def __init__(
        self,
        source_stats: Dict[str, Any],
        target_stats: Dict[str, Any],
        anomaly_transformations: Dict[str, Any],
        errors: List[Dict[str, Any]],
        batch_results: List[Dict[str, Any]],
        dry_run: bool
    ):
        self.source_stats = source_stats
        self.target_stats = target_stats
        self.anomaly_transformations = anomaly_transformations
        self.errors = errors
        self.batch_results = batch_results
        self.dry_run = dry_run

    def validate(self) -> Dict[str, Any]:
        """Run all validation checks.

        Checks performed:
        - source/target item-count equality (skipped in dry-run)
        - absence of per-item migration errors
        - anomaly-normalization coverage (no UNKNOWN fallbacks)
        - batch completion

        Returns:
            Dict with keys "passed" (bool), "checks", "warnings", "errors".
        """
        outcome: Dict[str, Any] = {
            "passed": True,
            "checks": [],
            "warnings": [],
            "errors": []
        }

        def record(name: str, status: str, message: str) -> None:
            # Append one structured check entry.
            outcome["checks"].append(
                {"name": name, "status": status, "message": message}
            )

        # Check 1: item counts must match (not applicable in dry-run).
        src = self.source_stats.get("total_count", 0)
        tgt = self.target_stats.get("total_count", 0)
        if self.dry_run:
            record("count_validation", "skipped",
                   "Dry-run mode - count validation not applicable")
        elif src == tgt:
            record("count_validation", "passed",
                   f"Source ({src}) == Target ({tgt})")
        else:
            outcome["passed"] = False
            record("count_validation", "failed",
                   f"Source ({src}) != Target ({tgt})")
            outcome["errors"].append(f"Count mismatch: {src} vs {tgt}")

        # Check 2: per-item errors are surfaced as warnings, not failures.
        error_total = len(self.errors)
        if error_total == 0:
            record("error_check", "passed", "No errors during migration")
        else:
            warning_msg = f"{error_total} errors occurred during migration"
            outcome["warnings"].append(warning_msg)
            record("error_check", "warning", warning_msg)

        # Check 3: anomalies that normalized to UNKNOWN are warnings.
        unknown_total = self.anomaly_transformations.get("unknown_count", 0)
        if unknown_total == 0:
            record("anomaly_coverage", "passed",
                   "All anomalies mapped to known types")
        else:
            outcome["warnings"].append(f"{unknown_total} anomalies mapped to UNKNOWN")
            record("anomaly_coverage", "warning",
                   f"{unknown_total} anomalies could not be normalized")

        # Check 4: batch bookkeeping sanity.
        if self.batch_results:
            record("batch_completion", "passed",
                   f"All {len(self.batch_results)} batches processed")
        return outcome

    def generate_report(self) -> "MigrationReport":
        """Assemble a MigrationReport from the validation results."""
        validation_results = self.validate()
        transformation_map = self.anomaly_transformations.get("transformation_map", {})
        return MigrationReport(
            success=validation_results["passed"] and len(validation_results["errors"]) == 0,
            dry_run=self.dry_run,
            started_at=datetime.now(),
            completed_at=datetime.now(),
            duration_seconds=0.0,  # placeholder; expected to be set by the caller
            source_stats=self.source_stats,
            target_stats=self.target_stats,
            validation_results=validation_results,
            anomaly_transformation_summary={
                "total_unique_transformations": self.anomaly_transformations.get("unique_transformations", 0),
                "unknown_anomaly_count": self.anomaly_transformations.get("unknown_count", 0),
                "transformation_examples": dict(list(transformation_map.items())[:10])
            },
            error_summary=self.errors[:20],  # cap the report at 20 errors
            batch_summary=self.batch_results
        )
# ============================================================================
# Main Entry Point
# ============================================================================
async def run_migration(config: MigrationConfig) -> MigrationReport:
    """Execute a migration run and return its report.

    On success the report comes from MigrationValidator with the real
    wall-clock duration filled in; on failure a minimal failure report is
    returned (instead of propagating the exception) so the caller can still
    print and persist it.
    """
    logger = setup_logging(config.log_level)
    state_manager = StateManager(Path(MIGRATION_STATE_FILE), logger)
    # Refuse to start on top of an unfinished run.
    existing_state = state_manager.load_state()
    if existing_state and existing_state.status == MigrationStatus.IN_PROGRESS:
        logger.warning(
            "A migration is already in progress. "
            "Use --rollback to clear it or wait for it to complete."
        )
        sys.exit(1)
    logger.info("Initializing storage connections...")
    # ChromaDB source (persistent when a path is given, in-memory otherwise).
    if config.chroma_path:
        chroma_client = chromadb.PersistentClient(path=config.chroma_path)
    else:
        chroma_client = chromadb.Client()
    chroma_store = ChromaStore(client=chroma_client)
    # SQLite target (shadow database).
    sqlite_path = Path(config.sqlite_path)
    sqlite_path.parent.mkdir(parents=True, exist_ok=True)
    sqlite_store = SQLiteStore(db_path=sqlite_path)
    start_time = datetime.now()
    try:
        migrator = LegacyDataMigrator(
            chroma_store=chroma_store,
            sqlite_store=sqlite_store,
            batch_size=config.batch_size,
            dry_run=config.dry_run,
            logger=logger
        )
        state = await migrator.execute()
        # Persist the final state for audit / rollback.
        state_manager.save_state(state)
        validator = MigrationValidator(
            source_stats=state.source_stats,
            target_stats=state.target_stats,
            anomaly_transformations=state.anomaly_transformations,
            errors=state.error_log,
            batch_results=state.batch_results,
            dry_run=config.dry_run
        )
        report = validator.generate_report()
        # The validator leaves duration_seconds at 0.0; fill in the real one.
        report.duration_seconds = (datetime.now() - start_time).total_seconds()
        return report
    except Exception:
        # BUGFIX: the original `finally` block rebuilt `report` as a failure
        # report unconditionally, so even successful runs returned
        # success=False with empty statistics. Build the failure report only
        # when the migration actually raised.
        logger.exception("Migration run failed")
        end_time = datetime.now()
        return MigrationReport(
            success=False,
            dry_run=config.dry_run,
            started_at=start_time,
            completed_at=end_time,
            duration_seconds=(end_time - start_time).total_seconds(),
            source_stats={},
            target_stats={},
            validation_results={},
            anomaly_transformation_summary={},
            error_summary=[],
            batch_summary=[]
        )
async def run_rollback(config: MigrationConfig) -> bool:
    """Roll back the last migration recorded in the state file.

    Returns True on success (the state file is then removed), False when
    there is no recorded state or the rollback itself fails.
    """
    logger = setup_logging(config.log_level)
    state_manager = StateManager(Path(MIGRATION_STATE_FILE), logger)
    state = state_manager.load_state()
    if not state:
        logger.error("No migration state found to rollback")
        return False
    if state.status != MigrationStatus.COMPLETED:
        # Proceed anyway, but make the operator aware of the odd state.
        logger.warning(
            f"Migration status is '{state.status.value}'. "
            "Rollback may not be safe."
        )
    rollback = MigrationRollback(
        sqlite_path=Path(config.sqlite_path),
        logger=logger
    )
    if not await rollback.execute():
        return False
    # Record the rollback result, then drop the now-obsolete state file.
    state.status = MigrationStatus.ROLLED_BACK
    state.completed_at = datetime.now().isoformat()
    state_manager.save_state(state)
    state_manager.clear_state()
    return True
def print_report(report: "MigrationReport") -> None:
    """Print a formatted, human-readable migration report to stdout."""
    print("\n" + "=" * 70)
    print("MIGRATION REPORT")
    print("=" * 70)
    print(f"\nStatus: {'SUCCESS' if report.success else 'FAILED'}")
    print(f"Mode: {'DRY-RUN' if report.dry_run else 'LIVE'}")
    print(f"Duration: {report.duration_seconds:.2f} seconds")
    print("\n--- Source Statistics ---")
    for key, value in report.source_stats.items():
        if isinstance(value, dict):
            print(f"  {key}:")
            for k, v in value.items():
                print(f"    {k}: {v}")
        else:
            print(f"  {key}: {value}")
    print("\n--- Target Statistics ---")
    for key, value in report.target_stats.items():
        if isinstance(value, dict):
            print(f"  {key}:")
            for k, v in value.items():
                print(f"    {k}: {v}")
        else:
            print(f"  {key}: {value}")
    print("\n--- Validation Results ---")
    for check in report.validation_results.get("checks", []):
        # BUGFIX: the passed/failed/warning icons were empty strings
        # (apparently lost to an encoding mishap — the warning/error lines
        # below still print ⚠ and ✗). Restore matching glyphs.
        status_icon = {
            "passed": "✓",
            "failed": "✗",
            "warning": "⚠",
            "skipped": "-"
        }.get(check["status"], "?")
        print(f"  [{status_icon}] {check['name']}: {check['message']}")
    for warning in report.validation_results.get("warnings", []):
        print(f"  [⚠] WARNING: {warning}")
    for error in report.validation_results.get("errors", []):
        print(f"  [✗] ERROR: {error}")
    print("\n--- Anomaly Transformation Summary ---")
    summary = report.anomaly_transformation_summary
    print(f"  Unique transformations: {summary.get('total_unique_transformations', 0)}")
    print(f"  Unknown anomalies: {summary.get('unknown_anomaly_count', 0)}")
    print("\n  Top transformations:")
    examples = summary.get("transformation_examples", {})
    for transform, count in list(examples.items())[:5]:
        print(f"    {transform}: {count}")
    if report.error_summary:
        print("\n--- Error Summary (first 20) ---")
        for error in report.error_summary:
            print(f"  [{error.get('timestamp', 'N/A')}] {error.get('error', 'Unknown error')}")
            print(f"      URL: {error.get('url', 'N/A')}")
    print("\n--- Batch Summary ---")
    for batch in report.batch_summary:
        print(f"  Batch {batch.get('batch_num')}: "
              f"{batch.get('success', 0)} success, "
              f"{batch.get('failures', 0)} failures")
    print("\n" + "=" * 70)
def save_report(report: "MigrationReport", path: Path) -> None:
    """Write the report to *path* as JSON, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(report.to_dict(), indent=2, default=str)
    with open(path, "w") as f:
        f.write(serialized)
    print(f"\nReport saved to: {path}")
def parse_args() -> MigrationConfig:
    """Parse command-line arguments into a MigrationConfig.

    The optional ``--report-path`` value is attached to the returned config
    as a ``report_path`` attribute (``main`` looks it up with ``hasattr``).
    """
    parser = argparse.ArgumentParser(
        description="Migrate legacy ChromaDB data to normalized SQLite schema",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                      # Run full migration
  %(prog)s --dry-run            # Validate without persisting
  %(prog)s --batch-size 50      # Use custom batch size
  %(prog)s --rollback           # Rollback last migration
  %(prog)s --chroma-path /path  # Custom ChromaDB path
"""
    )
    parser.add_argument(
        "--chroma-path",
        type=str,
        default=os.getenv("CHROMA_DB_PATH", DEFAULT_CHROMA_PATH),
        help=f"Path to ChromaDB directory (default: {DEFAULT_CHROMA_PATH})"
    )
    parser.add_argument(
        "--sqlite-path",
        type=str,
        default=DEFAULT_SQLITE_PATH,
        help=f"Path for SQLite shadow database (default: {DEFAULT_SQLITE_PATH})"
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=DEFAULT_BATCH_SIZE,
        help=f"Number of items per batch (default: {DEFAULT_BATCH_SIZE})"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Validate migration without persisting to SQLite"
    )
    parser.add_argument(
        "--rollback",
        action="store_true",
        help="Rollback the last migration"
    )
    parser.add_argument(
        "--log-level",
        type=str,
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        default=DEFAULT_LOG_LEVEL,
        help=f"Logging level (default: {DEFAULT_LOG_LEVEL})"
    )
    parser.add_argument(
        "--report-path",
        type=str,
        help="Path to save migration report JSON"
    )
    args = parser.parse_args()
    config = MigrationConfig(
        chroma_path=args.chroma_path,
        sqlite_path=args.sqlite_path,
        batch_size=args.batch_size,
        dry_run=args.dry_run,
        rollback=args.rollback,
        log_level=args.log_level
    )
    # BUGFIX: --report-path was parsed but never carried on the returned
    # config, so main()'s hasattr(config, 'report_path') check could never
    # succeed and the flag was silently ignored. MigrationConfig is a plain
    # (non-slots) dataclass, so attaching the attribute dynamically is safe.
    config.report_path = args.report_path
    return config
def main() -> None:
    """CLI entry point: dispatch to migration or rollback, set exit code."""
    config = parse_args()
    # For a migration run, the ChromaDB source directory must already exist.
    if not config.rollback:
        source_dir = Path(config.chroma_path)
        if not source_dir.exists():
            print(f"Error: ChromaDB path does not exist: {source_dir}")
            sys.exit(1)
    if config.rollback:
        succeeded = asyncio.run(run_rollback(config))
        sys.exit(0 if succeeded else 1)
    report = asyncio.run(run_migration(config))
    print_report(report)
    # Optional user-specified report location.
    if getattr(config, "report_path", None):
        save_report(report, Path(config.report_path))
    # Always persist a timestamped report under data/.
    default_path = (
        PROJECT_ROOT / "data"
        / f"migration_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    )
    save_report(report, default_path)
    sys.exit(0 if report.success else 1)


if __name__ == "__main__":
    main()