diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py index f04a164..97e32e5 100644 --- a/src/connectors/mysql_connector.py +++ b/src/connectors/mysql_connector.py @@ -245,7 +245,7 @@ class MySQLConnector: id_column = "idElabData" if table == "ELABDATADISP" else "id" max_retries = 3 last_id = start_id - buffered_group = [] # Buffer incomplete group at batch boundary + buffered_group = [] # Buffer group at batch boundary or with continuing key last_buffered_key = None while True: @@ -275,6 +275,11 @@ class MySQLConnector: if not rows: # End of partition: yield any buffered group if buffered_group: + logger.info( + f"[CONSOLIDATION DEBUG] End of partition - yielding final buffered group: " + f"key={last_buffered_key}, rows={len(buffered_group)}, " + f"nodes={sorted([r.get('NodeNum') for r in buffered_group])}" + ) yield buffered_group return @@ -287,20 +292,39 @@ class MySQLConnector: int(r.get("NodeNum") or 0) )) - # If we have a buffered group, prepend it to continue - if buffered_group: - logger.info( - f"[CONSOLIDATION DEBUG] Resuming buffered group: key={last_buffered_key}, " - f"buffered_rows={len(buffered_group)}, new_rows={len(rows)}, " - f"buffered_nodes={sorted([r.get('NodeNum') for r in buffered_group])}" - ) - sorted_rows = buffered_group + sorted_rows - buffered_group = [] - logger.info( - f"[CONSOLIDATION DEBUG] After prepending buffer: total_rows={len(sorted_rows)}, " - f"nodes={sorted([r.get('NodeNum') for r in sorted_rows])}" + # If we have a buffered group, check if current batch continues the same key + if buffered_group and sorted_rows: + first_row_key = ( + sorted_rows[0].get("UnitName"), + sorted_rows[0].get("ToolNameID"), + sorted_rows[0].get("EventDate"), + sorted_rows[0].get("EventTime") ) + if first_row_key == last_buffered_key: + # Same consolidation key continues - prepend buffered group + logger.info( + f"[CONSOLIDATION DEBUG] Continuing buffered group: key={last_buffered_key}, " + f"buffered_rows={len(buffered_group)}, new_rows={len(rows)}, " + f"buffered_nodes={sorted([r.get('NodeNum') for r in buffered_group])}" + ) + sorted_rows = buffered_group + sorted_rows + buffered_group = [] + logger.info( + f"[CONSOLIDATION DEBUG] After prepending buffer: total_rows={len(sorted_rows)}, " + f"nodes={sorted([r.get('NodeNum') for r in sorted_rows])}" + ) + else: + # Key changed - yield buffered group first + logger.info( + f"[CONSOLIDATION DEBUG] Key changed, yielding buffered group: " + f"key={last_buffered_key}, rows={len(buffered_group)}, " + f"nodes={sorted([r.get('NodeNum') for r in buffered_group])}" + ) + yield buffered_group + buffered_group = [] + last_buffered_key = None + # Group rows by consolidation key (UnitName, ToolNameID, EventDate, EventTime) current_group = [] current_key = None @@ -344,6 +368,7 @@ class MySQLConnector: ) else: # More rows might exist - buffer the last group for next batch + # Don't know if more rows with same key will come, so buffer it if current_group: buffered_group = current_group last_buffered_key = current_key