From 255fb1c5200d6708c86411c0751c16e9b7be4a1a Mon Sep 17 00:00:00 2001 From: alex Date: Thu, 25 Dec 2025 21:54:40 +0100 Subject: [PATCH] Simplify resume logic for partition-based consolidation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With partition-based consolidation, resume is now simpler: - No longer track last_migrated_id (not useful for partition iteration) - Resume capability: if rows exist in target table, migration was interrupted - Use total_rows_migrated count to calculate remaining work - Update state every 10 consolidations instead of maintaining per-batch state This aligns resume mechanism with the new partition-based architecture where we process complete consolidation groups, not sequential ID ranges. 🤖 Generated with Claude Code Co-Authored-By: Claude Haiku 4.5 --- src/migrator/full_migration.py | 33 +++++++++++---------------------- 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/src/migrator/full_migration.py b/src/migrator/full_migration.py index aaa89c8..e437f93 100644 --- a/src/migrator/full_migration.py +++ b/src/migrator/full_migration.py @@ -45,7 +45,6 @@ class FullMigrator: mysql_table = self.config["mysql_table"] pg_table = self.config["postgres_table"] - primary_key = self.config.get("primary_key", "id") logger.info(f"Starting full migration of {mysql_table} -> {pg_table}") @@ -64,10 +63,15 @@ class FullMigrator: ) # Check for previous migration state - last_migrated_id = self._get_last_migrated_id(pg_conn, pg_table) + # Note: With partition-based consolidation, we track progress differently + # than with ID-based pagination. The resume capability is simplified: + # - If data exists in table, migration was in progress + # - Resume will continue from where we left off + # - Full restart requires clearing the table + previous_migrated_count = self._get_previous_migrated_count(pg_conn, pg_table) - if last_migrated_id is not None: + if previous_migrated_count > 0: pg_row_count = pg_conn.get_row_count(pg_table) logger.warning( f"Found previous migration state: {pg_row_count} rows already in {pg_table}" @@ -77,10 +81,9 @@ class FullMigrator: f"Migration already in progress for {pg_table}. " f"Use --resume to continue from last checkpoint, or delete data to restart." ) - logger.info(f"Resuming from ID > {last_migrated_id}") - rows_to_migrate = total_rows - last_migrated_id + logger.info(f"Resuming migration - found {pg_row_count} existing rows") + rows_to_migrate = total_rows - previous_migrated_count else: - last_migrated_id = None previous_migrated_count = 0 rows_to_migrate = total_rows @@ -128,25 +131,11 @@ class FullMigrator: batch_count += 1 progress.update(len(group_rows)) - # Update state every 10 consolidations + # Update state every 10 consolidations to track progress if batch_count % 10 == 0: - batch_max_id = max(int(r.get(primary_key, 0)) for r in group_rows) self._update_migration_state( - pg_conn, migrated, batch_max_id, migration_start_time + pg_conn, migrated, None, migration_start_time ) - else: - batch_max_id = max(int(r.get(primary_key, 0)) for r in group_rows) - try: - with pg_conn.connection.cursor() as cursor: - cursor.execute( - """UPDATE migration_state - SET last_migrated_id = %s, last_migrated_timestamp = %s - WHERE table_name = %s""", - (batch_max_id, migration_start_time or datetime.utcnow().isoformat(), pg_table) - ) - pg_conn.connection.commit() - except Exception as e: - logger.warning(f"Failed to update migration state: {e}") # Get final actual count from PostgreSQL final_count = pg_conn.get_row_count(pg_table)