fix: Track last_migrated_id during migration, not just at final update

Problem: During batch flushes, last_id was passed as None to the
migration_state updates. As a result, last_migrated_id was never populated
in the migration_state table, making it impossible to resume from a specific ID.

Solution: Call _get_last_migrated_id() after each batch flush and after each
partition completion to get the actual last inserted ID, and pass it to the
migration_state updates. This ensures that a resume can pick up from the exact
row that was last migrated.

Changes:
- After each batch flush: get current_last_id and pass to _update_migration_state
- After partition completion: get final_last_id and pass to _update_migration_state
- This enables a proper resume from a specific row, not just from partition boundaries

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-26 20:27:00 +01:00
parent 8c48e5eecb
commit 0e52f72dbe

View File

@@ -197,10 +197,12 @@ class FullMigrator:
migrated += inserted migrated += inserted
batch_count += 1 batch_count += 1
progress.update(fetched_in_buffer) progress.update(fetched_in_buffer)
# Get the last inserted ID for resume capability
current_last_id = self._get_last_migrated_id(pg_conn, pg_table)
# Update migration state after every batch flush # Update migration state after every batch flush
# Keep last_completed_partition if we've completed partitions before # Keep last_completed_partition if we've completed partitions before
self._update_migration_state( self._update_migration_state(
pg_conn, migrated, None, migration_start_time, pg_conn, migrated, current_last_id, migration_start_time,
last_partition=last_processed_partition last_partition=last_processed_partition
) )
logger.debug( logger.debug(
@@ -218,9 +220,11 @@ class FullMigrator:
migrated += inserted migrated += inserted
batch_count += 1 batch_count += 1
progress.update(fetched_in_buffer) progress.update(fetched_in_buffer)
# Get the last inserted ID for resume capability
current_last_id = self._get_last_migrated_id(pg_conn, pg_table)
# Keep last_completed_partition if we've completed partitions before # Keep last_completed_partition if we've completed partitions before
self._update_migration_state( self._update_migration_state(
pg_conn, migrated, None, migration_start_time, pg_conn, migrated, current_last_id, migration_start_time,
last_partition=last_processed_partition last_partition=last_processed_partition
) )
logger.debug( logger.debug(
@@ -231,8 +235,10 @@ class FullMigrator:
# NOW partition is complete - update with completed partition # NOW partition is complete - update with completed partition
logger.info(f"Partition {partition} complete: {partition_group_count} groups consolidated") logger.info(f"Partition {partition} complete: {partition_group_count} groups consolidated")
last_processed_partition = partition # Track this partition as processed last_processed_partition = partition # Track this partition as processed
# Get final last ID for this partition
final_last_id = self._get_last_migrated_id(pg_conn, pg_table)
self._update_migration_state( self._update_migration_state(
pg_conn, migrated, None, migration_start_time, pg_conn, migrated, final_last_id, migration_start_time,
last_partition=partition last_partition=partition
) )