diff --git a/src/migrator/full_migration.py b/src/migrator/full_migration.py index 4c1dfcb..51394cf 100644 --- a/src/migrator/full_migration.py +++ b/src/migrator/full_migration.py @@ -111,6 +111,11 @@ class FullMigrator: logger.info(f"[{partition_idx}/{len(partitions)}] Processing partition {partition}...") partition_group_count = 0 + # Accumulate rows for batch insertion to reduce database round-trips + insert_buffer = [] + insert_buffer_size = self.settings.migration.batch_size * 10 # Larger INSERT batches + fetched_in_buffer = 0 # Track MySQL rows fetched (before consolidation) + # Fetch consolidation groups from partition # Each group is a list of rows with the same (unit, tool, date, time) for group_rows in mysql_conn.fetch_consolidation_groups_from_partition( @@ -127,19 +132,38 @@ class FullMigrator: consolidate=True ) - # Insert consolidated rows - inserted = pg_conn.insert_batch(pg_table, transformed, columns) + # Add to insert buffer instead of inserting immediately + insert_buffer.extend(transformed) + partition_group_count += len(transformed) + fetched_in_buffer += len(group_rows) + + # When buffer is full, flush to database + if len(insert_buffer) >= insert_buffer_size: + inserted = pg_conn.insert_batch(pg_table, insert_buffer, columns) + if inserted > 0: + migrated += inserted + batch_count += 1 + progress.update(fetched_in_buffer) + + # Update state every 10 batches to track progress + if batch_count % 10 == 0: + self._update_migration_state( + pg_conn, migrated, None, migration_start_time + ) + + insert_buffer = [] + fetched_in_buffer = 0 + + # Flush remaining rows in buffer for this partition + if insert_buffer: + inserted = pg_conn.insert_batch(pg_table, insert_buffer, columns) if inserted > 0: migrated += inserted batch_count += 1 - partition_group_count += 1 - progress.update(len(group_rows)) - - # Update state every 10 consolidations to track progress - if batch_count % 10 == 0: - self._update_migration_state( - pg_conn, migrated, None, migration_start_time - ) + progress.update(fetched_in_buffer) + self._update_migration_state( + pg_conn, migrated, None, migration_start_time + ) logger.info(f"Partition {partition} complete: {partition_group_count} groups consolidated")