diff --git a/src/migrator/incremental_migration.py b/src/migrator/incremental_migration.py
index a0ad6c2..7e80277 100644
--- a/src/migrator/incremental_migration.py
+++ b/src/migrator/incremental_migration.py
@@ -158,6 +158,8 @@ class IncrementalMigrator:
 
         # Fetch and migrate rows in batches
         batch_count = 0
+        progress = None
+
         for batch in mysql_conn.fetch_rows_since(
             mysql_table,
             last_timestamp
@@ -172,11 +174,23 @@ class IncrementalMigrator:
             )
             progress.__enter__()
 
-            # Transform batch
-            transformed = DataTransformer.transform_batch(
-                mysql_table,
-                batch
-            )
+            # For tables that need consolidation, group rows by consolidation key
+            # and consolidate before transforming
+            if mysql_table in ("RAWDATACOR", "ELABDATADISP"):
+                # Group batch by consolidation key
+                consolidated_batch = self._consolidate_batch(batch, mysql_table)
+                # Transform consolidated batch
+                transformed = DataTransformer.transform_batch(
+                    mysql_table,
+                    consolidated_batch,
+                    consolidate=False  # Already consolidated above
+                )
+            else:
+                # No consolidation needed for other tables
+                transformed = DataTransformer.transform_batch(
+                    mysql_table,
+                    batch
+                )
 
             # Insert batch
             columns = DataTransformer.get_column_order(pg_table)
@@ -187,7 +201,8 @@ class IncrementalMigrator:
             )
             migrated += inserted
-            progress.update(inserted)
+            if progress:
+                progress.update(inserted)
 
         if batch_count == 0:
             logger.info(f"No new rows to migrate for {mysql_table}")
 
@@ -267,11 +282,23 @@ class IncrementalMigrator:
             if not batch:
                 break
 
-            # Transform batch
-            transformed = DataTransformer.transform_batch(
-                mysql_table,
-                batch
-            )
+            # For tables that need consolidation, group rows by consolidation key
+            # and consolidate before transforming
+            if mysql_table in ("RAWDATACOR", "ELABDATADISP"):
+                # Group batch by consolidation key
+                consolidated_batch = self._consolidate_batch(batch, mysql_table)
+                # Transform consolidated batch
+                transformed = DataTransformer.transform_batch(
+                    mysql_table,
+                    consolidated_batch,
+                    consolidate=False  # Already consolidated above
+                )
+            else:
+                # No consolidation needed for other tables
+                transformed = DataTransformer.transform_batch(
+                    mysql_table,
+                    batch
+                )
 
             # Insert batch
             columns = DataTransformer.get_column_order(pg_table)
@@ -304,6 +331,59 @@ class IncrementalMigrator:
 
         return migrated
 
+    def _consolidate_batch(self, batch: list, mysql_table: str) -> list:
+        """Consolidate batch by grouping rows with same consolidation key.
+
+        Args:
+            batch: List of rows from MySQL
+            mysql_table: Table name (RAWDATACOR or ELABDATADISP)
+
+        Returns:
+            List of rows, already consolidated by consolidation key
+        """
+        if not batch:
+            return batch
+
+        # Group rows by consolidation key
+        groups = {}
+        for row in batch:
+            # Build consolidation key
+            if mysql_table == "ELABDATADISP":
+                key = (
+                    row.get("UnitName"),
+                    row.get("ToolNameID"),
+                    row.get("EventDate"),
+                    row.get("EventTime")
+                )
+            else:  # RAWDATACOR
+                key = (
+                    row.get("UnitName"),
+                    row.get("ToolNameID"),
+                    row.get("EventDate"),
+                    row.get("EventTime")
+                )
+
+            if key not in groups:
+                groups[key] = []
+            groups[key].append(row)
+
+        # Consolidate each group and flatten back to list
+        consolidated = []
+        for key, group_rows in groups.items():
+            if len(group_rows) == 1:
+                # Single row, no consolidation needed
+                consolidated.append(group_rows[0])
+            else:
+                # Multiple rows with same key, consolidate them
+                consolidated_group = DataTransformer.consolidate_elabdatadisp_batch(
+                    group_rows
+                ) if mysql_table == "ELABDATADISP" else DataTransformer.consolidate_rawdatacor_batch(
+                    group_rows
+                )
+                consolidated.extend(consolidated_group)
+
+        return consolidated
+
 
 def run_incremental_migration(
     table: str,