From dfc54cf86791a6bcf5d1074cfc6bd705d3abb4e8 Mon Sep 17 00:00:00 2001
From: alex
Date: Thu, 25 Dec 2025 22:53:20 +0100
Subject: [PATCH] perf: Batch INSERT statements to reduce database round-trips
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When processing partitions with many small consolidation groups (low
consolidation ratio), the previous approach of inserting each group
individually caused excessive database round-trips.

Example from partition d0:
- 572k MySQL rows
- 514k unique consolidation keys (1.1x consolidation ratio)
- 514k separate INSERT statements = severe performance bottleneck

Changes:
- Accumulate consolidated rows in a buffer (size = batch_size * 10)
- Flush buffer to PostgreSQL when full or when partition is complete
- Reduces 514k INSERT statements to ~50 batches for d0
- Significant performance improvement expected (8-10x faster for
  low-consolidation partitions)

The progress tracker still counts MySQL source rows (before
consolidation), so the progress bar remains accurate.

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5
---
 src/migrator/full_migration.py | 44 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 34 insertions(+), 10 deletions(-)

diff --git a/src/migrator/full_migration.py b/src/migrator/full_migration.py
index 4c1dfcb..51394cf 100644
--- a/src/migrator/full_migration.py
+++ b/src/migrator/full_migration.py
@@ -111,6 +111,11 @@ class FullMigrator:
             logger.info(f"[{partition_idx}/{len(partitions)}] Processing partition {partition}...")
             partition_group_count = 0
 
+            # Accumulate rows for batch insertion to reduce database round-trips
+            insert_buffer = []
+            insert_buffer_size = self.settings.migration.batch_size * 10  # Larger INSERT batches
+            fetched_in_buffer = 0  # Track MySQL rows fetched (before consolidation)
+
             # Fetch consolidation groups from partition
             # Each group is a list of rows with the same (unit, tool, date, time)
             for group_rows in mysql_conn.fetch_consolidation_groups_from_partition(
@@ -127,19 +132,38 @@
                     consolidate=True
                 )
 
-                # Insert consolidated rows
-                inserted = pg_conn.insert_batch(pg_table, transformed, columns)
+                # Add to insert buffer instead of inserting immediately
+                insert_buffer.extend(transformed)
+                partition_group_count += len(transformed)
+                fetched_in_buffer += len(group_rows)
+
+                # When buffer is full, flush to database
+                if len(insert_buffer) >= insert_buffer_size:
+                    inserted = pg_conn.insert_batch(pg_table, insert_buffer, columns)
+                    if inserted > 0:
+                        migrated += inserted
+                        batch_count += 1
+                    progress.update(fetched_in_buffer)
+
+                    # Update state every 10 batches to track progress
+                    if batch_count % 10 == 0:
+                        self._update_migration_state(
+                            pg_conn, migrated, None, migration_start_time
+                        )
+
+                    insert_buffer = []
+                    fetched_in_buffer = 0
+
+            # Flush remaining rows in buffer for this partition
+            if insert_buffer:
+                inserted = pg_conn.insert_batch(pg_table, insert_buffer, columns)
                 if inserted > 0:
                     migrated += inserted
                     batch_count += 1
-                partition_group_count += 1
-                progress.update(len(group_rows))
-
-                # Update state every 10 consolidations to track progress
-                if batch_count % 10 == 0:
-                    self._update_migration_state(
-                        pg_conn, migrated, None, migration_start_time
-                    )
+                progress.update(fetched_in_buffer)
+                self._update_migration_state(
+                    pg_conn, migrated, None, migration_start_time
+                )
 
             logger.info(f"Partition {partition} complete: {partition_group_count} groups consolidated")
 
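Note: insert_batch's implementation is not part of this diff. For
reviewers who want the buffering pattern in isolation, below is a
minimal, self-contained sketch assuming a psycopg2 connection and
psycopg2.extras.execute_values; the table/column names, batch_size
default, and the flush/migrate_groups helpers are illustrative
placeholders, not the project's actual API:

    # Hypothetical standalone sketch of the buffered batch-insert
    # pattern used by this patch (all names are placeholders).
    from psycopg2.extras import execute_values

    def flush(cursor, table, columns, buffer):
        """Write all buffered rows with one multi-row INSERT call."""
        if not buffer:
            return 0
        sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES %s"
        execute_values(cursor, sql, buffer)  # buffer: list of row tuples
        return len(buffer)

    def migrate_groups(groups, cursor, table, columns, batch_size=5000):
        """Accumulate consolidated rows; flush in large batches."""
        buffer, migrated = [], 0
        insert_buffer_size = batch_size * 10  # same sizing as the patch
        for transformed in groups:  # each item: consolidated rows (tuples)
            buffer.extend(transformed)
            if len(buffer) >= insert_buffer_size:
                migrated += flush(cursor, table, columns, buffer)
                buffer = []
        # Final flush mirrors the end-of-partition flush in the diff,
        # so a partially filled buffer is never dropped.
        migrated += flush(cursor, table, columns, buffer)
        return migrated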