#!/usr/bin/env python3
"""
Migration Script: ChromaDB Legacy Data → SQLite Normalized Schema

This script migrates news items from ChromaDB (legacy format with
comma-joined anomalies) to SQLite (normalized schema with proper
AnomalyType enum).

Features:
- Dry-run mode for validation without persistence
- Batch processing for large datasets
- Rollback capability with transaction management (ACID)
- Progress logging and error tracking
- Migration validation and report generation

Usage:
    python scripts/migrate_legacy_data.py                  # Full migration
    python scripts/migrate_legacy_data.py --dry-run        # Validate without persisting
    python scripts/migrate_legacy_data.py --batch-size 50  # Custom batch size
    python scripts/migrate_legacy_data.py --rollback       # Rollback last migration
"""

import argparse
import asyncio
import json
import logging
import os
import sqlite3
import sys
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Add project root to path so the `src.*` imports below resolve when this
# file is run directly as a script.
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

import chromadb
from chromadb.config import Settings  # noqa: F401 -- kept; may be used by other tooling

from src.storage.chroma_store import ChromaStore
from src.storage.sqlite_store import SQLiteStore
from src.processor.anomaly_types import AnomalyType, normalize_anomaly_list
from src.processor.dto import EnrichedNewsItemDTO


# ============================================================================
# Configuration & Constants
# ============================================================================

DEFAULT_BATCH_SIZE = 100
DEFAULT_CHROMA_PATH = "./chroma_db"
DEFAULT_SQLITE_PATH = "./data/migration_shadow.db"
DEFAULT_LOG_LEVEL = "INFO"
MIGRATION_STATE_FILE = "./data/migration_state.json"
# ChromaDB has no direct "get everything" API; get_latest is called with
# this cap before falling back to raw collection access.
CHROMA_FETCH_LIMIT = 10000


class MigrationStatus(Enum):
    """Migration execution status."""

    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    ROLLED_BACK = "rolled_back"


@dataclass
class MigrationConfig:
    """Configuration for migration execution (mirrors the CLI flags)."""

    chroma_path: str = DEFAULT_CHROMA_PATH
    sqlite_path: str = DEFAULT_SQLITE_PATH
    batch_size: int = DEFAULT_BATCH_SIZE
    dry_run: bool = False
    rollback: bool = False
    log_level: str = DEFAULT_LOG_LEVEL
    # Optional path for the JSON report (from --report-path).  None means
    # "default report location only".  Declared here so main()'s
    # `config.report_path` lookup is always well-defined.
    report_path: Optional[str] = None


@dataclass
class MigrationState:
    """State tracking for migration recovery.

    Serialized to disk by StateManager so an interrupted migration can be
    detected and rolled back.
    """

    status: MigrationStatus = MigrationStatus.NOT_STARTED
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    total_items: int = 0
    processed_items: int = 0
    successful_items: int = 0
    failed_items: int = 0
    error_log: List[Dict[str, Any]] = field(default_factory=list)
    source_stats: Dict[str, Any] = field(default_factory=dict)
    target_stats: Dict[str, Any] = field(default_factory=dict)
    anomaly_transformations: Dict[str, int] = field(default_factory=dict)
    batch_results: List[Dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable representation of this state."""
        return {
            "status": self.status.value,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "total_items": self.total_items,
            "processed_items": self.processed_items,
            "successful_items": self.successful_items,
            "failed_items": self.failed_items,
            "error_log": self.error_log,
            "source_stats": self.source_stats,
            "target_stats": self.target_stats,
            "anomaly_transformations": self.anomaly_transformations,
            "batch_results": self.batch_results,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MigrationState":
        """Rehydrate a MigrationState from its to_dict() representation."""
        state = cls()
        state.status = MigrationStatus(data.get("status", "not_started"))
        state.started_at = data.get("started_at")
        state.completed_at = data.get("completed_at")
        state.total_items = data.get("total_items", 0)
        state.processed_items = data.get("processed_items", 0)
        state.successful_items = data.get("successful_items", 0)
        state.failed_items = data.get("failed_items", 0)
        state.error_log = data.get("error_log", [])
        state.source_stats = data.get("source_stats", {})
        state.target_stats = data.get("target_stats", {})
        state.anomaly_transformations = data.get("anomaly_transformations", {})
        state.batch_results = data.get("batch_results", [])
        return state


@dataclass
class MigrationReport:
    """Complete migration report for audit trail."""

    success: bool
    dry_run: bool
    started_at: datetime
    completed_at: datetime
    duration_seconds: float
    source_stats: Dict[str, Any]
    target_stats: Dict[str, Any]
    validation_results: Dict[str, Any]
    anomaly_transformation_summary: Dict[str, Any]
    error_summary: List[Dict[str, Any]]
    batch_summary: List[Dict[str, Any]]

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable representation of this report."""
        return {
            "success": self.success,
            "dry_run": self.dry_run,
            "started_at": self.started_at.isoformat(),
            "completed_at": self.completed_at.isoformat(),
            "duration_seconds": self.duration_seconds,
            "source_stats": self.source_stats,
            "target_stats": self.target_stats,
            "validation_results": self.validation_results,
            "anomaly_transformation_summary": self.anomaly_transformation_summary,
            "error_summary": self.error_summary,
            "batch_summary": self.batch_summary,
        }


# ============================================================================
# Logging Setup
# ============================================================================

def setup_logging(level: str) -> logging.Logger:
    """Configure and return the structured "migration" logger.

    Console output at the requested level plus an ERROR-level file handler
    at logs/migration.log.  Idempotent: handlers are attached only on the
    first call, so calling this more than once in a process does not
    duplicate log lines.
    """
    log_level = getattr(logging, level.upper(), logging.INFO)

    logger = logging.getLogger("migration")
    logger.setLevel(log_level)
    if logger.handlers:
        # Already configured earlier in this process.
        return logger

    formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(formatter)

    # File handler for errors
    log_dir = PROJECT_ROOT / "logs"
    log_dir.mkdir(exist_ok=True)
    file_handler = logging.FileHandler(log_dir / "migration.log")
    file_handler.setLevel(logging.ERROR)
    file_handler.setFormatter(formatter)

    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    return logger


# ============================================================================
# State Persistence
# ============================================================================

class StateManager:
    """Manages migration state persistence for recovery and rollback."""

    def __init__(self, state_file: Path, logger: logging.Logger):
        self.state_file = state_file
        self.logger = logger
        self._ensure_state_dir()

    def _ensure_state_dir(self) -> None:
        """Create the parent directory of the state file if needed."""
        self.state_file.parent.mkdir(parents=True, exist_ok=True)

    def save_state(self, state: MigrationState) -> None:
        """Persist migration state to disk.

        Raises the underlying exception after logging, since losing state
        would make recovery/rollback unreliable.
        """
        try:
            with open(self.state_file, "w") as f:
                json.dump(state.to_dict(), f, indent=2, default=str)
            self.logger.debug(f"State saved: {state.status.value}")
        except Exception as e:
            self.logger.error(f"Failed to save state: {e}")
            raise

    def load_state(self) -> Optional[MigrationState]:
        """Load previous migration state if it exists; None otherwise.

        A corrupt/unreadable state file is treated as "no state" (logged as
        a warning) so a stale file cannot permanently block migrations.
        """
        if not self.state_file.exists():
            return None
        try:
            with open(self.state_file, "r") as f:
                data = json.load(f)
            return MigrationState.from_dict(data)
        except Exception as e:
            self.logger.warning(f"Could not load state file: {e}")
            return None

    def clear_state(self) -> None:
        """Remove state file after successful migration or rollback."""
        if self.state_file.exists():
            self.state_file.unlink()
            self.logger.debug("State file cleared")


# ============================================================================
# Core Migration Logic
# ============================================================================

class LegacyDataMigrator:
    """
    Migrates data from ChromaDB (legacy) to SQLite (normalized schema).

    Key transformations:
    - Comma-joined anomalies → Normalized AnomalyType enum
    - Single metadata table → Normalized tables
      (news_items, anomaly_types, news_anomalies)
    """

    def __init__(
        self,
        chroma_store: ChromaStore,
        sqlite_store: SQLiteStore,
        batch_size: int,
        dry_run: bool,
        logger: logging.Logger
    ):
        self.chroma_store = chroma_store
        self.sqlite_store = sqlite_store
        self.batch_size = batch_size
        self.dry_run = dry_run
        self.logger = logger
        self.state = MigrationState()

        # Track anomaly transformations for reporting
        self.anomaly_seen: Dict[str, int] = {}
        self.anomaly_unknown: int = 0
        # Snapshot of the source corpus; filled on first fetch so that
        # get_source_stats() and execute() operate on the same data without
        # pulling everything from ChromaDB twice.
        self._items_cache: Optional[List[EnrichedNewsItemDTO]] = None

    async def get_source_stats(self) -> Dict[str, Any]:
        """Gather statistics from the ChromaDB source.

        Returns total count, per-category counts (keys prefixed
        "category_"), raw anomaly-string counts, and the number of
        anomalies normalized to UNKNOWN so far (0 before migration runs,
        since that counter is only updated during transformation).
        """
        stats = await self.chroma_store.get_stats()

        # Count raw anomaly strings across the full (cached) corpus.
        anomaly_counts: Dict[str, int] = {}
        all_items = await self._fetch_all_from_chroma()
        for item in all_items:
            for anomaly in item.anomalies_detected:
                anomaly_counts[anomaly] = anomaly_counts.get(anomaly, 0) + 1

        return {
            "total_count": stats.get("total_count", 0),
            "category_counts": {k: v for k, v in stats.items() if k.startswith("category_")},
            "anomaly_counts": anomaly_counts,
            "anomaly_unknown_count": self.anomaly_unknown,
        }

    async def get_target_stats(self) -> Dict[str, Any]:
        """Gather statistics from the SQLite target (cache bypassed)."""
        stats = await self.sqlite_store.get_stats(use_cache=False)
        return {
            "total_count": stats.get("total_count", 0),
            "category_counts": stats.get("category_counts", {}),
            "source_counts": stats.get("source_counts", {}),
            "anomaly_counts": stats.get("anomaly_counts", {}),
        }

    async def _fetch_all_from_chroma(self) -> List[EnrichedNewsItemDTO]:
        """Fetch all items from ChromaDB, caching the result.

        ChromaDB doesn't have a direct "get_all", so get_latest is used
        with a large limit; if that fails we fall back to raw collection
        access.  The snapshot is cached for reuse across stats gathering
        and migration.
        """
        if self._items_cache is not None:
            return self._items_cache

        try:
            items = await self.chroma_store.get_latest(limit=CHROMA_FETCH_LIMIT)
        except Exception as e:
            self.logger.warning(f"get_latest failed: {e}, trying alternative fetch")
            # Fallback: fetch via raw collection access
            items = await self._raw_fetch_all()

        self._items_cache = items
        return items

    async def _raw_fetch_all(self) -> List[EnrichedNewsItemDTO]:
        """Raw fetch all items directly from the ChromaDB collection.

        Items whose metadata cannot be reconstructed into a DTO are
        skipped with a warning rather than aborting the whole fetch.
        """
        results = await asyncio.to_thread(
            self.chroma_store.collection.get,
            include=["metadatas", "documents"]
        )

        metadatas = results.get("metadatas") or []
        documents = results.get("documents") or []

        items = []
        for meta, doc in zip(metadatas, documents):
            if meta:
                try:
                    dto = self.chroma_store._reconstruct_dto(meta, doc)
                    items.append(dto)
                except Exception as e:
                    self.logger.warning(f"Failed to reconstruct DTO: {e}")
        return items

    def _transform_anomalies(self, anomalies: List[str]) -> List[AnomalyType]:
        """
        Transform comma-joined anomaly strings to normalized AnomalyType enum.

        This is the core transformation logic that normalizes legacy data.
        Side effects: records each "raw → normalized" pair in
        self.anomaly_seen and counts UNKNOWN mappings for the report.
        """
        normalized = normalize_anomaly_list(anomalies)

        # Track transformations for reporting
        for anomaly_str in anomalies:
            normalized_type = AnomalyType.from_string(anomaly_str)
            key = f"{anomaly_str.strip()} → {normalized_type.value}"
            self.anomaly_seen[key] = self.anomaly_seen.get(key, 0) + 1
            if normalized_type == AnomalyType.UNKNOWN:
                self.anomaly_unknown += 1

        return normalized

    async def migrate_batch(
        self,
        items: List[EnrichedNewsItemDTO],
        batch_num: int
    ) -> Tuple[int, int, List[Dict[str, Any]]]:
        """
        Migrate a single batch of items.

        Per-item failures are recorded and do not stop the batch.  In
        dry-run mode the anomaly transformation still runs (so the report
        is meaningful) but nothing is written to SQLite.

        Returns:
            Tuple of (success_count, failure_count, errors)
        """
        success_count = 0
        failure_count = 0
        errors = []

        for item in items:
            try:
                # Transform anomalies from legacy format
                anomaly_types = self._transform_anomalies(item.anomalies_detected)

                if not self.dry_run:
                    # Store in SQLite with normalized anomalies
                    await self.sqlite_store.store_with_anomalies(item, anomaly_types)

                success_count += 1
                self.logger.debug(
                    f"Batch {batch_num}: Migrated item '{item.title[:50]}...' "
                    f"with {len(anomaly_types)} anomalies"
                )
            except Exception as e:
                failure_count += 1
                error_record = {
                    "batch": batch_num,
                    "url": item.url,
                    "title": item.title[:100] if item.title else "N/A",
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                }
                errors.append(error_record)
                self.logger.error(
                    f"Batch {batch_num}: Failed to migrate '{item.url}': {e}"
                )

        return success_count, failure_count, errors

    async def execute(self) -> MigrationState:
        """
        Execute the full migration process.

        Steps:
        1. Gather source statistics
        2. Fetch all items from ChromaDB
        3. Process in batches
        4. Validate counts
        5. Update state

        Returns the final MigrationState; on error the state is marked
        FAILED and the exception is re-raised for the caller to handle.
        """
        self.state.status = MigrationStatus.IN_PROGRESS
        self.state.started_at = datetime.now().isoformat()

        self.logger.info("=" * 70)
        self.logger.info("LEGACY DATA MIGRATION: ChromaDB → SQLite")
        self.logger.info("=" * 70)
        self.logger.info(f"Dry-run mode: {self.dry_run}")
        self.logger.info(f"Batch size: {self.batch_size}")
        self.logger.info(f"Source: {self.chroma_store.collection.name}")
        self.logger.info("-" * 70)

        try:
            # Step 1: Gather source statistics
            self.logger.info("Step 1/5: Gathering source statistics...")
            self.state.source_stats = await self.get_source_stats()
            self.state.total_items = self.state.source_stats.get("total_count", 0)
            self.logger.info(
                f"  Source items: {self.state.total_items}, "
                f"Anomalies: {len(self.state.source_stats.get('anomaly_counts', {}))}"
            )

            # Step 2: Fetch all items from ChromaDB (served from cache --
            # already fetched during stats gathering)
            self.logger.info("Step 2/5: Fetching items from ChromaDB...")
            all_items = await self._fetch_all_from_chroma()
            self.logger.info(f"  Fetched {len(all_items)} items from ChromaDB")

            # Step 3: Process in batches
            self.logger.info("Step 3/5: Processing batches...")
            total_success = 0
            total_failures = 0
            all_errors = []
            batch_results = []

            for i in range(0, len(all_items), self.batch_size):
                batch_num = (i // self.batch_size) + 1
                batch = all_items[i:i + self.batch_size]

                self.logger.info(
                    f"  Processing batch {batch_num} "
                    f"(items {i+1}-{min(i+self.batch_size, len(all_items))})"
                )

                success, failures, errors = await self.migrate_batch(batch, batch_num)
                total_success += success
                total_failures += failures
                all_errors.extend(errors)

                self.state.processed_items = i + len(batch)
                self.state.successful_items = total_success
                self.state.failed_items = total_failures

                batch_results.append({
                    "batch_num": batch_num,
                    "size": len(batch),
                    "success": success,
                    "failures": failures
                })

                self.logger.info(
                    f"  Batch {batch_num} complete: "
                    f"{success} success, {failures} failures"
                )

            self.state.batch_results = batch_results
            self.state.error_log = all_errors

            # Step 4: Validate (count comparison is only meaningful when
            # data was actually written)
            self.logger.info("Step 4/5: Validating migration...")
            self.state.target_stats = await self.get_target_stats()

            if not self.dry_run:
                source_count = self.state.source_stats.get("total_count", 0)
                target_count = self.state.target_stats.get("total_count", 0)
                if source_count != target_count:
                    self.logger.warning(
                        f"Count mismatch: Source={source_count}, Target={target_count}"
                    )
                else:
                    self.logger.info(f"  Count validation passed: {target_count} items")
            else:
                self.logger.info("  Dry-run: Skipping target validation")

            # Track anomaly transformations
            self.state.anomaly_transformations = {
                "transformation_map": self.anomaly_seen,
                "unknown_count": self.anomaly_unknown,
                "unique_transformations": len(self.anomaly_seen)
            }

            # Step 5: Finalize
            self.state.status = MigrationStatus.COMPLETED
            self.state.completed_at = datetime.now().isoformat()

            self.logger.info("-" * 70)
            self.logger.info("MIGRATION COMPLETE")
            self.logger.info(f"  Total processed: {self.state.processed_items}")
            self.logger.info(f"  Successful: {self.state.successful_items}")
            self.logger.info(f"  Failed: {self.state.failed_items}")
            self.logger.info("=" * 70)

        except Exception as e:
            self.state.status = MigrationStatus.FAILED
            self.state.completed_at = datetime.now().isoformat()
            self.state.error_log.append({
                "phase": "execution",
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            })
            self.logger.exception("Migration failed with error")
            raise

        return self.state
class MigrationRollback:
    """
    Handles rollback of a migration with ACID guarantees.

    SQLite rollback is straightforward due to transaction support.
    """

    def __init__(self, sqlite_path: Path, logger: logging.Logger):
        self.sqlite_path = sqlite_path
        self.logger = logger

    async def execute(self) -> bool:
        """Execute rollback of migration data.

        Returns:
            True if there was nothing to roll back or the delete
            transaction committed; False if the rollback failed.
        """
        self.logger.info("Starting rollback procedure...")

        if not self.sqlite_path.exists():
            self.logger.warning("SQLite database does not exist, nothing to rollback")
            return True

        try:
            with sqlite3.connect(self.sqlite_path) as conn:
                # Check current state
                cursor = conn.execute("SELECT COUNT(*) FROM news_items")
                count = cursor.fetchone()[0]

                if count == 0:
                    self.logger.info("No data to rollback")
                    return True

                self.logger.info(f"Found {count} items to remove")

                # BEGIN IMMEDIATE takes the write lock up front so the
                # whole delete sequence commits (or rolls back) atomically.
                conn.execute("BEGIN IMMEDIATE")

                # Delete child rows before parents to satisfy foreign keys.
                # NOTE(review): this also empties the anomaly_types lookup
                # table -- confirm the next migration repopulates it.
                conn.execute("DELETE FROM news_anomalies")
                conn.execute("DELETE FROM anomaly_types")
                conn.execute("DELETE FROM news_items")
                conn.execute("DELETE FROM stats_cache")

                conn.execute("COMMIT")

                self.logger.info("Rollback completed successfully")
                return True

        except Exception as e:
            self.logger.exception(f"Rollback failed: {e}")
            return False


class MigrationValidator:
    """Validates migration correctness and generates reports."""

    def __init__(
        self,
        source_stats: Dict[str, Any],
        target_stats: Dict[str, Any],
        anomaly_transformations: Dict[str, Any],
        errors: List[Dict[str, Any]],
        batch_results: List[Dict[str, Any]],
        dry_run: bool
    ):
        self.source_stats = source_stats
        self.target_stats = target_stats
        self.anomaly_transformations = anomaly_transformations
        self.errors = errors
        self.batch_results = batch_results
        self.dry_run = dry_run

    def validate(self) -> Dict[str, Any]:
        """
        Perform validation checks and return results.

        Checks:
        - Count match between source and target (skipped in dry-run)
        - No critical errors
        - Anomaly transformation coverage
        - Batch completion

        Only a count mismatch flips "passed" to False; errors and UNKNOWN
        anomalies are surfaced as warnings.
        """
        results = {
            "passed": True,
            "checks": [],
            "warnings": [],
            "errors": []
        }

        # Check 1: Count validation
        source_count = self.source_stats.get("total_count", 0)
        target_count = self.target_stats.get("total_count", 0)

        if self.dry_run:
            results["checks"].append({
                "name": "count_validation",
                "status": "skipped",
                "message": "Dry-run mode - count validation not applicable"
            })
        elif source_count == target_count:
            results["checks"].append({
                "name": "count_validation",
                "status": "passed",
                "message": f"Source ({source_count}) == Target ({target_count})"
            })
        else:
            results["passed"] = False
            results["checks"].append({
                "name": "count_validation",
                "status": "failed",
                "message": f"Source ({source_count}) != Target ({target_count})"
            })
            results["errors"].append(f"Count mismatch: {source_count} vs {target_count}")

        # Check 2: Error count
        error_count = len(self.errors)
        if error_count == 0:
            results["checks"].append({
                "name": "error_check",
                "status": "passed",
                "message": "No errors during migration"
            })
        else:
            warning_msg = f"{error_count} errors occurred during migration"
            results["warnings"].append(warning_msg)
            results["checks"].append({
                "name": "error_check",
                "status": "warning",
                "message": warning_msg
            })

        # Check 3: Anomaly transformation coverage
        unknown_count = self.anomaly_transformations.get("unknown_count", 0)
        if unknown_count == 0:
            results["checks"].append({
                "name": "anomaly_coverage",
                "status": "passed",
                "message": "All anomalies mapped to known types"
            })
        else:
            results["warnings"].append(f"{unknown_count} anomalies mapped to UNKNOWN")
            results["checks"].append({
                "name": "anomaly_coverage",
                "status": "warning",
                "message": f"{unknown_count} anomalies could not be normalized"
            })

        # Check 4: Batch completion
        total_batches = len(self.batch_results)
        if total_batches > 0:
            results["checks"].append({
                "name": "batch_completion",
                "status": "passed",
                "message": f"All {total_batches} batches processed"
            })

        return results

    def generate_report(self) -> MigrationReport:
        """Generate comprehensive migration report.

        Timestamps/duration are placeholders here; the caller
        (run_migration) overwrites them with the real wall-clock values.
        """
        validation_results = self.validate()

        return MigrationReport(
            success=validation_results["passed"] and len(validation_results["errors"]) == 0,
            dry_run=self.dry_run,
            started_at=datetime.now(),
            completed_at=datetime.now(),
            duration_seconds=0.0,  # Overwritten by the caller
            source_stats=self.source_stats,
            target_stats=self.target_stats,
            validation_results=validation_results,
            anomaly_transformation_summary={
                "total_unique_transformations": self.anomaly_transformations.get("unique_transformations", 0),
                "unknown_anomaly_count": self.anomaly_transformations.get("unknown_count", 0),
                "transformation_examples": dict(list(self.anomaly_transformations.get("transformation_map", {}).items())[:10])
            },
            error_summary=self.errors[:20],  # Limit to first 20 errors
            batch_summary=self.batch_results
        )


# ============================================================================
# Main Entry Point
# ============================================================================

async def run_migration(config: MigrationConfig) -> MigrationReport:
    """Execute migration with given configuration and return its report.

    BUGFIX: the previous version rebuilt `report` as an empty failure
    report inside a `finally` block, which unconditionally clobbered the
    real report -- every run (including successful ones) reported FAILED
    with empty statistics, and the exception path never returned at all.
    Now the validator's report is returned on success (with real
    timestamps/duration filled in), and a failure report is returned only
    when the migration itself raises.
    """
    logger = setup_logging(config.log_level)
    state_manager = StateManager(Path(MIGRATION_STATE_FILE), logger)

    # Check for existing migration state
    existing_state = state_manager.load_state()
    if existing_state and existing_state.status == MigrationStatus.IN_PROGRESS:
        logger.warning(
            "A migration is already in progress. "
            "Use --rollback to clear it or wait for it to complete."
        )
        sys.exit(1)

    # Initialize stores
    logger.info("Initializing storage connections...")

    # ChromaDB setup
    if config.chroma_path:
        chroma_client = chromadb.PersistentClient(path=config.chroma_path)
    else:
        chroma_client = chromadb.Client()
    chroma_store = ChromaStore(client=chroma_client)

    # SQLite setup
    sqlite_path = Path(config.sqlite_path)
    sqlite_path.parent.mkdir(parents=True, exist_ok=True)
    sqlite_store = SQLiteStore(db_path=sqlite_path)

    start_time = datetime.now()
    try:
        # Execute migration
        migrator = LegacyDataMigrator(
            chroma_store=chroma_store,
            sqlite_store=sqlite_store,
            batch_size=config.batch_size,
            dry_run=config.dry_run,
            logger=logger
        )
        state = await migrator.execute()

        # Save state
        state_manager.save_state(state)

        # Generate report
        validator = MigrationValidator(
            source_stats=state.source_stats,
            target_stats=state.target_stats,
            anomaly_transformations=state.anomaly_transformations,
            errors=state.error_log,
            batch_results=state.batch_results,
            dry_run=config.dry_run
        )
        report = validator.generate_report()
    except Exception:
        # Migration failed: surface an explicit failure report instead of
        # propagating the traceback to main().
        logger.exception("Migration aborted; generating failure report")
        end_time = datetime.now()
        return MigrationReport(
            success=False,
            dry_run=config.dry_run,
            started_at=start_time,
            completed_at=end_time,
            duration_seconds=(end_time - start_time).total_seconds(),
            source_stats={},
            target_stats={},
            validation_results={},
            anomaly_transformation_summary={},
            error_summary=[],
            batch_summary=[]
        )

    # Fill in real wall-clock timing on the successful report.
    end_time = datetime.now()
    report.started_at = start_time
    report.completed_at = end_time
    report.duration_seconds = (end_time - start_time).total_seconds()
    return report


async def run_rollback(config: MigrationConfig) -> bool:
    """Execute rollback procedure; returns True on success."""
    logger = setup_logging(config.log_level)
    state_manager = StateManager(Path(MIGRATION_STATE_FILE), logger)

    # Load and verify state
    state = state_manager.load_state()
    if not state:
        logger.error("No migration state found to rollback")
        return False

    if state.status != MigrationStatus.COMPLETED:
        logger.warning(
            f"Migration status is '{state.status.value}'. "
            "Rollback may not be safe."
        )

    # Execute rollback
    rollback = MigrationRollback(
        sqlite_path=Path(config.sqlite_path),
        logger=logger
    )
    success = await rollback.execute()

    if success:
        # Record the outcome, then remove the state file entirely.
        state.status = MigrationStatus.ROLLED_BACK
        state.completed_at = datetime.now().isoformat()
        state_manager.save_state(state)
        state_manager.clear_state()

    return success


def print_report(report: MigrationReport) -> None:
    """Print formatted migration report to console."""
    print("\n" + "=" * 70)
    print("MIGRATION REPORT")
    print("=" * 70)
    print(f"\nStatus: {'SUCCESS' if report.success else 'FAILED'}")
    print(f"Mode: {'DRY-RUN' if report.dry_run else 'LIVE'}")
    print(f"Duration: {report.duration_seconds:.2f} seconds")

    print("\n--- Source Statistics ---")
    for key, value in report.source_stats.items():
        if isinstance(value, dict):
            print(f"  {key}:")
            for k, v in value.items():
                print(f"    {k}: {v}")
        else:
            print(f"  {key}: {value}")

    print("\n--- Target Statistics ---")
    for key, value in report.target_stats.items():
        if isinstance(value, dict):
            print(f"  {key}:")
            for k, v in value.items():
                print(f"    {k}: {v}")
        else:
            print(f"  {key}: {value}")

    print("\n--- Validation Results ---")
    for check in report.validation_results.get("checks", []):
        status_icon = {
            "passed": "✓",
            "failed": "✗",
            "warning": "⚠",
            "skipped": "-"
        }.get(check["status"], "?")
        print(f"  [{status_icon}] {check['name']}: {check['message']}")

    for warning in report.validation_results.get("warnings", []):
        print(f"  [⚠] WARNING: {warning}")

    for error in report.validation_results.get("errors", []):
        print(f"  [✗] ERROR: {error}")

    print("\n--- Anomaly Transformation Summary ---")
    summary = report.anomaly_transformation_summary
    print(f"  Unique transformations: {summary.get('total_unique_transformations', 0)}")
    print(f"  Unknown anomalies: {summary.get('unknown_anomaly_count', 0)}")
    print("\n  Top transformations:")
    examples = summary.get("transformation_examples", {})
    for transform, count in list(examples.items())[:5]:
        print(f"    {transform}: {count}")

    if report.error_summary:
        print("\n--- Error Summary (first 20) ---")
        for error in report.error_summary:
            print(f"  [{error.get('timestamp', 'N/A')}] {error.get('error', 'Unknown error')}")
            print(f"    URL: {error.get('url', 'N/A')}")

    print("\n--- Batch Summary ---")
    for batch in report.batch_summary:
        print(f"  Batch {batch.get('batch_num')}: "
              f"{batch.get('success', 0)} success, "
              f"{batch.get('failures', 0)} failures")

    print("\n" + "=" * 70)


def save_report(report: MigrationReport, path: Path) -> None:
    """Save migration report to JSON file, creating parent dirs as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        json.dump(report.to_dict(), f, indent=2, default=str)
    print(f"\nReport saved to: {path}")


def parse_args() -> MigrationConfig:
    """Parse command line arguments into a MigrationConfig."""
    parser = argparse.ArgumentParser(
        description="Migrate legacy ChromaDB data to normalized SQLite schema",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                      # Run full migration
  %(prog)s --dry-run            # Validate without persisting
  %(prog)s --batch-size 50      # Use custom batch size
  %(prog)s --rollback           # Rollback last migration
  %(prog)s --chroma-path /path  # Custom ChromaDB path
        """
    )
    parser.add_argument(
        "--chroma-path",
        type=str,
        default=os.getenv("CHROMA_DB_PATH", DEFAULT_CHROMA_PATH),
        help=f"Path to ChromaDB directory (default: {DEFAULT_CHROMA_PATH})"
    )
    parser.add_argument(
        "--sqlite-path",
        type=str,
        default=DEFAULT_SQLITE_PATH,
        help=f"Path for SQLite shadow database (default: {DEFAULT_SQLITE_PATH})"
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=DEFAULT_BATCH_SIZE,
        help=f"Number of items per batch (default: {DEFAULT_BATCH_SIZE})"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Validate migration without persisting to SQLite"
    )
    parser.add_argument(
        "--rollback",
        action="store_true",
        help="Rollback the last migration"
    )
    parser.add_argument(
        "--log-level",
        type=str,
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        default=DEFAULT_LOG_LEVEL,
        help=f"Logging level (default: {DEFAULT_LOG_LEVEL})"
    )
    parser.add_argument(
        "--report-path",
        type=str,
        help="Path to save migration report JSON"
    )

    args = parser.parse_args()

    config = MigrationConfig(
        chroma_path=args.chroma_path,
        sqlite_path=args.sqlite_path,
        batch_size=args.batch_size,
        dry_run=args.dry_run,
        rollback=args.rollback,
        log_level=args.log_level
    )
    # BUGFIX: --report-path was parsed but never attached to the config, so
    # main()'s report_path check could never fire.  Set it via attribute
    # assignment so this works whether or not MigrationConfig declares the
    # field.
    config.report_path = args.report_path
    return config


def main() -> None:
    """Main entry point: dispatch to migration or rollback, set exit code."""
    config = parse_args()

    # Validate paths exist for non-rollback operations
    if not config.rollback:
        chroma_path = Path(config.chroma_path)
        if not chroma_path.exists():
            print(f"Error: ChromaDB path does not exist: {chroma_path}")
            sys.exit(1)

    # Execute appropriate operation
    if config.rollback:
        success = asyncio.run(run_rollback(config))
        sys.exit(0 if success else 1)
    else:
        report = asyncio.run(run_migration(config))
        print_report(report)

        # Save report to the user-specified path, if any
        if getattr(config, "report_path", None):
            save_report(report, Path(config.report_path))

        # Always save a timestamped default report for the audit trail
        report_path = PROJECT_ROOT / "data" / f"migration_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        save_report(report, report_path)

        sys.exit(0 if report.success else 1)


if __name__ == "__main__":
    main()