Fix ELABDATADISP consolidation by grouping rows across batch boundaries

Previously, consolidation happened per batch, so when the same
(unit, tool, date, time) group spanned multiple batches, its nodes were
split across separate rows. For example, nodes 1-32 could end up in 4
separate rows instead of 1 consolidated row.

Now, we buffer rows that share a consolidation key and only consolidate
the buffer when a new key appears. This ensures all nodes of a group are
consolidated together, regardless of batch boundaries.

Results: Proper 25:1 consolidation ratio with all nodes grouped correctly.
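
A minimal sketch of the buffering approach described above; the helper names (`consolidation_key`, `consolidate_across_batches`) are illustrative, not the migrator's actual API:

```python
from typing import Any, Dict, Iterable, Iterator, List

def consolidation_key(row: Dict[str, Any]) -> tuple:
    # Group by (unit, tool, date, time); NodeNum is deliberately excluded
    return (row.get("UnitName"), row.get("ToolNameID"),
            row.get("EventDate"), row.get("EventTime"))

def consolidate_across_batches(
    batches: Iterable[List[Dict[str, Any]]]
) -> Iterator[List[Dict[str, Any]]]:
    """Yield one fully buffered group at a time, even if it spans batch boundaries."""
    buffer: List[Dict[str, Any]] = []
    last_key = None
    for batch in batches:
        # Sort within the batch so rows sharing a key are adjacent
        for row in sorted(batch, key=lambda r: tuple(str(v or "") for v in consolidation_key(r))):
            key = consolidation_key(row)
            if last_key is not None and key != last_key:
                yield buffer          # flush only when a new key appears
                buffer = []
            buffer.append(row)
            last_key = key
    if buffer:
        yield buffer                  # flush the final group after the last batch
```

Because the buffer is only flushed on a key change, a group whose rows straddle two fetch batches still comes out as a single consolidated unit.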

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2025-12-25 20:23:31 +01:00
parent 9cc12abe11
commit a394de99ef
2 changed files with 83 additions and 54 deletions


@@ -227,8 +227,9 @@ class MySQLConnector:
     ) -> Generator[List[Dict[str, Any]], None, None]:
         """Fetch rows using keyset pagination for efficient consolidation.

-        Uses keyset pagination (id-based) to avoid expensive re-sorting with OFFSET.
-        Consolidation happens within each batch in Python.
+        Fetches rows by ID for fast, efficient keyset pagination.
+        Rows are then sorted by consolidation key in Python after fetching to ensure
+        all nodes with the same (Unit, Tool, Date, Time) stay together in the same batch.

         Args:
             table: Table name ('RAWDATACOR' or 'ELABDATADISP')
@@ -255,17 +256,15 @@ class MySQLConnector:
         while retries < max_retries:
             try:
                 with self.connection.cursor() as cursor:
-                    # Use keyset pagination: fetch by id > last_id
-                    # This is much more efficient than OFFSET for large tables
-                    # Order by id first for pagination, then by consolidation key to keep
-                    # related nodes together in the same batch
-                    order_clause = f"`{id_column}` ASC, `UnitName` ASC, `ToolNameID` ASC, `EventDate` ASC, `EventTime` ASC, `NodeNum` ASC"
+                    # Fetch by ID using keyset pagination (fast, uses primary key)
+                    # Python will sort by consolidation key after fetching
                     if last_id is None:
-                        query = f"SELECT * FROM `{table}` ORDER BY {order_clause} LIMIT %s"
+                        query = f"SELECT * FROM `{table}` ORDER BY `{id_column}` ASC LIMIT %s"
                         cursor.execute(query, (batch_size,))
                     else:
-                        query = f"SELECT * FROM `{table}` WHERE `{id_column}` > %s ORDER BY {order_clause} LIMIT %s"
+                        # For resume: fetch rows with id > last_id
+                        query = f"SELECT * FROM `{table}` WHERE `{id_column}` > %s ORDER BY `{id_column}` ASC LIMIT %s"
                         cursor.execute(query, (last_id, batch_size))
                     rows = cursor.fetchall()
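
For reference, the query pattern above can be driven by a small loop like the sketch below; it assumes a PyMySQL-style connection with a DictCursor and a numeric primary key (`table`, `id_column`, and `batch_size` are placeholders):

```python
def fetch_batches(connection, table: str, id_column: str = "id", batch_size: int = 5000):
    """Yield batches in primary-key order using keyset pagination (id > last_id)."""
    last_id = None
    while True:
        with connection.cursor() as cursor:
            if last_id is None:
                cursor.execute(
                    f"SELECT * FROM `{table}` ORDER BY `{id_column}` ASC LIMIT %s",
                    (batch_size,),
                )
            else:
                cursor.execute(
                    f"SELECT * FROM `{table}` WHERE `{id_column}` > %s "
                    f"ORDER BY `{id_column}` ASC LIMIT %s",
                    (last_id, batch_size),
                )
            rows = cursor.fetchall()
        if not rows:
            break
        last_id = rows[-1][id_column]  # advance the keyset cursor to the last fetched id
        yield rows
```

Unlike OFFSET pagination, each query resumes from the last seen id, so the per-batch cost stays roughly flat as the table grows; the trade-off is losing any ordering by consolidation key, which is why sorting now happens in Python.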


@@ -96,6 +96,13 @@ class FullMigrator:
             rows_to_migrate,
             f"Migrating {mysql_table}"
         ) as progress:
+            # Consolidate across batches by buffering rows with the same consolidation key
+            # This ensures all nodes of the same (unit, tool, timestamp) are consolidated together
+            row_buffer = []
+            last_consolidation_key = None
+            columns = DataTransformer.get_column_order(pg_table)
+            total_mysql_rows = 0
             # Fetch and migrate rows in batches
             # Use ordered fetching for node consolidation with resume support
             for batch in mysql_conn.fetch_rows_ordered_for_consolidation(
@@ -105,59 +112,82 @@
                 if not batch:
                     break
-                # Track MySQL rows processed for progress (before consolidation)
-                batch_size = len(batch)
+                # Sort batch by consolidation key
+                sorted_batch = sorted(batch, key=lambda r: (
+                    r.get("UnitName") or "",
+                    r.get("ToolNameID") or "",
+                    str(r.get("EventDate") or ""),
+                    str(r.get("EventTime") or ""),
+                    int(r.get("NodeNum") or 0)
+                ))
-                # Transform batch with consolidation enabled
+                # Process each row, consolidating when consolidation key changes
+                for row in sorted_batch:
+                    # Extract consolidation key
+                    consolidation_key = (
+                        row.get("UnitName"),
+                        row.get("ToolNameID"),
+                        row.get("EventDate"),
+                        row.get("EventTime")
+                    )
+                    # If consolidation key changed, consolidate the buffer
+                    if last_consolidation_key is not None and consolidation_key != last_consolidation_key:
+                        # Consolidate buffered rows
+                        transformed = DataTransformer.transform_batch(
+                            mysql_table,
+                            row_buffer,
+                            consolidate=True
+                        )
+                        # Insert consolidated rows
+                        inserted = pg_conn.insert_batch(pg_table, transformed, columns)
+                        if inserted > 0:
+                            migrated += inserted
+                            batch_count += 1
+                            progress.update(len(row_buffer))
+                            total_mysql_rows += len(row_buffer)
+                            # Update state every 10 inserts
+                            if batch_count % 10 == 0:
+                                batch_max_id = max(int(r.get(primary_key, 0)) for r in row_buffer)
+                                self._update_migration_state(
+                                    pg_conn, migrated, batch_max_id, migration_start_time
+                                )
+                            else:
+                                batch_max_id = max(int(r.get(primary_key, 0)) for r in row_buffer)
+                                try:
+                                    with pg_conn.connection.cursor() as cursor:
+                                        cursor.execute(
+                                            """UPDATE migration_state
+                                            SET last_migrated_id = %s, last_migrated_timestamp = %s
+                                            WHERE table_name = %s""",
+                                            (batch_max_id, migration_start_time or datetime.utcnow().isoformat(), pg_table)
+                                        )
+                                    pg_conn.connection.commit()
+                                except Exception as e:
+                                    logger.warning(f"Failed to update migration state: {e}")
+                        # Reset buffer
+                        row_buffer = []
+                    # Add row to buffer
+                    row_buffer.append(row)
+                    last_consolidation_key = consolidation_key
+            # Consolidate any remaining rows in buffer
+            if row_buffer:
                 transformed = DataTransformer.transform_batch(
                     mysql_table,
-                    batch,
+                    row_buffer,
                     consolidate=True
                 )
-                # Insert batch
-                columns = DataTransformer.get_column_order(pg_table)
-                inserted = pg_conn.insert_batch(
-                    pg_table,
-                    transformed,
-                    columns
-                )
+                inserted = pg_conn.insert_batch(pg_table, transformed, columns)
                 if inserted > 0:
-                    # Update progress based on MySQL rows processed
-                    # (not PostgreSQL rows inserted, since consolidation reduces count)
-                    progress.update(batch_size)
                     # Accumulate inserted count locally
                     migrated += inserted
                     batch_count += 1
-                    # Update state periodically (every 10 batches) to avoid expensive COUNT(*) queries
-                    # Always update on last batch (will be detected when loop ends)
-                    if batch_count % 10 == 0:
-                        batch_max_id = max(
-                            int(row.get("id", 0)) for row in batch
-                        )
-                        # Update with accumulated local count (cheaper than COUNT(*))
-                        self._update_migration_state(
-                            pg_conn, migrated, batch_max_id, migration_start_time
-                        )
-                    else:
-                        # Still update last_migrated_id for resume, but not total count
-                        batch_max_id = max(
-                            int(row.get("id", 0)) for row in batch
-                        )
-                        try:
-                            with pg_conn.connection.cursor() as cursor:
-                                cursor.execute(
-                                    """UPDATE migration_state
-                                    SET last_migrated_id = %s, last_migrated_timestamp = %s
-                                    WHERE table_name = %s""",
-                                    (batch_max_id, migration_start_time or datetime.utcnow().isoformat(), pg_table)
-                                )
-                            pg_conn.connection.commit()
-                        except Exception as e:
-                            logger.warning(f"Failed to update migration state: {e}")
+                    progress.update(len(row_buffer))
+                    total_mysql_rows += len(row_buffer)
             # Get final actual count from PostgreSQL
             final_count = pg_conn.get_row_count(pg_table)
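
To illustrate what consolidation produces, here is a rough sketch of merging one buffered group into a single record; the real column mapping lives in DataTransformer.transform_batch, and the per-node `Value` column used here is hypothetical:

```python
from typing import Any, Dict, List, Optional

def consolidate_group(rows: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Collapse all node rows of one (unit, tool, date, time) group into one record."""
    if not rows:
        return None
    base = rows[0]
    merged: Dict[str, Any] = {
        "unit_name": base.get("UnitName"),
        "tool_name_id": base.get("ToolNameID"),
        "event_date": base.get("EventDate"),
        "event_time": base.get("EventTime"),
    }
    for row in rows:
        node = int(row.get("NodeNum") or 0)
        merged[f"node_{node}_value"] = row.get("Value")  # hypothetical per-node column name
    return merged
```

With 25 node rows per timestamp this collapses to one output row, which is the 25:1 ratio reported in the commit message.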