Simplify resume logic for partition-based consolidation
With partition-based consolidation, resume is now simpler:

- No longer track last_migrated_id (not useful for partition iteration)
- Resume capability: if rows exist in target table, migration was interrupted
- Use total_rows_migrated count to calculate remaining work
- Update state every 10 consolidations instead of maintaining per-batch state

This aligns the resume mechanism with the new partition-based architecture, where we process complete consolidation groups, not sequential ID ranges.

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -45,7 +45,6 @@ class FullMigrator:
|
||||
|
||||
mysql_table = self.config["mysql_table"]
|
||||
pg_table = self.config["postgres_table"]
|
||||
primary_key = self.config.get("primary_key", "id")
|
||||
|
||||
logger.info(f"Starting full migration of {mysql_table} -> {pg_table}")
|
||||
|
||||
@@ -64,10 +63,15 @@ class FullMigrator:
|
||||
)
|
||||
|
||||
# Check for previous migration state
|
||||
last_migrated_id = self._get_last_migrated_id(pg_conn, pg_table)
|
||||
# Note: With partition-based consolidation, we track progress differently
|
||||
# than with ID-based pagination. The resume capability is simplified:
|
||||
# - If data exists in table, migration was in progress
|
||||
# - Resume will continue from where we left off
|
||||
# - Full restart requires clearing the table
|
||||
|
||||
previous_migrated_count = self._get_previous_migrated_count(pg_conn, pg_table)
|
||||
|
||||
if last_migrated_id is not None:
|
||||
if previous_migrated_count > 0:
|
||||
pg_row_count = pg_conn.get_row_count(pg_table)
|
||||
logger.warning(
|
||||
f"Found previous migration state: {pg_row_count} rows already in {pg_table}"
|
||||
@@ -77,10 +81,9 @@ class FullMigrator:
|
||||
f"Migration already in progress for {pg_table}. "
|
||||
f"Use --resume to continue from last checkpoint, or delete data to restart."
|
||||
)
|
||||
logger.info(f"Resuming from ID > {last_migrated_id}")
|
||||
rows_to_migrate = total_rows - last_migrated_id
|
||||
logger.info(f"Resuming migration - found {pg_row_count} existing rows")
|
||||
rows_to_migrate = total_rows - previous_migrated_count
|
||||
else:
|
||||
last_migrated_id = None
|
||||
previous_migrated_count = 0
|
||||
rows_to_migrate = total_rows
|
||||
|
||||
@@ -128,25 +131,11 @@ class FullMigrator:
|
||||
batch_count += 1
|
||||
progress.update(len(group_rows))
|
||||
|
||||
# Update state every 10 consolidations
|
||||
# Update state every 10 consolidations to track progress
|
||||
if batch_count % 10 == 0:
|
||||
batch_max_id = max(int(r.get(primary_key, 0)) for r in group_rows)
|
||||
self._update_migration_state(
|
||||
pg_conn, migrated, batch_max_id, migration_start_time
|
||||
pg_conn, migrated, None, migration_start_time
|
||||
)
|
||||
else:
|
||||
batch_max_id = max(int(r.get(primary_key, 0)) for r in group_rows)
|
||||
try:
|
||||
with pg_conn.connection.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"""UPDATE migration_state
|
||||
SET last_migrated_id = %s, last_migrated_timestamp = %s
|
||||
WHERE table_name = %s""",
|
||||
(batch_max_id, migration_start_time or datetime.utcnow().isoformat(), pg_table)
|
||||
)
|
||||
pg_conn.connection.commit()
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to update migration state: {e}")
|
||||
|
||||
# Get final actual count from PostgreSQL
|
||||
final_count = pg_conn.get_row_count(pg_table)
|
||||
|
||||
Reference in New Issue
Block a user