feat: Track last completed partition for accurate resume capability

Problem: Resume was re-processing all partitions from the beginning because
migration_state didn't track which partition was the last one completed.
This caused duplicate data insertion and wasted time.

Solution:
1. Added 'last_completed_partition' column to migration_state table
2. Created _get_last_completed_partition() method to retrieve saved state
3. Updated _update_migration_state() to accept and save last_partition parameter
4. Modified migration loop to:
   - Retrieve last_completed_partition on resume
   - Skip partitions that were already completed (partition <= last_completed_partition)
   - Update last_completed_partition after each partition finishes
   - Log which partitions are being skipped during resume

Now when resuming:
- Only processes partitions after the last completed one
- Avoids re-migrating already completed partitions
- Provides clear logging showing which partitions are skipped

For example, if migration was at partition d5 when interrupted, resume will:
- Skip d0 through d5 (logging each skip)
- Continue with d6 onwards

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-25 23:30:37 +01:00
parent 3532631f3f
commit e5c87d145f

View File

@@ -70,6 +70,7 @@ class FullMigrator:
# - Full restart requires clearing the table
previous_migrated_count = self._get_previous_migrated_count(pg_conn, pg_table)
last_completed_partition = self._get_last_completed_partition(pg_conn, pg_table)
if previous_migrated_count > 0:
pg_row_count = pg_conn.get_row_count(pg_table)
@@ -82,11 +83,14 @@ class FullMigrator:
f"Use --resume to continue from last checkpoint, or delete data to restart."
)
logger.info(f"Resuming migration - found {pg_row_count} existing rows")
if last_completed_partition:
logger.info(f"Last completed partition: {last_completed_partition}")
# Progress bar tracks MySQL rows processed (before consolidation)
# Consolidation reduces count but not the rows we need to fetch
rows_to_migrate = total_rows
else:
previous_migrated_count = 0
last_completed_partition = None
rows_to_migrate = total_rows
if dry_run:
@@ -108,6 +112,11 @@ class FullMigrator:
logger.info(f"Found {len(partitions)} partitions for {mysql_table}")
for partition_idx, partition in enumerate(partitions, 1):
# Skip partitions already completed in previous run
if last_completed_partition and partition <= last_completed_partition:
logger.info(f"[{partition_idx}/{len(partitions)}] Skipping partition {partition} (already completed)")
continue
logger.info(f"[{partition_idx}/{len(partitions)}] Processing partition {partition}...")
partition_group_count = 0
@@ -147,7 +156,8 @@ class FullMigrator:
progress.update(fetched_in_buffer)
# Update migration state after every batch flush
self._update_migration_state(
pg_conn, migrated, None, migration_start_time
pg_conn, migrated, None, migration_start_time,
last_partition=partition
)
logger.debug(
f"Partition {partition}: flushed {inserted} rows, "
@@ -165,7 +175,8 @@ class FullMigrator:
batch_count += 1
progress.update(fetched_in_buffer)
self._update_migration_state(
pg_conn, migrated, None, migration_start_time
pg_conn, migrated, None, migration_start_time,
last_partition=partition
)
logger.debug(
f"Partition {partition} final flush: {inserted} rows, "
@@ -254,13 +265,37 @@ class FullMigrator:
pass
return 0
def _get_last_completed_partition(self, pg_conn: PostgreSQLConnector, pg_table: str) -> Optional[str]:
"""Get the last completed partition from migration_state table.
Args:
pg_conn: PostgreSQL connection
pg_table: PostgreSQL table name
Returns:
Last completed partition name or None if no previous migration
"""
try:
with pg_conn.connection.cursor() as cursor:
cursor.execute(
"SELECT last_completed_partition FROM migration_state WHERE table_name = %s",
(pg_table,)
)
result = cursor.fetchone()
if result and result[0]:
return result[0]
except Exception:
pass
return None
def _update_migration_state(
self,
pg_conn: PostgreSQLConnector,
rows_migrated: int,
last_id: Optional[int] = None,
migration_start_time: Optional[str] = None,
is_final: bool = False
is_final: bool = False,
last_partition: Optional[str] = None
) -> None:
"""Update migration state in PostgreSQL and state file.
@@ -270,6 +305,7 @@ class FullMigrator:
last_id: Last ID that was migrated (for resume capability)
migration_start_time: When the migration started (ISO format)
is_final: If True, mark migration as completed
last_partition: Name of the last completed partition
"""
pg_table = self.config["postgres_table"]
now = datetime.utcnow()
@@ -280,14 +316,16 @@ class FullMigrator:
with pg_conn.connection.cursor() as cursor:
query = f"""
INSERT INTO migration_state
(table_name, last_migrated_timestamp, last_migrated_id, total_rows_migrated, migration_completed_at, status)
VALUES (%s, %s, %s, %s, %s, %s)
(table_name, last_migrated_timestamp, last_migrated_id, total_rows_migrated,
migration_completed_at, status, last_completed_partition)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (table_name) DO UPDATE SET
last_migrated_timestamp = EXCLUDED.last_migrated_timestamp,
last_migrated_id = EXCLUDED.last_migrated_id,
total_rows_migrated = EXCLUDED.total_rows_migrated,
migration_completed_at = EXCLUDED.migration_completed_at,
status = EXCLUDED.status
status = EXCLUDED.status,
last_completed_partition = EXCLUDED.last_completed_partition
"""
cursor.execute(
query,
@@ -297,7 +335,8 @@ class FullMigrator:
last_id,
rows_migrated,
now if status == "completed" else None,
status
status,
last_partition
)
)
pg_conn.connection.commit()