fix: Simplify consolidation grouping to use NodeNum decrease as boundary
Replace complex buffering logic with a simpler approach: detect consolidation group boundaries by the NodeNum sequence. When NodeNum decreases (e.g., from 18 back to 1), we know a new measurement has started.

Changes:
- Sort rows by (consolidation_key, NodeNum) instead of just consolidation_key
- Detect a group boundary when NodeNum decreases
- Still buffer incomplete groups at batch boundaries
- Merge buffered groups with the same consolidation key in the next batch

This approach is more intuitive and handles the case where nodes of the same measurement are split across batches with non-contiguous IDs.

Example: nodes 1-11 with IDs 132657553-132657655, then nodes 12-22 with IDs 298-308, are now correctly consolidated into a single group instead of 15 separate rows.

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -283,7 +283,9 @@ class MySQLConnector:
|
|||||||
yield buffered_group
|
yield buffered_group
|
||||||
return
|
return
|
||||||
|
|
||||||
# Sort fetched rows by consolidation key for grouping
|
# Sort by consolidation key THEN by NodeNum
|
||||||
|
# This ensures all nodes of the same measurement are together,
|
||||||
|
# and when NodeNum decreases, we know we've started a new measurement
|
||||||
sorted_rows = sorted(rows, key=lambda r: (
|
sorted_rows = sorted(rows, key=lambda r: (
|
||||||
r.get("UnitName") or "",
|
r.get("UnitName") or "",
|
||||||
r.get("ToolNameID") or "",
|
r.get("ToolNameID") or "",
|
||||||
@@ -292,7 +294,7 @@ class MySQLConnector:
|
|||||||
int(r.get("NodeNum") or 0)
|
int(r.get("NodeNum") or 0)
|
||||||
))
|
))
|
||||||
|
|
||||||
# If we have a buffered group, check if current batch continues the same key
|
# Prepend any buffered group that belongs to the same consolidation key
|
||||||
if buffered_group and sorted_rows:
|
if buffered_group and sorted_rows:
|
||||||
first_row_key = (
|
first_row_key = (
|
||||||
sorted_rows[0].get("UnitName"),
|
sorted_rows[0].get("UnitName"),
|
||||||
@@ -302,33 +304,27 @@ class MySQLConnector:
|
|||||||
)
|
)
|
||||||
|
|
||||||
if first_row_key == last_buffered_key:
|
if first_row_key == last_buffered_key:
|
||||||
# Same consolidation key continues - prepend buffered group
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"[CONSOLIDATION DEBUG] Continuing buffered group: key={last_buffered_key}, "
|
f"[CONSOLIDATION DEBUG] Merging buffered group with new batch: key={last_buffered_key}, "
|
||||||
f"buffered_rows={len(buffered_group)}, new_rows={len(rows)}, "
|
f"buffered_rows={len(buffered_group)}, new_rows={len(rows)}"
|
||||||
f"buffered_nodes={sorted([r.get('NodeNum') for r in buffered_group])}"
|
|
||||||
)
|
)
|
||||||
sorted_rows = buffered_group + sorted_rows
|
sorted_rows = buffered_group + sorted_rows
|
||||||
buffered_group = []
|
buffered_group = []
|
||||||
logger.info(
|
|
||||||
f"[CONSOLIDATION DEBUG] After prepending buffer: total_rows={len(sorted_rows)}, "
|
|
||||||
f"nodes={sorted([r.get('NodeNum') for r in sorted_rows])}"
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
# Key changed - yield buffered group first
|
# Buffered group belongs to different key - yield it first
|
||||||
logger.info(
|
logger.info(
|
||||||
f"[CONSOLIDATION DEBUG] Key changed, yielding buffered group: "
|
f"[CONSOLIDATION DEBUG] Yielding buffered group (key change): "
|
||||||
f"key={last_buffered_key}, rows={len(buffered_group)}, "
|
f"key={last_buffered_key}, rows={len(buffered_group)}"
|
||||||
f"nodes={sorted([r.get('NodeNum') for r in buffered_group])}"
|
|
||||||
)
|
)
|
||||||
yield buffered_group
|
yield buffered_group
|
||||||
buffered_group = []
|
buffered_group = []
|
||||||
last_buffered_key = None
|
last_buffered_key = None
|
||||||
|
|
||||||
# Group rows by consolidation key (UnitName, ToolNameID, EventDate, EventTime)
|
# Group rows by consolidation key + detect group boundaries by NodeNum
|
||||||
|
# When NodeNum decreases, we've moved to a new measurement
|
||||||
current_group = []
|
current_group = []
|
||||||
current_key = None
|
current_key = None
|
||||||
is_final_batch = len(rows) < limit # True if this is the last batch
|
last_node_num = None
|
||||||
|
|
||||||
for row in sorted_rows:
|
for row in sorted_rows:
|
||||||
key = (
|
key = (
|
||||||
@@ -337,54 +333,42 @@ class MySQLConnector:
|
|||||||
row.get("EventDate"),
|
row.get("EventDate"),
|
||||||
row.get("EventTime")
|
row.get("EventTime")
|
||||||
)
|
)
|
||||||
|
node_num = int(row.get("NodeNum") or 0)
|
||||||
|
|
||||||
# If key changed, yield previous group and start new one
|
# Detect group boundary: key changed OR NodeNum decreased (new measurement started)
|
||||||
if current_key is not None and key != current_key:
|
if current_key is not None and (key != current_key or (last_node_num is not None and node_num < last_node_num)):
|
||||||
if current_group:
|
if current_group:
|
||||||
nodes = sorted([r.get('NodeNum') for r in current_group])
|
nodes = sorted([r.get('NodeNum') for r in current_group])
|
||||||
logger.info(
|
logger.info(
|
||||||
f"[CONSOLIDATION DEBUG] Key change within batch - yielding: key={current_key}, "
|
f"[CONSOLIDATION DEBUG] Group boundary detected: key={current_key}, "
|
||||||
f"rows={len(current_group)}, nodes={nodes}"
|
f"rows={len(current_group)}, nodes={nodes}"
|
||||||
)
|
)
|
||||||
yield current_group
|
yield current_group
|
||||||
logger.debug(
|
|
||||||
f"Group yielded: key={current_key}, "
|
|
||||||
f"rows_in_group={len(current_group)}, "
|
|
||||||
f"max_id={current_group[-1][id_column]}"
|
|
||||||
)
|
|
||||||
current_group = []
|
current_group = []
|
||||||
|
|
||||||
current_group.append(row)
|
current_group.append(row)
|
||||||
current_key = key
|
current_key = key
|
||||||
|
last_node_num = node_num
|
||||||
|
|
||||||
# At end of batch: handle the final group
|
# At end of batch: handle the final group
|
||||||
if is_final_batch:
|
if not rows or len(rows) < limit:
|
||||||
# This is the last batch - yield the remaining group
|
# This is the last batch - yield the remaining group
|
||||||
if current_group:
|
if current_group:
|
||||||
|
nodes = sorted([r.get('NodeNum') for r in current_group])
|
||||||
logger.info(
|
logger.info(
|
||||||
f"[CONSOLIDATION DEBUG] Final batch - yielding group: key={current_key}, "
|
f"[CONSOLIDATION DEBUG] Final batch - yielding group: key={current_key}, "
|
||||||
f"rows={len(current_group)}, nodes={sorted([r.get('NodeNum') for r in current_group])}"
|
f"rows={len(current_group)}, nodes={nodes}"
|
||||||
)
|
)
|
||||||
yield current_group
|
yield current_group
|
||||||
logger.debug(
|
|
||||||
f"Final group yielded: key={current_key}, "
|
|
||||||
f"rows_in_group={len(current_group)}, "
|
|
||||||
f"max_id={current_group[-1][id_column]}"
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
# More rows might exist - buffer the last group for next batch
|
# More rows might exist - buffer the last group
|
||||||
# Don't know if more rows with same key will come, so buffer it
|
|
||||||
if current_group:
|
if current_group:
|
||||||
buffered_group = current_group
|
buffered_group = current_group
|
||||||
last_buffered_key = current_key
|
last_buffered_key = current_key
|
||||||
|
nodes = sorted([r.get('NodeNum') for r in current_group])
|
||||||
logger.info(
|
logger.info(
|
||||||
f"[CONSOLIDATION DEBUG] Buffering group at boundary: key={current_key}, "
|
f"[CONSOLIDATION DEBUG] Buffering group at batch boundary: key={current_key}, "
|
||||||
f"rows={len(current_group)}, nodes={sorted([r.get('NodeNum') for r in current_group])}"
|
f"rows={len(current_group)}, nodes={nodes}"
|
||||||
)
|
|
||||||
logger.debug(
|
|
||||||
f"Group buffered at boundary: key={current_key}, "
|
|
||||||
f"rows_in_group={len(current_group)}, "
|
|
||||||
f"max_id={current_group[-1][id_column]}"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
last_id = rows[-1][id_column]
|
last_id = rows[-1][id_column]
|
||||||
|
|||||||
Reference in New Issue
Block a user