feat: Add consolidation support to incremental migration
Previously incremental migration (both timestamp-based and ID-based) did not consolidate rows, resulting in one row per node instead of consolidated measurements with JSONB nodes. Solution: Add _consolidate_batch() method to group rows by consolidation key and consolidate them before transformation. Apply consolidation in both: 1. _migrate_by_timestamp() - timestamp-based incremental migration 2. _migrate_by_id() - ID-based incremental migration Changes: - For RAWDATACOR and ELABDATADISP tables: consolidate batch by grouping rows with same consolidation key before transforming - Pass consolidate=False to transform_batch since rows are already consolidated - Handle cases where a group contains a single row (no consolidation needed) This ensures incremental migration produces the same consolidated output as full migration, with multiple nodes properly merged into a single row with JSONB measurements. 🤖 Generated with Claude Code Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -158,6 +158,8 @@ class IncrementalMigrator:
|
||||
|
||||
# Fetch and migrate rows in batches
|
||||
batch_count = 0
|
||||
progress = None
|
||||
|
||||
for batch in mysql_conn.fetch_rows_since(
|
||||
mysql_table,
|
||||
last_timestamp
|
||||
@@ -172,11 +174,23 @@ class IncrementalMigrator:
|
||||
)
|
||||
progress.__enter__()
|
||||
|
||||
# Transform batch
|
||||
transformed = DataTransformer.transform_batch(
|
||||
mysql_table,
|
||||
batch
|
||||
)
|
||||
# For tables that need consolidation, group rows by consolidation key
|
||||
# and consolidate before transforming
|
||||
if mysql_table in ("RAWDATACOR", "ELABDATADISP"):
|
||||
# Group batch by consolidation key
|
||||
consolidated_batch = self._consolidate_batch(batch, mysql_table)
|
||||
# Transform consolidated batch
|
||||
transformed = DataTransformer.transform_batch(
|
||||
mysql_table,
|
||||
consolidated_batch,
|
||||
consolidate=False # Already consolidated above
|
||||
)
|
||||
else:
|
||||
# No consolidation needed for other tables
|
||||
transformed = DataTransformer.transform_batch(
|
||||
mysql_table,
|
||||
batch
|
||||
)
|
||||
|
||||
# Insert batch
|
||||
columns = DataTransformer.get_column_order(pg_table)
|
||||
@@ -187,7 +201,8 @@ class IncrementalMigrator:
|
||||
)
|
||||
|
||||
migrated += inserted
|
||||
progress.update(inserted)
|
||||
if progress:
|
||||
progress.update(inserted)
|
||||
|
||||
if batch_count == 0:
|
||||
logger.info(f"No new rows to migrate for {mysql_table}")
|
||||
@@ -267,11 +282,23 @@ class IncrementalMigrator:
|
||||
if not batch:
|
||||
break
|
||||
|
||||
# Transform batch
|
||||
transformed = DataTransformer.transform_batch(
|
||||
mysql_table,
|
||||
batch
|
||||
)
|
||||
# For tables that need consolidation, group rows by consolidation key
|
||||
# and consolidate before transforming
|
||||
if mysql_table in ("RAWDATACOR", "ELABDATADISP"):
|
||||
# Group batch by consolidation key
|
||||
consolidated_batch = self._consolidate_batch(batch, mysql_table)
|
||||
# Transform consolidated batch
|
||||
transformed = DataTransformer.transform_batch(
|
||||
mysql_table,
|
||||
consolidated_batch,
|
||||
consolidate=False # Already consolidated above
|
||||
)
|
||||
else:
|
||||
# No consolidation needed for other tables
|
||||
transformed = DataTransformer.transform_batch(
|
||||
mysql_table,
|
||||
batch
|
||||
)
|
||||
|
||||
# Insert batch
|
||||
columns = DataTransformer.get_column_order(pg_table)
|
||||
@@ -304,6 +331,59 @@ class IncrementalMigrator:
|
||||
|
||||
return migrated
|
||||
|
||||
def _consolidate_batch(self, batch: list, mysql_table: str) -> list:
|
||||
"""Consolidate batch by grouping rows with same consolidation key.
|
||||
|
||||
Args:
|
||||
batch: List of rows from MySQL
|
||||
mysql_table: Table name (RAWDATACOR or ELABDATADISP)
|
||||
|
||||
Returns:
|
||||
List of rows, already consolidated by consolidation key
|
||||
"""
|
||||
if not batch:
|
||||
return batch
|
||||
|
||||
# Group rows by consolidation key
|
||||
groups = {}
|
||||
for row in batch:
|
||||
# Build consolidation key
|
||||
if mysql_table == "ELABDATADISP":
|
||||
key = (
|
||||
row.get("UnitName"),
|
||||
row.get("ToolNameID"),
|
||||
row.get("EventDate"),
|
||||
row.get("EventTime")
|
||||
)
|
||||
else: # RAWDATACOR
|
||||
key = (
|
||||
row.get("UnitName"),
|
||||
row.get("ToolNameID"),
|
||||
row.get("EventDate"),
|
||||
row.get("EventTime")
|
||||
)
|
||||
|
||||
if key not in groups:
|
||||
groups[key] = []
|
||||
groups[key].append(row)
|
||||
|
||||
# Consolidate each group and flatten back to list
|
||||
consolidated = []
|
||||
for key, group_rows in groups.items():
|
||||
if len(group_rows) == 1:
|
||||
# Single row, no consolidation needed
|
||||
consolidated.append(group_rows[0])
|
||||
else:
|
||||
# Multiple rows with same key, consolidate them
|
||||
consolidated_group = DataTransformer.consolidate_elabdatadisp_batch(
|
||||
group_rows
|
||||
) if mysql_table == "ELABDATADISP" else DataTransformer.consolidate_rawdatacor_batch(
|
||||
group_rows
|
||||
)
|
||||
consolidated.extend(consolidated_group)
|
||||
|
||||
return consolidated
|
||||
|
||||
|
||||
def run_incremental_migration(
|
||||
table: str,
|
||||
|
||||
Reference in New Issue
Block a user