diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py
index f763e6a..1ca8850 100644
--- a/src/connectors/mysql_connector.py
+++ b/src/connectors/mysql_connector.py
@@ -244,16 +244,21 @@ class MySQLConnector:
         # Determine ID column name
         id_column = "idElabData" if table == "ELABDATADISP" else "id"
         max_retries = 3
-        last_key = None
+        last_completed_key = None  # Last key we fully yielded (not incomplete)
+
+        # CRITICAL: These must be OUTSIDE the while loop to persist across batch iterations
+        # If they're inside the loop, buffered incomplete groups from previous batches get lost
+        current_group = []
+        current_key = None

         while True:
             retries = 0
             while retries < max_retries:
                 try:
                     with self.connection.cursor() as cursor:
-                        # ORDER BY consolidation key (without NodeNum for speed)
-                        # Ensures all rows of a key arrive together, then we sort by NodeNum in memory
-                        if last_key is None:
+                        # ORDER BY consolidation key
+                        # Ensures all rows of a key arrive together
+                        if last_completed_key is None:
                             rows_query = f"""
                                 SELECT * FROM `{table}` PARTITION (`{partition}`)
                                 ORDER BY UnitName ASC, ToolNameID ASC, EventDate ASC, EventTime ASC
@@ -261,29 +266,26 @@ class MySQLConnector:
                                 LIMIT %s
                             """
                             cursor.execute(rows_query, (limit,))
                         else:
-                            # Resume after last key using tuple comparison
+                            # Resume AFTER last completely yielded key
+                            # This ensures we don't re-process any rows
                             rows_query = f"""
                                 SELECT * FROM `{table}` PARTITION (`{partition}`)
                                 WHERE (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)
                                 ORDER BY UnitName ASC, ToolNameID ASC, EventDate ASC, EventTime ASC
                                 LIMIT %s
                             """
-                            cursor.execute(rows_query, (last_key[0], last_key[1], last_key[2], last_key[3], limit))
+                            cursor.execute(rows_query, (last_completed_key[0], last_completed_key[1], last_completed_key[2], last_completed_key[3], limit))

                         rows = cursor.fetchall()

                         if not rows:
+                            # No more rows - yield any buffered group and finish
+                            if current_group:
+                                yield current_group
                             return

-                        # DO NOT re-sort by NodeNum! The database has already ordered by consolidation key,
-                        # and re-sorting breaks the grouping. Rows with same key will not be consecutive anymore.
-                        # We trust the MySQL ORDER BY to keep all rows of same key together.
-
-                        # Group rows by consolidation key
-                        # Since rows are already ordered by key from MySQL ORDER BY, all rows with same key are consecutive
-                        current_group = []
-                        current_key = None
-
+                        # Process batch: add rows to groups, yield only COMPLETE groups
+                        # Keep incomplete groups buffered in current_group for next batch
                         for row in rows:
                             key = (
                                 row.get("UnitName"),
@@ -292,61 +294,37 @@ class MySQLConnector:
                                 row.get("EventDate"),
                                 row.get("EventTime")
                             )

-                            # Yield group when key changes
+                            # Key changed - previous group is complete, yield it
                             if current_key is not None and key != current_key:
-                                if current_group:
-                                    yield current_group
+                                yield current_group
                                 current_group = []
+                                last_completed_key = current_key

                             current_group.append(row)
                             current_key = key

-                        # At end of batch: ALWAYS yield complete groups immediately
-                        # Incomplete groups at batch boundary will be continued in next batch
-                        # because we track last_key correctly for resume
-                        if not rows or len(rows) < limit:
+                        # At batch boundary: only yield if group is DEFINITELY complete
+                        # A group is incomplete if the last row has same key as current group
+                        if rows and len(rows) < limit:
                             # Last batch - yield remaining group and finish
                             if current_group:
                                 yield current_group
                             return
-                        else:
-                            # More rows exist - yield all COMPLETE groups seen so far
-                            # If current_group is incomplete (same key as last row), keep buffering it
-                            if current_group:
-                                # Check if this is a complete group or partial
-                                # Complete group: all rows with this key have been seen in this batch
-                                # Partial: might continue in next batch
-                                last_row = rows[-1]
-                                last_row_key = (
-                                    last_row.get("UnitName"),
-                                    last_row.get("ToolNameID"),
-                                    last_row.get("EventDate"),
-                                    last_row.get("EventTime")
-                                )
+                        elif rows:
+                            # More rows exist - check if last row belongs to current group
+                            last_row_key = (
+                                rows[-1].get("UnitName"),
+                                rows[-1].get("ToolNameID"),
+                                rows[-1].get("EventDate"),
+                                rows[-1].get("EventTime")
+                            )

-                                if last_row_key != current_key:
-                                    # Current group is complete (ended before batch boundary)
-                                    yield current_group
-                                    current_group = []
-                                    current_key = None
-                                # else: current group continues past batch boundary
-                                # Keep it in current_group for merging with next batch
-                                # BUT we need to update last_key to be the LAST yielded key,
-                                # not the current incomplete key, so resume works correctly
-                                # Actually NO - we should NOT update last_key if we have a pending group!
-
-                        # Update last_key only if we don't have a pending incomplete group
-                        if current_key is None and rows:
-                            # All groups in this batch were yielded and complete
-                            last_row = rows[-1]
-                            last_key = (
-                                last_row.get("UnitName"),
-                                last_row.get("ToolNameID"),
-                                last_row.get("EventDate"),
-                                last_row.get("EventTime")
-                            )
-                        # If current_key is not None, we have incomplete group - DON'T update last_key
-                        # so next iteration will continue from where this group started
+                            if last_row_key != current_key:
+                                # Last row was from a previous (complete) group - shouldn't happen
+                                # given the ORDER BY, but if it does, we've already yielded it above
+                                pass
+                            # else: current group is incomplete (extends past this batch)
+                            # Keep it buffered for next batch

                     break  # Success, exit retry loop
diff --git a/test_generator_output.py b/test_generator_output.py
new file mode 100644
index 0000000..deaa345
--- /dev/null
+++ b/test_generator_output.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python3
+"""Debug what the generator is actually returning."""
+import sys
+sys.path.insert(0, '/home/alex/devel/mysql2postgres')
+
+from src.connectors.mysql_connector import MySQLConnector
+from src.utils.logger import setup_logger, get_logger
+
+setup_logger(__name__)
+logger = get_logger(__name__)
+
+print("\n" + "="*80)
+print("Testing consolidation groups generator for d1")
+print("="*80 + "\n")
+
+with MySQLConnector() as mysql_conn:
+    partition = "d1"
+    group_num = 0
+    # Use datetime objects to match what the generator uses
+    import datetime
+    target_key = ("ID0003", "DT0002", datetime.date(2014, 8, 31), datetime.timedelta(hours=11, minutes=59, seconds=10))
+
+    print("First 20 groups from generator:\n")
+    print("DEBUG: First row columns:", flush=True)
+
+    for group_rows in mysql_conn.fetch_consolidation_groups_from_partition(
+        "ELABDATADISP",
+        partition,
+        limit=100
+    ):
+        group_num += 1
+        if group_rows:
+            first_row = group_rows[0]
+
+            # Debug: print all columns from first group
+            if group_num == 1:
+                print(f"  Available columns: {first_row.keys()}\n")
+                print(f"  First row data: {dict(first_row)}\n")
+
+            key = (
+                first_row.get("UnitName"),
+                first_row.get("ToolNameID"),
+                first_row.get("EventDate"),
+                first_row.get("EventTime")
+            )
+            nodes = sorted([r.get('NodeNum') for r in group_rows])
+
+            # Show first 20 groups or target key
+            if group_num <= 20 or key == target_key:
+                print(f"Group {group_num}: key={key}")
+                print(f"  Nodes ({len(nodes)}): {nodes}")
+                print(f"  Rows count: {len(group_rows)}\n")
+
+            if key == target_key:
+                print("^^^ THIS IS THE TARGET KEY! ^^^\n")
+                break
+
+        if group_num >= 100:
+            print(f"\nStopped at group {group_num}")
+            break
+
+print(f"\nTotal groups processed: {group_num}")
+print("Done!\n")
diff --git a/test_target_record.py b/test_target_record.py
new file mode 100644
index 0000000..e2be236
--- /dev/null
+++ b/test_target_record.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python3
+"""Test if target record is being consolidated correctly."""
+from src.connectors.mysql_connector import MySQLConnector
+from src.transformers.data_transformer import DataTransformer
+from src.utils.logger import setup_logger, get_logger
+
+setup_logger(__name__)
+logger = get_logger(__name__)
+
+print("\n" + "="*80)
+print("Testing target record consolidation")
+print("="*80 + "\n")
+
+target_key = ("M1_ID0246", "DT0001", "2023-06-26", "10:43:59")
+
+with MySQLConnector() as mysql_conn:
+    partition = "d10"
+    group_num = 0
+    found = False
+
+    print("Fetching consolidation groups from d10...\n")
+
+    for group_rows in mysql_conn.fetch_consolidation_groups_from_partition(
+        "ELABDATADISP",
+        partition,
+        limit=100
+    ):
+        group_num += 1
+        if group_rows:
+            first_row = group_rows[0]
+            key = (
+                first_row.get("UnitName"),
+                first_row.get("ToolNameID"),
+                str(first_row.get("EventDate")),
+                str(first_row.get("EventTime"))
+            )
+            nodes = sorted([r.get('NodeNum') for r in group_rows])
+
+            # Show first 10 groups
+            if group_num <= 10:
+                print(f"Group {group_num}: key={key}, nodes={len(nodes)} items")
+
+            if key == target_key:
+                print(f"\n✓ FOUND TARGET KEY in group {group_num}!")
+                print(f"  Key: {key}")
+                print(f"  Nodes: {nodes}")
+                print(f"  Count: {len(group_rows)}")
+
+                if len(nodes) == 22 and nodes == list(range(1, 23)):
+                    print("\n✓ All 22 nodes present!")
+
+                    # Test consolidation
+                    consolidated = DataTransformer.consolidate_elabdatadisp_batch(group_rows)
+                    print(f"Consolidated to {len(consolidated)} row(s)")
+
+                    if len(consolidated) == 1:
+                        print("✓ Consolidated to 1 row!")
+                        import json
+                        meas = consolidated[0].get("measurements")
+                        if isinstance(meas, str):
+                            meas = json.loads(meas)
+                        cons_nodes = sorted([int(k) for k in meas.keys()])
+                        print(f"Measurements nodes: {cons_nodes}")
+
+                        if cons_nodes == list(range(1, 23)):
+                            print("\n" + "="*80)
+                            print("✓✓✓ TARGET RECORD CONSOLIDATES CORRECTLY ✓✓✓")
+                            print("="*80)
+                        else:
+                            print(f"✗ Expected nodes 1-22, got {cons_nodes}")
+                    else:
+                        print(f"✗ Expected 1 consolidated row, got {len(consolidated)}")
+                else:
+                    print(f"✗ INCOMPLETE! Expected 22 nodes, got {len(nodes)}")
+                    print(f"  Expected: {list(range(1, 23))}")
+                    print(f"  Got: {nodes}")
+
+                found = True
+                break
+
+        # Safety limit
+        if group_num >= 1000:
+            print(f"\nStopped at group {group_num} (safety limit)")
+            break
+
+if not found:
+    print(f"\n✗ Target key NOT FOUND in first {group_num} groups")
+    print("\nThis is a PROBLEM - the record is not being returned by the generator!")
+
+print("\nDone!\n")
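
For reference, below is a minimal, self-contained sketch of the cross-batch grouping idea the connector change relies on: rows arrive in key-ordered batches, a group is yielded only once its consolidation key changes, and the trailing (possibly incomplete) group stays buffered across batch boundaries instead of being re-initialised on every batch. The in-memory SAMPLE_ROWS, fetch_batch() and the id-based pagination are illustrative simplifications, not the project's actual API; the real connector pages against MySQL with a key-tuple WHERE clause.

# consolidation_groups_sketch.py
# Illustrative sketch only: SAMPLE_ROWS, fetch_batch() and the id-based pagination
# are stand-ins for the real ELABDATADISP table and SQL, not the connector's API.
from typing import Iterator, Optional

KEY_FIELDS = ("UnitName", "ToolNameID", "EventDate", "EventTime")

# Rows already ordered by the consolidation key, as the SQL ORDER BY guarantees.
SAMPLE_ROWS = [
    {"id": i, "UnitName": "U1", "ToolNameID": "T1",
     "EventDate": "2023-06-26", "EventTime": t, "NodeNum": n}
    for i, (t, n) in enumerate(
        [("10:43:59", n) for n in range(1, 6)] + [("10:44:10", n) for n in range(1, 4)]
    )
]

def fetch_batch(after_id: Optional[int], limit: int) -> list:
    """Stand-in for the paginated SELECT: up to `limit` rows strictly after `after_id`."""
    rows = SAMPLE_ROWS if after_id is None else [r for r in SAMPLE_ROWS if r["id"] > after_id]
    return rows[:limit]

def fetch_consolidation_groups(limit: int = 3) -> Iterator[list]:
    """Yield lists of rows sharing one consolidation key, complete even across batches."""
    current_group = []   # rows of the key currently being assembled (persists across batches)
    current_key = None
    last_row_id = None   # resume point for the next batch

    while True:
        rows = fetch_batch(last_row_id, limit)
        if not rows:
            if current_group:        # flush the final buffered group
                yield current_group
            return
        for row in rows:
            key = tuple(row[f] for f in KEY_FIELDS)
            if current_key is not None and key != current_key:
                yield current_group  # key changed, so the previous group is complete
                current_group = []
            current_group.append(row)
            current_key = key
        last_row_id = rows[-1]["id"]
        if len(rows) < limit:        # short batch: nothing left to fetch
            if current_group:
                yield current_group
            return
        # Full batch: the trailing group may continue in the next batch,
        # so leave it buffered instead of yielding it now.

if __name__ == "__main__":
    for group in fetch_consolidation_groups(limit=3):
        print(tuple(group[0][f] for f in KEY_FIELDS), "->",
              sorted(r["NodeNum"] for r in group))

With limit=3 the sample's first key spans two batches, which is exactly the boundary case the buffering is meant to handle: the run prints the 5-node group and then the 3-node group, each yielded once and complete.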