fix: Fix duplicate group insertion in consolidation generator

Critical bug: current_group and current_key were inside the while loop,
causing them to be reset on each batch iteration. When an incomplete group
spanned a batch boundary, it would be:
1. Buffered at end of batch N (in local current_group)
2. LOST when loop continued (new local variables created)
3. Re-fetched and yielded again in batch N+1

This caused the same consolidated record to be inserted many times.

Solution: Move current_group and current_key OUTSIDE while loop to persist
across batch iterations. Incomplete groups now properly merge across batch
boundaries without duplication.

Algorithm:
- Only yield groups when we're 100% certain they're complete
- A group is complete when the next key differs from current key
- At batch boundaries, incomplete groups stay buffered for next batch
- Resume always uses last_completed_key to avoid re-processing

This fixes the user's observation of 27 identical rows for the same
consolidated record.

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-27 10:26:39 +01:00
parent 418da67857
commit 79cd4f4559
3 changed files with 190 additions and 59 deletions

View File

@@ -244,16 +244,21 @@ class MySQLConnector:
# Determine ID column name
id_column = "idElabData" if table == "ELABDATADISP" else "id"
max_retries = 3
last_key = None
last_completed_key = None # Last key we fully yielded (not incomplete)
# CRITICAL: These must be OUTSIDE the while loop to persist across batch iterations
# If they're inside the loop, buffered incomplete groups from previous batches get lost
current_group = []
current_key = None
while True:
retries = 0
while retries < max_retries:
try:
with self.connection.cursor() as cursor:
# ORDER BY consolidation key (without NodeNum for speed)
# Ensures all rows of a key arrive together, then we sort by NodeNum in memory
if last_key is None:
# ORDER BY consolidation key
# Ensures all rows of a key arrive together
if last_completed_key is None:
rows_query = f"""
SELECT * FROM `{table}` PARTITION (`{partition}`)
ORDER BY UnitName ASC, ToolNameID ASC, EventDate ASC, EventTime ASC
@@ -261,29 +266,26 @@ class MySQLConnector:
"""
cursor.execute(rows_query, (limit,))
else:
# Resume after last key using tuple comparison
# Resume AFTER last completely yielded key
# This ensures we don't re-process any rows
rows_query = f"""
SELECT * FROM `{table}` PARTITION (`{partition}`)
WHERE (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)
ORDER BY UnitName ASC, ToolNameID ASC, EventDate ASC, EventTime ASC
LIMIT %s
"""
cursor.execute(rows_query, (last_key[0], last_key[1], last_key[2], last_key[3], limit))
cursor.execute(rows_query, (last_completed_key[0], last_completed_key[1], last_completed_key[2], last_completed_key[3], limit))
rows = cursor.fetchall()
if not rows:
# No more rows - yield any buffered group and finish
if current_group:
yield current_group
return
# DO NOT re-sort by NodeNum! The database has already ordered by consolidation key,
# and re-sorting breaks the grouping. Rows with same key will not be consecutive anymore.
# We trust the MySQL ORDER BY to keep all rows of same key together.
# Group rows by consolidation key
# Since rows are already ordered by key from MySQL ORDER BY, all rows with same key are consecutive
current_group = []
current_key = None
# Process batch: add rows to groups, yield only COMPLETE groups
# Keep incomplete groups buffered in current_group for next batch
for row in rows:
key = (
row.get("UnitName"),
@@ -292,61 +294,37 @@ class MySQLConnector:
row.get("EventTime")
)
# Yield group when key changes
# Key changed - previous group is complete, yield it
if current_key is not None and key != current_key:
if current_group:
yield current_group
yield current_group
current_group = []
last_completed_key = current_key
current_group.append(row)
current_key = key
# At end of batch: ALWAYS yield complete groups immediately
# Incomplete groups at batch boundary will be continued in next batch
# because we track last_key correctly for resume
if not rows or len(rows) < limit:
# At batch boundary: only yield if group is DEFINITELY complete
# A group is incomplete if the last row has same key as current group
if rows and len(rows) < limit:
# Last batch - yield remaining group and finish
if current_group:
yield current_group
return
else:
# More rows exist - yield all COMPLETE groups seen so far
# If current_group is incomplete (same key as last row), keep buffering it
if current_group:
# Check if this is a complete group or partial
# Complete group: all rows with this key have been seen in this batch
# Partial: might continue in next batch
last_row = rows[-1]
last_row_key = (
last_row.get("UnitName"),
last_row.get("ToolNameID"),
last_row.get("EventDate"),
last_row.get("EventTime")
)
elif rows:
# More rows exist - check if last row belongs to current group
last_row_key = (
rows[-1].get("UnitName"),
rows[-1].get("ToolNameID"),
rows[-1].get("EventDate"),
rows[-1].get("EventTime")
)
if last_row_key != current_key:
# Current group is complete (ended before batch boundary)
yield current_group
current_group = []
current_key = None
# else: current group continues past batch boundary
# Keep it in current_group for merging with next batch
# BUT we need to update last_key to be the LAST yielded key,
# not the current incomplete key, so resume works correctly
# Actually NO - we should NOT update last_key if we have a pending group!
# Update last_key only if we don't have a pending incomplete group
if current_key is None and rows:
# All groups in this batch were yielded and complete
last_row = rows[-1]
last_key = (
last_row.get("UnitName"),
last_row.get("ToolNameID"),
last_row.get("EventDate"),
last_row.get("EventTime")
)
# If current_key is not None, we have incomplete group - DON'T update last_key
# so next iteration will continue from where this group started
if last_row_key != current_key:
# Last row was from a previous (complete) group - shouldn't happen
# given the ORDER BY, but if it does, we've already yielded it above
pass
# else: current group is incomplete (extends past this batch)
# Keep it buffered for next batch
break # Success, exit retry loop

63
test_generator_output.py Normal file
View File

@@ -0,0 +1,63 @@
#!/usr/bin/env python3
"""Debug what the generator is actually returning."""
import sys
sys.path.insert(0, '/home/alex/devel/mysql2postgres')
from src.connectors.mysql_connector import MySQLConnector
from src.utils.logger import setup_logger, get_logger
setup_logger(__name__)
logger = get_logger(__name__)
print("\n" + "="*80)
print("Testing consolidation groups generator for d1")
print("="*80 + "\n")
with MySQLConnector() as mysql_conn:
partition = "d1"
group_num = 0
# Use datetime objects to match what the generator uses
import datetime
target_key = ("ID0003", "DT0002", datetime.date(2014, 8, 31), datetime.timedelta(hours=11, minutes=59, seconds=10))
print("First 20 groups from generator:\n")
print("DEBUG: First row columns:", flush=True)
for group_rows in mysql_conn.fetch_consolidation_groups_from_partition(
"ELABDATADISP",
partition,
limit=100
):
group_num += 1
if group_rows:
first_row = group_rows[0]
# Debug: print all columns from first group
if group_num == 1:
print(f" Available columns: {first_row.keys()}\n")
print(f" First row data: {dict(first_row)}\n")
key = (
first_row.get("UnitName"),
first_row.get("ToolNameID"),
str(first_row.get("EventDate")),
str(first_row.get("EventTime"))
)
nodes = sorted([r.get('NodeNum') for r in group_rows])
# Show first 20 groups or target key
if group_num <= 20 or key == target_key:
print(f"Group {group_num}: key={key}")
print(f" Nodes ({len(nodes)}): {nodes}")
print(f" Rows count: {len(group_rows)}\n")
if key == target_key:
print("^^^ THIS IS THE TARGET KEY! ^^^\n")
break
if group_num >= 100:
print(f"\nStopped at group {group_num}")
break
print(f"\nTotal groups processed: {group_num}")
print("Done!\n")

90
test_target_record.py Normal file
View File

@@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""Test if target record is being consolidated correctly."""
from src.connectors.mysql_connector import MySQLConnector
from src.transformers.data_transformer import DataTransformer
from src.utils.logger import setup_logger, get_logger
setup_logger(__name__)
logger = get_logger(__name__)
print("\n" + "="*80)
print("Testing target record consolidation")
print("="*80 + "\n")
target_key = ("M1_ID0246", "DT0001", "2023-06-26", "10:43:59")
with MySQLConnector() as mysql_conn:
partition = "d10"
group_num = 0
found = False
print("Fetching consolidation groups from d10...\n")
for group_rows in mysql_conn.fetch_consolidation_groups_from_partition(
"ELABDATADISP",
partition,
limit=100
):
group_num += 1
if group_rows:
first_row = group_rows[0]
key = (
first_row.get("UnitName"),
first_row.get("ToolNameID"),
str(first_row.get("EventDate")),
str(first_row.get("EventTime"))
)
nodes = sorted([r.get('NodeNum') for r in group_rows])
# Show first 10 groups
if group_num <= 10:
print(f"Group {group_num}: key={key}, nodes={len(nodes)} items")
if key == target_key:
print(f"\n✓ FOUND TARGET KEY in group {group_num}!")
print(f" Key: {key}")
print(f" Nodes: {nodes}")
print(f" Count: {len(group_rows)}")
if len(nodes) == 22 and nodes == list(range(1, 23)):
print("\n✓ All 22 nodes present!")
# Test consolidation
consolidated = DataTransformer.consolidate_elabdatadisp_batch(group_rows)
print(f"Consolidated to {len(consolidated)} row(s)")
if len(consolidated) == 1:
print("✓ Consolidated to 1 row!")
import json
meas = consolidated[0].get("measurements")
if isinstance(meas, str):
meas = json.loads(meas)
cons_nodes = sorted([int(k) for k in meas.keys()])
print(f"Measurements nodes: {cons_nodes}")
if cons_nodes == list(range(1, 23)):
print("\n" + "="*80)
print("✓✓✓ TARGET RECORD CONSOLIDATES CORRECTLY ✓✓✓")
print("="*80)
else:
print(f"✗ Expected nodes 1-22, got {cons_nodes}")
else:
print(f"✗ Expected 1 consolidated row, got {len(consolidated)}")
else:
print(f"✗ INCOMPLETE! Expected 22 nodes, got {len(nodes)}")
print(f" Expected: {list(range(1, 23))}")
print(f" Got: {nodes}")
found = True
break
# Safety limit
if group_num >= 1000:
print(f"\nStopped at group {group_num} (safety limit)")
break
if not found:
print(f"\n✗ Target key NOT FOUND in first {group_num} groups")
print("\nThis is a PROBLEM - the record is not being returned by the generator!")
print("\nDone!\n")