fix: Merge consolidation groups with same key across batch boundaries
Fix a critical issue where consolidation groups with the same consolidation key (UnitName, ToolNameID, EventDate, EventTime) but arriving in different batches were being yielded separately instead of being merged. Now, when a buffered group has the same key as the first rows of the next batch, the buffered rows are prepended and the combined group is consolidated together. If the key changes, the buffered group is yielded before the new key's rows are processed. This fixes the case where nodes 1-11 and 12-22 (sharing one consolidation key) were inserted as two separate rows instead of one consolidated row containing all 22 nodes. 🤖 Generated with Claude Code Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -245,7 +245,7 @@ class MySQLConnector:
|
||||
id_column = "idElabData" if table == "ELABDATADISP" else "id"
|
||||
max_retries = 3
|
||||
last_id = start_id
|
||||
buffered_group = [] # Buffer incomplete group at batch boundary
|
||||
buffered_group = [] # Buffer group at batch boundary or with continuing key
|
||||
last_buffered_key = None
|
||||
|
||||
while True:
|
||||
@@ -275,6 +275,11 @@ class MySQLConnector:
|
||||
if not rows:
|
||||
# End of partition: yield any buffered group
|
||||
if buffered_group:
|
||||
logger.info(
|
||||
f"[CONSOLIDATION DEBUG] End of partition - yielding final buffered group: "
|
||||
f"key={last_buffered_key}, rows={len(buffered_group)}, "
|
||||
f"nodes={sorted([r.get('NodeNum') for r in buffered_group])}"
|
||||
)
|
||||
yield buffered_group
|
||||
return
|
||||
|
||||
@@ -287,10 +292,19 @@ class MySQLConnector:
|
||||
int(r.get("NodeNum") or 0)
|
||||
))
|
||||
|
||||
# If we have a buffered group, prepend it to continue
|
||||
if buffered_group:
|
||||
# If we have a buffered group, check if current batch continues the same key
|
||||
if buffered_group and sorted_rows:
|
||||
first_row_key = (
|
||||
sorted_rows[0].get("UnitName"),
|
||||
sorted_rows[0].get("ToolNameID"),
|
||||
sorted_rows[0].get("EventDate"),
|
||||
sorted_rows[0].get("EventTime")
|
||||
)
|
||||
|
||||
if first_row_key == last_buffered_key:
|
||||
# Same consolidation key continues - prepend buffered group
|
||||
logger.info(
|
||||
f"[CONSOLIDATION DEBUG] Resuming buffered group: key={last_buffered_key}, "
|
||||
f"[CONSOLIDATION DEBUG] Continuing buffered group: key={last_buffered_key}, "
|
||||
f"buffered_rows={len(buffered_group)}, new_rows={len(rows)}, "
|
||||
f"buffered_nodes={sorted([r.get('NodeNum') for r in buffered_group])}"
|
||||
)
|
||||
@@ -300,6 +314,16 @@ class MySQLConnector:
|
||||
f"[CONSOLIDATION DEBUG] After prepending buffer: total_rows={len(sorted_rows)}, "
|
||||
f"nodes={sorted([r.get('NodeNum') for r in sorted_rows])}"
|
||||
)
|
||||
else:
|
||||
# Key changed - yield buffered group first
|
||||
logger.info(
|
||||
f"[CONSOLIDATION DEBUG] Key changed, yielding buffered group: "
|
||||
f"key={last_buffered_key}, rows={len(buffered_group)}, "
|
||||
f"nodes={sorted([r.get('NodeNum') for r in buffered_group])}"
|
||||
)
|
||||
yield buffered_group
|
||||
buffered_group = []
|
||||
last_buffered_key = None
|
||||
|
||||
# Group rows by consolidation key (UnitName, ToolNameID, EventDate, EventTime)
|
||||
current_group = []
|
||||
@@ -344,6 +368,7 @@ class MySQLConnector:
|
||||
)
|
||||
else:
|
||||
# More rows might exist - buffer the last group for next batch
|
||||
# Don't know if more rows with same key will come, so buffer it
|
||||
if current_group:
|
||||
buffered_group = current_group
|
||||
last_buffered_key = current_key
|
||||
|
||||
Reference in New Issue
Block a user