diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py index aaeb8f6..a4e7350 100644 --- a/src/connectors/mysql_connector.py +++ b/src/connectors/mysql_connector.py @@ -421,7 +421,7 @@ class MySQLConnector: # Group rows by consolidation key (UnitName, ToolNameID, EventDate, EventTime) current_group = [] - last_key = None + current_key = None is_final_batch = len(rows) < limit # True if this is the last batch for row in sorted_rows: @@ -433,31 +433,39 @@ class MySQLConnector: ) # If key changed, yield previous group and start new one - if last_key is not None and key != last_key: + if current_key is not None and key != current_key: if current_group: yield current_group + logger.debug( + f"Group yielded: key={current_key}, " + f"rows_in_group={len(current_group)}, " + f"max_id={current_group[-1][id_column]}" + ) current_group = [] current_group.append(row) - last_key = key + current_key = key # At end of batch: handle the final group if is_final_batch: - # This is the last batch - yield all remaining groups + # This is the last batch - yield the remaining group if current_group: - logger.debug( - f"Final batch: yielding group key={last_key}, " - f"rows_in_group={len(current_group)}, total_rows_fetched={len(rows)}" - ) yield current_group + logger.debug( + f"Final group yielded: key={current_key}, " + f"rows_in_group={len(current_group)}, " + f"max_id={current_group[-1][id_column]}" + ) else: # More rows might exist - buffer the last group for next batch - buffered_group = current_group - last_buffered_key = last_key - logger.debug( - f"Buffering group at boundary: key={last_key}, " - f"rows_in_group={len(current_group)}, total_rows_fetched={len(rows)}" - ) + if current_group: + buffered_group = current_group + last_buffered_key = current_key + logger.debug( + f"Group buffered at boundary: key={current_key}, " + f"rows_in_group={len(current_group)}, " + f"max_id={current_group[-1][id_column]}" + ) last_id = rows[-1][id_column] break # Success, exit retry loop