diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py index 0250493..f763e6a 100644 --- a/src/connectors/mysql_connector.py +++ b/src/connectors/mysql_connector.py @@ -301,18 +301,21 @@ class MySQLConnector: current_group.append(row) current_key = key - # At end of batch: handle final group + # At end of batch: ALWAYS yield complete groups immediately + # Incomplete groups at batch boundary will be continued in next batch + # because we track last_key correctly for resume if not rows or len(rows) < limit: # Last batch - yield remaining group and finish if current_group: yield current_group return else: - # More rows might exist after this batch - # Check if the last row in this batch has same key as current_group - # If yes, DON'T yield yet - the group might continue in next batch - # If no, yield because we know the group is complete - if rows: + # More rows exist - yield all COMPLETE groups seen so far + # If current_group is incomplete (same key as last row), keep buffering it + if current_group: + # Check if this is a complete group or partial + # Complete group: all rows with this key have been seen in this batch + # Partial: might continue in next batch last_row = rows[-1] last_row_key = ( last_row.get("UnitName"), @@ -321,16 +324,29 @@ class MySQLConnector: last_row.get("EventTime") ) - # If last row has different key than current group, current group is complete - if last_row_key != current_key and current_group: + if last_row_key != current_key: + # Current group is complete (ended before batch boundary) yield current_group current_group = [] current_key = None - # else: same key as current_group, so continue in next iteration + # else: current group continues past batch boundary + # Keep it in current_group for merging with next batch + # BUT we need to update last_key to be the LAST yielded key, + # not the current incomplete key, so resume works correctly + # Actually NO - we should NOT update last_key if we have a pending group! - # Update last_key for next iteration - if current_key: - last_key = current_key + # Update last_key only if we don't have a pending incomplete group + if current_key is None and rows: + # All groups in this batch were yielded and complete + last_row = rows[-1] + last_key = ( + last_row.get("UnitName"), + last_row.get("ToolNameID"), + last_row.get("EventDate"), + last_row.get("EventTime") + ) + # If current_key is not None, we have incomplete group - DON'T update last_key + # so next iteration will continue from where this group started break # Success, exit retry loop