diff --git a/debug_consolidation.py b/debug_consolidation.py
new file mode 100644
index 0000000..c2ca4a6
--- /dev/null
+++ b/debug_consolidation.py
@@ -0,0 +1,134 @@
+#!/usr/bin/env python3
+"""Debug script to trace consolidation for a specific group."""
+import sys
+from datetime import date, time
+from src.connectors.mysql_connector import MySQLConnector
+from src.transformers.data_transformer import DataTransformer
+from src.utils.logger import setup_logger, get_logger
+
+setup_logger(__name__)
+logger = get_logger(__name__)
+
+# Test consolidation key
+UNIT_NAME = "M1_ID0246"
+TOOL_NAME_ID = "DT0001"
+EVENT_DATE = date(2023, 6, 26)
+EVENT_TIME = time(10, 43, 59)
+PARTITION = "d10"
+
+print(f"\n{'='*80}")
+print(f"Tracing consolidation for:")
+print(f"  Unit: {UNIT_NAME}")
+print(f"  Tool: {TOOL_NAME_ID}")
+print(f"  Date: {EVENT_DATE}")
+print(f"  Time: {EVENT_TIME}")
+print(f"  Partition: {PARTITION}")
+print(f"{'='*80}\n")
+
+with MySQLConnector() as mysql_conn:
+    # First, get all rows from MySQL
+    query = f"""
+        SELECT * FROM `ELABDATADISP` PARTITION (`{PARTITION}`)
+        WHERE UnitName = %s AND ToolNameID = %s
+          AND EventDate = %s AND EventTime = %s
+        ORDER BY idElabData ASC
+    """
+
+    with mysql_conn.connection.cursor() as cursor:
+        cursor.execute(query, (UNIT_NAME, TOOL_NAME_ID, EVENT_DATE, EVENT_TIME))
+        all_rows = cursor.fetchall()
+
+    print(f"Total rows found in MySQL: {len(all_rows)}")
+    print(f"\nNodes present (sorted by idElabData):")
+    for row in all_rows:
+        print(f"  NodeNum={row['NodeNum']:2d}, idElabData={row['idElabData']:10d}")
+
+    # Now simulate what fetch_consolidation_groups_from_partition does
+    print(f"\n{'='*80}")
+    print(f"Simulating batch fetching with consolidation grouping:")
+    print(f"{'='*80}\n")
+
+    # Group by consolidation key first (as the real code does via iterator)
+    all_groups_fetched = []
+    for group_rows in mysql_conn.fetch_consolidation_groups_from_partition(
+        "ELABDATADISP",
+        PARTITION,
+        limit=5000  # Default batch size
+    ):
+        all_groups_fetched.append(group_rows)
+
+        # Check if this is our target group
+        if group_rows:
+            key = (
+                group_rows[0].get("UnitName"),
+                group_rows[0].get("ToolNameID"),
+                group_rows[0].get("EventDate"),
+                group_rows[0].get("EventTime")
+            )
+            if key == (UNIT_NAME, TOOL_NAME_ID, EVENT_DATE, EVENT_TIME):
+                print(f"Found target group!")
+                print(f"  Group size: {len(group_rows)} rows")
+                print(f"  Nodes in group: {sorted([r['NodeNum'] for r in group_rows])}")
+                print(f"  idElabData range: {min(r['idElabData'] for r in group_rows)} - {max(r['idElabData'] for r in group_rows)}")
+
+    # Now check consolidation
+    print(f"\n{'='*80}")
+    print(f"Testing consolidation logic:")
+    print(f"{'='*80}\n")
+
+    # Find all groups for this consolidation key in all fetched data
+    consolidated_results = {}
+    for group_rows in all_groups_fetched:
+        if not group_rows:
+            continue
+
+        key = (
+            group_rows[0].get("UnitName"),
+            group_rows[0].get("ToolNameID"),
+            group_rows[0].get("EventDate"),
+            group_rows[0].get("EventTime")
+        )
+
+        if key == (UNIT_NAME, TOOL_NAME_ID, EVENT_DATE, EVENT_TIME):
+            print(f"\nGroup received by consolidate_elabdatadisp_batch():")
+            print(f"  Rows: {len(group_rows)}")
+            print(f"  Nodes: {sorted([r['NodeNum'] for r in group_rows])}")
+
+            # Run consolidation
+            consolidated = DataTransformer.consolidate_elabdatadisp_batch(group_rows)
+            print(f"\nAfter consolidation:")
+            print(f"  Consolidated rows: {len(consolidated)}")
+
+            for cons_row in consolidated:
+                if "measurements" in cons_row:
+                    nodes_in_measurements = sorted([int(k) for k in cons_row["measurements"].keys()])
+                    print(f"  Nodes in JSONB measurements: {nodes_in_measurements}")
+
+            consolidated_results[key] = {
+                "rows": len(group_rows),
+                "nodes_fetched": sorted([r['NodeNum'] for r in group_rows]),
+                "nodes_consolidated": nodes_in_measurements
+            }
+
+if not consolidated_results:
+    print("\n⚠️ Target consolidation key NOT found in any group!")
+else:
+    print(f"\n{'='*80}")
+    print(f"Summary:")
+    print(f"{'='*80}")
+    for key, result in consolidated_results.items():
+        print(f"\nKey: {key}")
+        print(f"  MySQL rows fetched: {result['rows']}")
+        print(f"  Nodes in fetched rows: {result['nodes_fetched']}")
+        print(f"  Nodes in consolidated JSONB: {result['nodes_consolidated']}")
+
+        if set(result['nodes_fetched']) == set(result['nodes_consolidated']):
+            print(f"  ✓ Consolidation is COMPLETE")
+        else:
+            missing = set(result['nodes_fetched']) - set(result['nodes_consolidated'])
+            extra = set(result['nodes_consolidated']) - set(result['nodes_fetched'])
+            print(f"  ✗ Consolidation is INCOMPLETE")
+            if missing:
+                print(f"    Missing nodes: {sorted(missing)}")
+            if extra:
+                print(f"    Extra nodes: {sorted(extra)}")
diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py
index c65a6df..f04a164 100644
--- a/src/connectors/mysql_connector.py
+++ b/src/connectors/mysql_connector.py
@@ -289,12 +289,17 @@ class MySQLConnector:
 
             # If we have a buffered group, prepend it to continue
             if buffered_group:
-                logger.debug(
-                    f"Resuming buffered group: key={last_buffered_key}, "
-                    f"prev_buffered_rows={len(buffered_group)}, new_rows={len(rows)}"
+                logger.info(
+                    f"[CONSOLIDATION DEBUG] Resuming buffered group: key={last_buffered_key}, "
+                    f"buffered_rows={len(buffered_group)}, new_rows={len(rows)}, "
+                    f"buffered_nodes={sorted([r.get('NodeNum') for r in buffered_group])}"
                 )
                 sorted_rows = buffered_group + sorted_rows
                 buffered_group = []
+                logger.info(
+                    f"[CONSOLIDATION DEBUG] After prepending buffer: total_rows={len(sorted_rows)}, "
+                    f"nodes={sorted([r.get('NodeNum') for r in sorted_rows])}"
+                )
 
             # Group rows by consolidation key (UnitName, ToolNameID, EventDate, EventTime)
             current_group = []
@@ -327,6 +332,10 @@
             if is_final_batch:
                 # This is the last batch - yield the remaining group
                 if current_group:
+                    logger.info(
+                        f"[CONSOLIDATION DEBUG] Final batch - yielding group: key={current_key}, "
+                        f"rows={len(current_group)}, nodes={sorted([r.get('NodeNum') for r in current_group])}"
+                    )
                     yield current_group
                     logger.debug(
                         f"Final group yielded: key={current_key}, "
@@ -338,6 +347,10 @@
                 if current_group:
                     buffered_group = current_group
                     last_buffered_key = current_key
+                    logger.info(
+                        f"[CONSOLIDATION DEBUG] Buffering group at boundary: key={current_key}, "
+                        f"rows={len(current_group)}, nodes={sorted([r.get('NodeNum') for r in current_group])}"
+                    )
                     logger.debug(
                         f"Group buffered at boundary: key={current_key}, "
                         f"rows_in_group={len(current_group)}, "
diff --git a/src/migrator/full_migration.py b/src/migrator/full_migration.py
index 80d332f..9f1334e 100644
--- a/src/migrator/full_migration.py
+++ b/src/migrator/full_migration.py
@@ -166,9 +166,9 @@ class FullMigrator:
                 batch_count += 1
                 progress.update(fetched_in_buffer)
                 # Update migration state after every batch flush
+                # Do NOT set last_completed_partition yet - partition is still being processed
                 self._update_migration_state(
-                    pg_conn, migrated, None, migration_start_time,
-                    last_partition=partition
+                    pg_conn, migrated, None, migration_start_time
                 )
                 logger.debug(
                     f"Partition {partition}: flushed {inserted} rows, "
                     f"total migrated: {migrated}"
                 )
@@ -185,16 +185,21 @@
             migrated += inserted
             batch_count += 1
             progress.update(fetched_in_buffer)
+            # Still don't set last_completed_partition - partition is still being finalized
             self._update_migration_state(
-                pg_conn, migrated, None, migration_start_time,
-                last_partition=partition
+                pg_conn, migrated, None, migration_start_time
             )
             logger.debug(
                 f"Partition {partition} final flush: {inserted} rows, "
                 f"total migrated: {migrated}"
             )
 
+        # NOW partition is complete - update with completed partition
         logger.info(f"Partition {partition} complete: {partition_group_count} groups consolidated")
+        self._update_migration_state(
+            pg_conn, migrated, None, migration_start_time,
+            last_partition=partition
+        )
 
         # Get final actual count from PostgreSQL
         final_count = pg_conn.get_row_count(pg_table)
diff --git a/test_d10_migration.py b/test_d10_migration.py
new file mode 100644
index 0000000..f0991e4
--- /dev/null
+++ b/test_d10_migration.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python3
+"""Test migration of just d10 partition with consolidation debugging."""
+import sys
+from src.migrator.full_migration import FullMigrator
+from src.utils.logger import setup_logger, get_logger
+from src.connectors.postgres_connector import PostgreSQLConnector
+
+setup_logger(__name__)
+logger = get_logger(__name__)
+
+print("\n" + "="*80)
+print("Testing ELABDATADISP migration for partition d10 with debugging")
+print("="*80 + "\n")
+
+# Clear the target table first
+print("Clearing target table...")
+with PostgreSQLConnector() as pg_conn:
+    with pg_conn.connection.cursor() as cursor:
+        cursor.execute("DELETE FROM elabdatadisp")
+    pg_conn.connection.commit()
+    print("Target table cleared.")
+
+# Now run migration
+print("\nStarting migration...")
+try:
+    migrator = FullMigrator("ELABDATADISP")
+    result = migrator.migrate(dry_run=False, resume=False)
+    print(f"\nMigration result: {result} rows")
+except Exception as e:
+    logger.error(f"Migration error: {e}", exc_info=True)
+    print(f"Migration error: {e}")
+    sys.exit(1)
+
+print("\n" + "="*80)
+print("Migration complete - check logs for [CONSOLIDATION DEBUG] messages")
+print("="*80 + "\n")