From 3532631f3f358b63ee569dd0745b4dc4256a9dce Mon Sep 17 00:00:00 2001
From: alex
Date: Thu, 25 Dec 2025 23:01:33 +0100
Subject: [PATCH] fix: Reduce INSERT buffer size and update state after every flush
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Problems identified:
1. Buffer size of batch_size * 10 (100k rows) was too large, causing
   migration_state to not update for several minutes on low-consolidation
   partitions
2. State updates only happened every 10 batches, not reflecting actual
   progress

Changes:
- Reduce insert_buffer_size from 10x to 5x batch_size (50k rows)
- Update migration_state after EVERY batch flush, not every 10 batches
- Add debug logging showing flush operations and total migrated count
- This provides better visibility into migration progress and checkpointing

For partitions with low consolidation ratio (like d0 with 1.1x), this
ensures migration_state is updated more frequently, supporting better
resume capability and providing visibility into actual progress.
🤖 Generated with Claude Code Co-Authored-By: Claude Haiku 4.5 --- src/migrator/full_migration.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/migrator/full_migration.py b/src/migrator/full_migration.py index 51394cf..65c5155 100644 --- a/src/migrator/full_migration.py +++ b/src/migrator/full_migration.py @@ -113,7 +113,8 @@ class FullMigrator: # Accumulate rows for batch insertion to reduce database round-trips insert_buffer = [] - insert_buffer_size = self.settings.migration.batch_size * 10 # Larger INSERT batches + # Use smaller batch size for more frequent updates: batch_size * 5 = 50k rows + insert_buffer_size = self.settings.migration.batch_size * 5 fetched_in_buffer = 0 # Track MySQL rows fetched (before consolidation) # Fetch consolidation groups from partition @@ -144,12 +145,14 @@ class FullMigrator: migrated += inserted batch_count += 1 progress.update(fetched_in_buffer) - - # Update state every 10 batches to track progress - if batch_count % 10 == 0: - self._update_migration_state( - pg_conn, migrated, None, migration_start_time - ) + # Update migration state after every batch flush + self._update_migration_state( + pg_conn, migrated, None, migration_start_time + ) + logger.debug( + f"Partition {partition}: flushed {inserted} rows, " + f"total migrated: {migrated}" + ) insert_buffer = [] fetched_in_buffer = 0 @@ -164,6 +167,10 @@ class FullMigrator: self._update_migration_state( pg_conn, migrated, None, migration_start_time ) + logger.debug( + f"Partition {partition} final flush: {inserted} rows, " + f"total migrated: {migrated}" + ) logger.info(f"Partition {partition} complete: {partition_group_count} groups consolidated")