diff --git a/src/migrator/full_migration.py b/src/migrator/full_migration.py index 65c5155..c5d5c02 100644 --- a/src/migrator/full_migration.py +++ b/src/migrator/full_migration.py @@ -70,6 +70,7 @@ class FullMigrator: # - Full restart requires clearing the table previous_migrated_count = self._get_previous_migrated_count(pg_conn, pg_table) + last_completed_partition = self._get_last_completed_partition(pg_conn, pg_table) if previous_migrated_count > 0: pg_row_count = pg_conn.get_row_count(pg_table) @@ -82,11 +83,14 @@ class FullMigrator: f"Use --resume to continue from last checkpoint, or delete data to restart." ) logger.info(f"Resuming migration - found {pg_row_count} existing rows") + if last_completed_partition: + logger.info(f"Last completed partition: {last_completed_partition}") # Progress bar tracks MySQL rows processed (before consolidation) # Consolidation reduces count but not the rows we need to fetch rows_to_migrate = total_rows else: previous_migrated_count = 0 + last_completed_partition = None rows_to_migrate = total_rows if dry_run: @@ -108,6 +112,11 @@ class FullMigrator: logger.info(f"Found {len(partitions)} partitions for {mysql_table}") for partition_idx, partition in enumerate(partitions, 1): + # Skip partitions already completed in previous run + if last_completed_partition and partition <= last_completed_partition: + logger.info(f"[{partition_idx}/{len(partitions)}] Skipping partition {partition} (already completed)") + continue + logger.info(f"[{partition_idx}/{len(partitions)}] Processing partition {partition}...") partition_group_count = 0 @@ -147,7 +156,8 @@ class FullMigrator: progress.update(fetched_in_buffer) # Update migration state after every batch flush self._update_migration_state( - pg_conn, migrated, None, migration_start_time + pg_conn, migrated, None, migration_start_time, + last_partition=partition ) logger.debug( f"Partition {partition}: flushed {inserted} rows, " @@ -165,7 +175,8 @@ class FullMigrator: batch_count += 1 
def _get_last_completed_partition(self, pg_conn: PostgreSQLConnector, pg_table: str) -> Optional[str]:
    """Return the partition checkpoint stored for ``pg_table``, if any.

    Reads ``last_completed_partition`` from the ``migration_state`` row whose
    ``table_name`` equals ``pg_table``.

    The lookup is best-effort: any failure (for example a ``migration_state``
    table that is missing or lacks the ``last_completed_partition`` column)
    yields ``None``. The failure is logged instead of being silently dropped,
    and the connection is rolled back afterwards.

    Args:
        pg_conn: PostgreSQL connector; its ``connection`` attribute is used
            to open a cursor and, on failure, to roll back.
        pg_table: PostgreSQL table name used as the ``table_name`` key.

    Returns:
        The stored partition name, or None when no row exists, the stored
        value is empty, or the lookup failed.
    """
    # Local import keeps the helper self-contained (no reliance on a
    # module-level logger object being defined under a particular name).
    import logging

    log = logging.getLogger(__name__)

    try:
        with pg_conn.connection.cursor() as cursor:
            cursor.execute(
                "SELECT last_completed_partition FROM migration_state WHERE table_name = %s",
                (pg_table,),
            )
            row = cursor.fetchone()
            value = row[0] if row else None
    except Exception as exc:
        log.warning(
            "Could not read last completed partition for %s: %s", pg_table, exc
        )
        # A failed statement leaves a PostgreSQL transaction in the aborted
        # state; without a rollback every later statement on this connection
        # would be rejected (assumes a non-autocommit DB-API connection).
        try:
            pg_conn.connection.rollback()
        except Exception:
            log.debug("Rollback after failed partition lookup also failed", exc_info=True)
        return None

    # Treat NULL and empty string alike: neither names a usable partition.
    return value if value else None
@@ -270,6 +305,7 @@ class FullMigrator: last_id: Last ID that was migrated (for resume capability) migration_start_time: When the migration started (ISO format) is_final: If True, mark migration as completed + last_partition: Name of the last completed partition """ pg_table = self.config["postgres_table"] now = datetime.utcnow() @@ -280,14 +316,16 @@ class FullMigrator: with pg_conn.connection.cursor() as cursor: query = f""" INSERT INTO migration_state - (table_name, last_migrated_timestamp, last_migrated_id, total_rows_migrated, migration_completed_at, status) - VALUES (%s, %s, %s, %s, %s, %s) + (table_name, last_migrated_timestamp, last_migrated_id, total_rows_migrated, + migration_completed_at, status, last_completed_partition) + VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (table_name) DO UPDATE SET last_migrated_timestamp = EXCLUDED.last_migrated_timestamp, last_migrated_id = EXCLUDED.last_migrated_id, total_rows_migrated = EXCLUDED.total_rows_migrated, migration_completed_at = EXCLUDED.migration_completed_at, - status = EXCLUDED.status + status = EXCLUDED.status, + last_completed_partition = EXCLUDED.last_completed_partition """ cursor.execute( query, @@ -297,7 +335,8 @@ class FullMigrator: last_id, rows_migrated, now if status == "completed" else None, - status + status, + last_partition ) ) pg_conn.connection.commit()