From 287a7ffb510e9e1f2b2cba233a177c796f892f78 Mon Sep 17 00:00:00 2001
From: alex
Date: Sat, 27 Dec 2025 08:49:40 +0100
Subject: [PATCH] feat: Add consolidation support to incremental migration
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously, incremental migration (both timestamp-based and ID-based)
did not consolidate rows, resulting in one row per node instead of
consolidated measurements with JSONB nodes.

Solution: Add a _consolidate_batch() method to group rows by
consolidation key and consolidate them before transformation.

Apply consolidation in both:
1. _migrate_by_timestamp() - timestamp-based incremental migration
2. _migrate_by_id() - ID-based incremental migration

Changes:
- For RAWDATACOR and ELABDATADISP tables: consolidate each batch by
  grouping rows with the same consolidation key before transforming
- Pass consolidate=False to transform_batch since rows are already
  consolidated
- Handle groups that contain a single row (no consolidation needed)

This ensures incremental migration produces the same consolidated
output as full migration, with multiple nodes properly merged into a
single row with JSONB measurements.

🤖 Generated with Claude Code Co-Authored-By: Claude Haiku 4.5 --- src/migrator/incremental_migration.py | 102 +++++++++++++++++++++++--- 1 file changed, 91 insertions(+), 11 deletions(-) diff --git a/src/migrator/incremental_migration.py b/src/migrator/incremental_migration.py index a0ad6c2..7e80277 100644 --- a/src/migrator/incremental_migration.py +++ b/src/migrator/incremental_migration.py @@ -158,6 +158,8 @@ class IncrementalMigrator: # Fetch and migrate rows in batches batch_count = 0 + progress = None + for batch in mysql_conn.fetch_rows_since( mysql_table, last_timestamp @@ -172,11 +174,23 @@ class IncrementalMigrator: ) progress.__enter__() - # Transform batch - transformed = DataTransformer.transform_batch( - mysql_table, - batch - ) + # For tables that need consolidation, group rows by consolidation key + # and consolidate before transforming + if mysql_table in ("RAWDATACOR", "ELABDATADISP"): + # Group batch by consolidation key + consolidated_batch = self._consolidate_batch(batch, mysql_table) + # Transform consolidated batch + transformed = DataTransformer.transform_batch( + mysql_table, + consolidated_batch, + consolidate=False # Already consolidated above + ) + else: + # No consolidation needed for other tables + transformed = DataTransformer.transform_batch( + mysql_table, + batch + ) # Insert batch columns = DataTransformer.get_column_order(pg_table) @@ -187,7 +201,8 @@ class IncrementalMigrator: ) migrated += inserted - progress.update(inserted) + if progress: + progress.update(inserted) if batch_count == 0: logger.info(f"No new rows to migrate for {mysql_table}") @@ -267,11 +282,23 @@ class IncrementalMigrator: if not batch: break - # Transform batch - transformed = DataTransformer.transform_batch( - mysql_table, - batch - ) + # For tables that need consolidation, group rows by consolidation key + # and consolidate before transforming + if mysql_table in ("RAWDATACOR", "ELABDATADISP"): + # Group batch by consolidation key + consolidated_batch = 
self._consolidate_batch(batch, mysql_table) + # Transform consolidated batch + transformed = DataTransformer.transform_batch( + mysql_table, + consolidated_batch, + consolidate=False # Already consolidated above + ) + else: + # No consolidation needed for other tables + transformed = DataTransformer.transform_batch( + mysql_table, + batch + ) # Insert batch columns = DataTransformer.get_column_order(pg_table) @@ -304,6 +331,59 @@ class IncrementalMigrator: return migrated + def _consolidate_batch(self, batch: list, mysql_table: str) -> list: + """Consolidate batch by grouping rows with same consolidation key. + + Args: + batch: List of rows from MySQL + mysql_table: Table name (RAWDATACOR or ELABDATADISP) + + Returns: + List of rows, already consolidated by consolidation key + """ + if not batch: + return batch + + # Group rows by consolidation key + groups = {} + for row in batch: + # Build consolidation key + if mysql_table == "ELABDATADISP": + key = ( + row.get("UnitName"), + row.get("ToolNameID"), + row.get("EventDate"), + row.get("EventTime") + ) + else: # RAWDATACOR + key = ( + row.get("UnitName"), + row.get("ToolNameID"), + row.get("EventDate"), + row.get("EventTime") + ) + + if key not in groups: + groups[key] = [] + groups[key].append(row) + + # Consolidate each group and flatten back to list + consolidated = [] + for key, group_rows in groups.items(): + if len(group_rows) == 1: + # Single row, no consolidation needed + consolidated.append(group_rows[0]) + else: + # Multiple rows with same key, consolidate them + consolidated_group = DataTransformer.consolidate_elabdatadisp_batch( + group_rows + ) if mysql_table == "ELABDATADISP" else DataTransformer.consolidate_rawdatacor_batch( + group_rows + ) + consolidated.extend(consolidated_group) + + return consolidated + def run_incremental_migration( table: str,