perf: Batch INSERT statements to reduce database round-trips
When processing partitions with many small consolidation groups (low
consolidation ratio), the previous approach of inserting each group
individually caused excessive database round-trips.

Example from partition d0:
- 572k MySQL rows
- 514k unique consolidation keys (1.1x consolidation ratio)
- 514k separate INSERT statements = severe performance bottleneck

Changes:
- Accumulate consolidated rows in a buffer (size = batch_size * 10)
- Flush buffer to PostgreSQL when full or when partition is complete
- Reduces 514k INSERT statements to ~50 batches for d0
- Significant performance improvement expected (8-10x faster for
  low-consolidation partitions)

The progress tracker still counts MySQL source rows (before consolidation),
so the progress bar remains accurate.

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
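The diff below calls pg_conn.insert_batch(...) but its body is not part of this commit. For orientation, here is a minimal sketch of what such a helper can look like, assuming psycopg2; insert_batch as written here is a hypothetical stand-in, not the project's actual implementation:

from psycopg2.extras import execute_values

def insert_batch(conn, table, rows, columns):
    """Write many rows per round-trip using multi-row VALUES lists."""
    if not rows:
        return 0
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES %s"
    with conn.cursor() as cur:
        # execute_values expands the single %s placeholder into pages of
        # (v1, v2, ...), (v1, v2, ...) tuples, so a 10k-row buffer costs
        # about 10 round-trips instead of 10k single-row INSERTs.
        execute_values(cur, sql, rows, page_size=1000)
    conn.commit()
    return len(rows)  # plain INSERT: every buffered row is written

If batch_size is 1000 (an assumed value), insert_buffer_size below is 10,000, and partition d0's ~514k consolidated rows flush in roughly 52 buffer flushes, consistent with the "~50 batches" figure above.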
@@ -111,6 +111,11 @@ class FullMigrator:
             logger.info(f"[{partition_idx}/{len(partitions)}] Processing partition {partition}...")
             partition_group_count = 0

+            # Accumulate rows for batch insertion to reduce database round-trips
+            insert_buffer = []
+            insert_buffer_size = self.settings.migration.batch_size * 10  # Larger INSERT batches
+            fetched_in_buffer = 0  # Track MySQL rows fetched (before consolidation)
+
             # Fetch consolidation groups from partition
             # Each group is a list of rows with the same (unit, tool, date, time)
             for group_rows in mysql_conn.fetch_consolidation_groups_from_partition(
@@ -127,19 +132,38 @@ class FullMigrator:
                     consolidate=True
                 )

-                # Insert consolidated rows
-                inserted = pg_conn.insert_batch(pg_table, transformed, columns)
+                # Add to insert buffer instead of inserting immediately
+                insert_buffer.extend(transformed)
+                partition_group_count += len(transformed)
+                fetched_in_buffer += len(group_rows)
+
+                # When buffer is full, flush to database
+                if len(insert_buffer) >= insert_buffer_size:
+                    inserted = pg_conn.insert_batch(pg_table, insert_buffer, columns)
+                    if inserted > 0:
+                        migrated += inserted
+                        batch_count += 1
+                        progress.update(fetched_in_buffer)
+
+                        # Update state every 10 batches to track progress
+                        if batch_count % 10 == 0:
+                            self._update_migration_state(
+                                pg_conn, migrated, None, migration_start_time
+                            )
+
+                    insert_buffer = []
+                    fetched_in_buffer = 0
+
+            # Flush remaining rows in buffer for this partition
+            if insert_buffer:
+                inserted = pg_conn.insert_batch(pg_table, insert_buffer, columns)
                 if inserted > 0:
                     migrated += inserted
                     batch_count += 1
-                    partition_group_count += 1
-                    progress.update(len(group_rows))
-
-                    # Update state every 10 consolidations to track progress
-                    if batch_count % 10 == 0:
-                        self._update_migration_state(
-                            pg_conn, migrated, None, migration_start_time
-                        )
+                    progress.update(fetched_in_buffer)
+                    self._update_migration_state(
+                        pg_conn, migrated, None, migration_start_time
+                    )


             logger.info(f"Partition {partition} complete: {partition_group_count} groups consolidated")
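Stripped of the migrator specifics, the hunk above is a plain accumulate-then-flush loop. A self-contained restatement of the pattern, with hypothetical names (consolidate, write_batch, and on_progress are illustrative callables, not project APIs):

from typing import Callable, Iterable, List, Sequence

def drain_in_batches(
    groups: Iterable[Sequence[tuple]],
    consolidate: Callable[[Sequence[tuple]], List[tuple]],
    write_batch: Callable[[List[tuple]], int],
    on_progress: Callable[[int], None],
    buffer_size: int = 10_000,
) -> int:
    """Accumulate consolidated rows and flush them in large batches."""
    buffer: List[tuple] = []
    fetched = 0   # source rows seen since the last flush
    written = 0
    for group in groups:
        buffer.extend(consolidate(group))
        fetched += len(group)          # count source rows, not output rows
        if len(buffer) >= buffer_size:
            written += write_batch(buffer)
            on_progress(fetched)       # progress tracks source rows
            buffer, fetched = [], 0
    if buffer:                         # final partial batch for the partition
        written += write_batch(buffer)
        on_progress(fetched)
    return written

Reporting progress with the source-row count (fetched) rather than the consolidated-row count is what keeps the progress bar aligned with the MySQL total, as the commit message notes.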