diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py index 53ae041..d5dc61a 100644 --- a/src/connectors/mysql_connector.py +++ b/src/connectors/mysql_connector.py @@ -283,7 +283,9 @@ class MySQLConnector: yield buffered_group return - # Sort fetched rows by consolidation key for grouping + # Sort by consolidation key THEN by NodeNum + # This ensures all nodes of the same measurement are together, + # and when NodeNum decreases, we know we've started a new measurement sorted_rows = sorted(rows, key=lambda r: ( r.get("UnitName") or "", r.get("ToolNameID") or "", @@ -292,7 +294,7 @@ class MySQLConnector: int(r.get("NodeNum") or 0) )) - # If we have a buffered group, check if current batch continues the same key + # Prepend any buffered group that belongs to the same consolidation key if buffered_group and sorted_rows: first_row_key = ( sorted_rows[0].get("UnitName"), @@ -302,33 +304,27 @@ class MySQLConnector: ) if first_row_key == last_buffered_key: - # Same consolidation key continues - prepend buffered group logger.info( - f"[CONSOLIDATION DEBUG] Continuing buffered group: key={last_buffered_key}, " - f"buffered_rows={len(buffered_group)}, new_rows={len(rows)}, " - f"buffered_nodes={sorted([r.get('NodeNum') for r in buffered_group])}" + f"[CONSOLIDATION DEBUG] Merging buffered group with new batch: key={last_buffered_key}, " + f"buffered_rows={len(buffered_group)}, new_rows={len(rows)}" ) sorted_rows = buffered_group + sorted_rows buffered_group = [] - logger.info( - f"[CONSOLIDATION DEBUG] After prepending buffer: total_rows={len(sorted_rows)}, " - f"nodes={sorted([r.get('NodeNum') for r in sorted_rows])}" - ) else: - # Key changed - yield buffered group first + # Buffered group belongs to different key - yield it first logger.info( - f"[CONSOLIDATION DEBUG] Key changed, yielding buffered group: " - f"key={last_buffered_key}, rows={len(buffered_group)}, " - f"nodes={sorted([r.get('NodeNum') for r in buffered_group])}" + f"[CONSOLIDATION DEBUG] Yielding buffered group (key change): " + f"key={last_buffered_key}, rows={len(buffered_group)}" ) yield buffered_group buffered_group = [] last_buffered_key = None - # Group rows by consolidation key (UnitName, ToolNameID, EventDate, EventTime) + # Group rows by consolidation key + detect group boundaries by NodeNum + # When NodeNum decreases, we've moved to a new measurement current_group = [] current_key = None - is_final_batch = len(rows) < limit # True if this is the last batch + last_node_num = None for row in sorted_rows: key = ( @@ -337,54 +333,42 @@ class MySQLConnector: row.get("EventDate"), row.get("EventTime") ) + node_num = int(row.get("NodeNum") or 0) - # If key changed, yield previous group and start new one - if current_key is not None and key != current_key: + # Detect group boundary: key changed OR NodeNum decreased (new measurement started) + if current_key is not None and (key != current_key or (last_node_num is not None and node_num < last_node_num)): if current_group: nodes = sorted([r.get('NodeNum') for r in current_group]) logger.info( - f"[CONSOLIDATION DEBUG] Key change within batch - yielding: key={current_key}, " + f"[CONSOLIDATION DEBUG] Group boundary detected: key={current_key}, " f"rows={len(current_group)}, nodes={nodes}" ) yield current_group - logger.debug( - f"Group yielded: key={current_key}, " - f"rows_in_group={len(current_group)}, " - f"max_id={current_group[-1][id_column]}" - ) current_group = [] current_group.append(row) current_key = key + last_node_num = node_num # At end of batch: handle the final group - if is_final_batch: + if not rows or len(rows) < limit: # This is the last batch - yield the remaining group if current_group: + nodes = sorted([r.get('NodeNum') for r in current_group]) logger.info( f"[CONSOLIDATION DEBUG] Final batch - yielding group: key={current_key}, " - f"rows={len(current_group)}, nodes={sorted([r.get('NodeNum') for r in current_group])}" + f"rows={len(current_group)}, nodes={nodes}" ) yield current_group - logger.debug( - f"Final group yielded: key={current_key}, " - f"rows_in_group={len(current_group)}, " - f"max_id={current_group[-1][id_column]}" - ) else: - # More rows might exist - buffer the last group for next batch - # Don't know if more rows with same key will come, so buffer it + # More rows might exist - buffer the last group if current_group: buffered_group = current_group last_buffered_key = current_key + nodes = sorted([r.get('NodeNum') for r in current_group]) logger.info( - f"[CONSOLIDATION DEBUG] Buffering group at boundary: key={current_key}, " - f"rows={len(current_group)}, nodes={sorted([r.get('NodeNum') for r in current_group])}" - ) - logger.debug( - f"Group buffered at boundary: key={current_key}, " - f"rows_in_group={len(current_group)}, " - f"max_id={current_group[-1][id_column]}" + f"[CONSOLIDATION DEBUG] Buffering group at batch boundary: key={current_key}, " + f"rows={len(current_group)}, nodes={nodes}" ) last_id = rows[-1][id_column]