"""Incremental migration from MySQL to PostgreSQL based on timestamps.""" from datetime import datetime from typing import Optional import psycopg from config import get_settings, TABLE_CONFIGS from src.connectors.mysql_connector import MySQLConnector from src.connectors.postgres_connector import PostgreSQLConnector from src.transformers.data_transformer import DataTransformer from src.utils.logger import get_logger, setup_logger from src.utils.progress import ProgressTracker from src.migrator.state import MigrationState logger = get_logger(__name__) class IncrementalMigrator: """Perform incremental migration based on timestamps.""" def __init__(self, table: str, state_file: str = "migration_state.json"): """Initialize incremental migrator. Args: table: Table name to migrate state_file: Path to migration state file """ if table not in TABLE_CONFIGS: raise ValueError(f"Unknown table: {table}") self.table = table self.config = TABLE_CONFIGS[table] self.settings = get_settings() self.state = MigrationState(state_file) def _get_last_timestamp_from_db( self, pg_conn: PostgreSQLConnector, pg_table: str ) -> Optional[str]: """Get last migration timestamp from PostgreSQL migration_state table. Args: pg_conn: PostgreSQL connector pg_table: PostgreSQL table name Returns: ISO format timestamp or None if not found """ try: with pg_conn.connection.cursor() as cursor: cursor.execute( "SELECT last_migrated_timestamp FROM migration_state WHERE table_name = %s", (pg_table,) ) result = cursor.fetchone() if result and result[0]: return result[0].isoformat() except psycopg.Error: return None return None def migrate(self, dry_run: bool = False, use_id: bool = False) -> int: """Perform incremental migration since last sync. Args: dry_run: If True, log what would be done but don't modify data use_id: If True, use ID-based resumption, else use timestamp-based Returns: Number of rows migrated """ setup_logger(__name__) mysql_table = self.config["mysql_table"] pg_table = self.config["postgres_table"] primary_key = self.config.get("primary_key", "id") logger.info( f"Starting incremental migration of {mysql_table} -> {pg_table} " f"({'ID-based' if use_id else 'timestamp-based'})" ) try: with MySQLConnector() as mysql_conn: with PostgreSQLConnector() as pg_conn: if use_id: return self._migrate_by_id( mysql_conn, pg_conn, mysql_table, pg_table, primary_key, dry_run ) else: return self._migrate_by_timestamp( mysql_conn, pg_conn, mysql_table, pg_table, dry_run ) except Exception as e: logger.error(f"Incremental migration failed: {e}") raise def _migrate_by_timestamp( self, mysql_conn: MySQLConnector, pg_conn: PostgreSQLConnector, mysql_table: str, pg_table: str, dry_run: bool ) -> int: """Migrate rows using timestamp-based resumption. Args: mysql_conn: MySQL connector pg_conn: PostgreSQL connector mysql_table: MySQL table name pg_table: PostgreSQL table name dry_run: If True, don't modify data Returns: Number of rows migrated """ # Try to get last migration timestamp from state file first last_timestamp = self.state.get_last_timestamp(pg_table) # If not in state file, try to get from PostgreSQL migration_state table if last_timestamp is None: try: last_timestamp = self._get_last_timestamp_from_db(pg_conn, pg_table) if last_timestamp: logger.info(f"Found previous migration state in database: {last_timestamp}") except Exception as e: logger.debug(f"Could not read from migration_state table: {e}") last_timestamp = None if last_timestamp is None: logger.info( f"No previous migration found for {pg_table}. " "Use 'migrate --full' for initial migration." ) return 0 logger.info(f"Last migration timestamp: {last_timestamp}") # Count rows to migrate timestamp_col = "updated_at" if mysql_table == "ELABDATADISP" else "created_at" # Get max timestamp from PostgreSQL pg_max_timestamp = pg_conn.get_max_timestamp( pg_table, timestamp_col ) logger.info(f"Current max timestamp in PostgreSQL: {pg_max_timestamp}") if dry_run: logger.info("[DRY RUN] Would migrate rows after timestamp") return 0 migrated = 0 migration_start_time = datetime.utcnow().isoformat() # Fetch and migrate rows in batches batch_count = 0 for batch in mysql_conn.fetch_rows_since( mysql_table, last_timestamp ): batch_count += 1 if batch_count == 1: # Create progress tracker with unknown total progress = ProgressTracker( len(batch), f"Migrating {mysql_table} (incremental)" ) progress.__enter__() # Transform batch transformed = DataTransformer.transform_batch( mysql_table, batch ) # Insert batch columns = DataTransformer.get_column_order(pg_table) inserted = pg_conn.insert_batch( pg_table, transformed, columns ) migrated += inserted progress.update(inserted) if batch_count == 0: logger.info(f"No new rows to migrate for {mysql_table}") return 0 progress.__exit__(None, None, None) # Update migration state self.state.set_last_timestamp(pg_table, migration_start_time) self.state.increment_migration_count(pg_table, migrated) logger.info( f"✓ Incremental migration complete: {migrated} rows migrated " f"to {pg_table}" ) return migrated def _migrate_by_id( self, mysql_conn: MySQLConnector, pg_conn: PostgreSQLConnector, mysql_table: str, pg_table: str, primary_key: str, dry_run: bool ) -> int: """Migrate rows using ID-based resumption (resumable from last ID). Args: mysql_conn: MySQL connector pg_conn: PostgreSQL connector mysql_table: MySQL table name pg_table: PostgreSQL table name primary_key: Primary key column name dry_run: If True, don't modify data Returns: Number of rows migrated """ # Get last migrated ID from state total_count = mysql_conn.get_row_count(mysql_table) state_dict = self.state.state.get(pg_table, {}) last_id = state_dict.get("last_id") previously_migrated = state_dict.get("total_migrated", 0) if last_id is None: logger.info( f"No previous ID-based migration found for {pg_table}. " "Starting from beginning." ) remaining = total_count else: remaining = total_count - last_id logger.info( f"Resuming ID-based migration from ID > {last_id}\n" f"Previously migrated: {previously_migrated} rows\n" f"Remaining to migrate: {remaining} rows" ) if dry_run: logger.info(f"[DRY RUN] Would migrate {remaining} rows") return remaining migrated = 0 with ProgressTracker( remaining, f"Migrating {mysql_table} (resumable)" ) as progress: # Fetch and migrate rows in batches for batch in mysql_conn.fetch_rows_from_id( mysql_table, primary_key, last_id ): if not batch: break # Transform batch transformed = DataTransformer.transform_batch( mysql_table, batch ) # Insert batch columns = DataTransformer.get_column_order(pg_table) inserted = pg_conn.insert_batch( pg_table, transformed, columns ) if inserted > 0: # Get the max ID from the batch batch_max_id = max( int(row.get(primary_key, 0)) for row in batch ) migrated += inserted progress.update(inserted) # Update state after each batch if pg_table not in self.state.state: self.state.state[pg_table] = {} self.state.state[pg_table]["last_id"] = batch_max_id self.state.state[pg_table]["total_migrated"] = previously_migrated + migrated self.state.state[pg_table]["last_updated"] = datetime.utcnow().isoformat() self.state._save_state() logger.info( f"✓ ID-based incremental migration complete: {migrated} rows migrated " f"to {pg_table}" ) return migrated def run_incremental_migration( table: str, dry_run: bool = False, state_file: str = "migration_state.json", use_id: bool = False ) -> int: """Run incremental migration for a table. Args: table: Table name to migrate dry_run: If True, show what would be done without modifying data state_file: Path to migration state file use_id: If True, use ID-based resumption, else use timestamp-based Returns: Number of rows migrated """ migrator = IncrementalMigrator(table, state_file) return migrator.migrate(dry_run=dry_run, use_id=use_id)