From 418da67857e70033e6e2fdadd5a6033c762021ca Mon Sep 17 00:00:00 2001 From: alex Date: Sat, 27 Dec 2025 09:54:42 +0100 Subject: [PATCH] fix: Properly handle incomplete consolidation groups at batch boundaries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Problem: When a batch ended with an incomplete group (same consolidation key as last row), the code was not yielding it (correct), but was also updating last_key to that incomplete key. Next iteration would query: WHERE (key) > (incomplete_key) This would SKIP all remaining rows of that key that were on next page! Result: Groups that span batch boundaries got split - e.g., nodes 1-11 in first batch yield as incomplete (not yielded), nodes 12-22 never fetched because next query starts AFTER the incomplete key. Fix: Track whether current_group is incomplete (pending) at batch boundary. If incomplete (last_row_key == current_key), keep it in memory and DON'T update last_key. This ensures next batch continues from where the incomplete group started, fetching remaining rows of that key. Logic: - If last_row_key != current_key: group is complete, yield it - If last_row_key == current_key: group is incomplete, keep buffering - If current_key is None: all groups complete, update last_key normally - If current_key is not None: group pending, DON'T update last_key 🤖 Generated with Claude Code Co-Authored-By: Claude Haiku 4.5 --- src/connectors/mysql_connector.py | 40 +++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py index 0250493..f763e6a 100644 --- a/src/connectors/mysql_connector.py +++ b/src/connectors/mysql_connector.py @@ -301,18 +301,21 @@ class MySQLConnector: current_group.append(row) current_key = key - # At end of batch: handle final group + # At end of batch: ALWAYS yield complete groups immediately + # Incomplete groups at batch boundary will be continued in next batch + # because we track last_key correctly for resume if not rows or len(rows) < limit: # Last batch - yield remaining group and finish if current_group: yield current_group return else: - # More rows might exist after this batch - # Check if the last row in this batch has same key as current_group - # If yes, DON'T yield yet - the group might continue in next batch - # If no, yield because we know the group is complete - if rows: + # More rows exist - yield all COMPLETE groups seen so far + # If current_group is incomplete (same key as last row), keep buffering it + if current_group: + # Check if this is a complete group or partial + # Complete group: all rows with this key have been seen in this batch + # Partial: might continue in next batch last_row = rows[-1] last_row_key = ( last_row.get("UnitName"), @@ -321,16 +324,29 @@ class MySQLConnector: last_row.get("EventTime") ) - # If last row has different key than current group, current group is complete - if last_row_key != current_key and current_group: + if last_row_key != current_key: + # Current group is complete (ended before batch boundary) yield current_group current_group = [] current_key = None - # else: same key as current_group, so continue in next iteration + # else: current group continues past batch boundary + # Keep it in current_group for merging with next batch + # BUT we need to update last_key to be the LAST yielded key, + # not the current incomplete key, so resume works correctly + # Actually NO - we should NOT update last_key if we have a pending group! - # Update last_key for next iteration - if current_key: - last_key = current_key + # Update last_key only if we don't have a pending incomplete group + if current_key is None and rows: + # All groups in this batch were yielded and complete + last_row = rows[-1] + last_key = ( + last_row.get("UnitName"), + last_row.get("ToolNameID"), + last_row.get("EventDate"), + last_row.get("EventTime") + ) + # If current_key is not None, we have incomplete group - DON'T update last_key + # so next iteration will continue from where this group started break # Success, exit retry loop