From c9088d914480b7c22b73f14cfe4615be499eb84d Mon Sep 17 00:00:00 2001
From: alex
Date: Fri, 26 Dec 2025 09:16:21 +0100
Subject: [PATCH] fix: Merge consolidation groups with same key across batch
 boundaries
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fix critical issue where consolidation groups with the same consolidation
key (UnitName, ToolNameID, EventDate, EventTime) but arriving in different
batches were being yielded separately instead of being merged.

Now when a buffered group has the same key as the start of the next batch,
they are prepended and consolidated together. If the key changes, the
buffered group is yielded before processing the new key's rows.

This fixes the issue where nodes 1-11 and 12-22 (with the same consolidation
key) were being inserted as two separate rows instead of one consolidated
row with all 22 nodes.

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5
---
 src/connectors/mysql_connector.py | 51 +++++++++++++++++++++++--------
 1 file changed, 38 insertions(+), 13 deletions(-)

diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py
index f04a164..97e32e5 100644
--- a/src/connectors/mysql_connector.py
+++ b/src/connectors/mysql_connector.py
@@ -245,7 +245,7 @@ class MySQLConnector:
         id_column = "idElabData" if table == "ELABDATADISP" else "id"
         max_retries = 3
         last_id = start_id
-        buffered_group = []  # Buffer incomplete group at batch boundary
+        buffered_group = []  # Buffer group at batch boundary or with continuing key
         last_buffered_key = None
 
         while True:
@@ -275,6 +275,11 @@ class MySQLConnector:
                 if not rows:
                     # End of partition: yield any buffered group
                     if buffered_group:
+                        logger.info(
+                            f"[CONSOLIDATION DEBUG] End of partition - yielding final buffered group: "
+                            f"key={last_buffered_key}, rows={len(buffered_group)}, "
+                            f"nodes={sorted([r.get('NodeNum') for r in buffered_group])}"
+                        )
                         yield buffered_group
                     return
 
@@ -287,20 +292,39 @@ class MySQLConnector:
                     int(r.get("NodeNum") or 0)
                 ))
 
-                # If we have a buffered group, prepend it to continue
-                if buffered_group:
-                    logger.info(
-                        f"[CONSOLIDATION DEBUG] Resuming buffered group: key={last_buffered_key}, "
-                        f"buffered_rows={len(buffered_group)}, new_rows={len(rows)}, "
-                        f"buffered_nodes={sorted([r.get('NodeNum') for r in buffered_group])}"
-                    )
-                    sorted_rows = buffered_group + sorted_rows
-                    buffered_group = []
-                    logger.info(
-                        f"[CONSOLIDATION DEBUG] After prepending buffer: total_rows={len(sorted_rows)}, "
-                        f"nodes={sorted([r.get('NodeNum') for r in sorted_rows])}"
+                # If we have a buffered group, check if current batch continues the same key
+                if buffered_group and sorted_rows:
+                    first_row_key = (
+                        sorted_rows[0].get("UnitName"),
+                        sorted_rows[0].get("ToolNameID"),
+                        sorted_rows[0].get("EventDate"),
+                        sorted_rows[0].get("EventTime")
                     )
+                    if first_row_key == last_buffered_key:
+                        # Same consolidation key continues - prepend buffered group
+                        logger.info(
+                            f"[CONSOLIDATION DEBUG] Continuing buffered group: key={last_buffered_key}, "
+                            f"buffered_rows={len(buffered_group)}, new_rows={len(rows)}, "
+                            f"buffered_nodes={sorted([r.get('NodeNum') for r in buffered_group])}"
+                        )
+                        sorted_rows = buffered_group + sorted_rows
+                        buffered_group = []
+                        logger.info(
+                            f"[CONSOLIDATION DEBUG] After prepending buffer: total_rows={len(sorted_rows)}, "
+                            f"nodes={sorted([r.get('NodeNum') for r in sorted_rows])}"
+                        )
+                    else:
+                        # Key changed - yield buffered group first
+                        logger.info(
+                            f"[CONSOLIDATION DEBUG] Key changed, yielding buffered group: "
+                            f"key={last_buffered_key}, rows={len(buffered_group)}, "
+                            f"nodes={sorted([r.get('NodeNum') for r in buffered_group])}"
+                        )
+                        yield buffered_group
+                        buffered_group = []
+                        last_buffered_key = None
+
 
                 # Group rows by consolidation key (UnitName, ToolNameID, EventDate, EventTime)
                 current_group = []
                 current_key = None
@@ -344,6 +368,7 @@ class MySQLConnector:
                     )
                 else:
                     # More rows might exist - buffer the last group for next batch
+                    # Don't know if more rows with same key will come, so buffer it
                     if current_group:
                         buffered_group = current_group
                         last_buffered_key = current_key