fix: Only update last_completed_partition when partition is fully processed
Previously, last_completed_partition was updated during batch flushes while the partition was still being processed. This caused resume to skip partitions that were only partially completed.

Now, last_completed_partition is only updated AFTER all consolidation groups in a partition have been processed and the final buffer flush is complete.

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
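To make the ordering concrete, the sketch below illustrates the checkpointing pattern this commit moves to. The names (migrate_partitions, state, process_partition) are hypothetical stand-ins, not FullMigrator's real API: per-batch checkpoints record progress only, and last_completed_partition is written strictly after the partition's final flush, so a resumed run never skips a half-finished partition.

# Minimal sketch of the intended ordering; hypothetical names, not FullMigrator's real API.
from typing import Callable, Dict, Iterable, List


def migrate_partitions(
    partitions: List[str],
    state: Dict,
    process_partition: Callable[[str], Iterable[int]],
) -> None:
    start = 0
    last_done = state.get("last_completed_partition")
    if last_done in partitions:
        # Resume strictly after the last partition known to be fully processed.
        start = partitions.index(last_done) + 1

    for partition in partitions[start:]:
        for flushed_rows in process_partition(partition):
            # Per-batch checkpoint: record progress only, never the partition marker.
            state["migrated"] = state.get("migrated", 0) + flushed_rows
        # Every group in the partition has now been flushed; only now is it
        # safe to mark the partition as completed.
        state["last_completed_partition"] = partition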
debug_consolidation.py (new file, +134)
@@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""Debug script to trace consolidation for a specific group."""
import sys
from datetime import date, time
from src.connectors.mysql_connector import MySQLConnector
from src.transformers.data_transformer import DataTransformer
from src.utils.logger import setup_logger, get_logger

setup_logger(__name__)
logger = get_logger(__name__)

# Test consolidation key
UNIT_NAME = "M1_ID0246"
TOOL_NAME_ID = "DT0001"
EVENT_DATE = date(2023, 6, 26)
EVENT_TIME = time(10, 43, 59)
PARTITION = "d10"

print(f"\n{'='*80}")
print(f"Tracing consolidation for:")
print(f"  Unit: {UNIT_NAME}")
print(f"  Tool: {TOOL_NAME_ID}")
print(f"  Date: {EVENT_DATE}")
print(f"  Time: {EVENT_TIME}")
print(f"  Partition: {PARTITION}")
print(f"{'='*80}\n")

with MySQLConnector() as mysql_conn:
    # First, get all rows from MySQL
    query = f"""
        SELECT * FROM `ELABDATADISP` PARTITION (`{PARTITION}`)
        WHERE UnitName = %s AND ToolNameID = %s
          AND EventDate = %s AND EventTime = %s
        ORDER BY idElabData ASC
    """

    with mysql_conn.connection.cursor() as cursor:
        cursor.execute(query, (UNIT_NAME, TOOL_NAME_ID, EVENT_DATE, EVENT_TIME))
        all_rows = cursor.fetchall()

    print(f"Total rows found in MySQL: {len(all_rows)}")
    print(f"\nNodes present (sorted by idElabData):")
    for row in all_rows:
        print(f"  NodeNum={row['NodeNum']:2d}, idElabData={row['idElabData']:10d}")

    # Now simulate what fetch_consolidation_groups_from_partition does
    print(f"\n{'='*80}")
    print(f"Simulating batch fetching with consolidation grouping:")
    print(f"{'='*80}\n")

    # Group by consolidation key first (as the real code does via iterator)
    all_groups_fetched = []
    for group_rows in mysql_conn.fetch_consolidation_groups_from_partition(
        "ELABDATADISP",
        PARTITION,
        limit=5000  # Default batch size
    ):
        all_groups_fetched.append(group_rows)

        # Check if this is our target group
        if group_rows:
            key = (
                group_rows[0].get("UnitName"),
                group_rows[0].get("ToolNameID"),
                group_rows[0].get("EventDate"),
                group_rows[0].get("EventTime")
            )
            if key == (UNIT_NAME, TOOL_NAME_ID, EVENT_DATE, EVENT_TIME):
                print(f"Found target group!")
                print(f"  Group size: {len(group_rows)} rows")
                print(f"  Nodes in group: {sorted([r['NodeNum'] for r in group_rows])}")
                print(f"  idElabData range: {min(r['idElabData'] for r in group_rows)} - {max(r['idElabData'] for r in group_rows)}")

    # Now check consolidation
    print(f"\n{'='*80}")
    print(f"Testing consolidation logic:")
    print(f"{'='*80}\n")

    # Find all groups for this consolidation key in all fetched data
    consolidated_results = {}
    for group_rows in all_groups_fetched:
        if not group_rows:
            continue

        key = (
            group_rows[0].get("UnitName"),
            group_rows[0].get("ToolNameID"),
            group_rows[0].get("EventDate"),
            group_rows[0].get("EventTime")
        )

        if key == (UNIT_NAME, TOOL_NAME_ID, EVENT_DATE, EVENT_TIME):
            print(f"\nGroup received by consolidate_elabdatadisp_batch():")
            print(f"  Rows: {len(group_rows)}")
            print(f"  Nodes: {sorted([r['NodeNum'] for r in group_rows])}")

            # Run consolidation
            consolidated = DataTransformer.consolidate_elabdatadisp_batch(group_rows)
            print(f"\nAfter consolidation:")
            print(f"  Consolidated rows: {len(consolidated)}")

            for cons_row in consolidated:
                if "measurements" in cons_row:
                    nodes_in_measurements = sorted([int(k) for k in cons_row["measurements"].keys()])
                    print(f"  Nodes in JSONB measurements: {nodes_in_measurements}")

            consolidated_results[key] = {
                "rows": len(group_rows),
                "nodes_fetched": sorted([r['NodeNum'] for r in group_rows]),
                "nodes_consolidated": nodes_in_measurements
            }

if not consolidated_results:
    print("\n⚠️  Target consolidation key NOT found in any group!")
else:
    print(f"\n{'='*80}")
    print(f"Summary:")
    print(f"{'='*80}")
    for key, result in consolidated_results.items():
        print(f"\nKey: {key}")
        print(f"  MySQL rows fetched: {result['rows']}")
        print(f"  Nodes in fetched rows: {result['nodes_fetched']}")
        print(f"  Nodes in consolidated JSONB: {result['nodes_consolidated']}")

        if set(result['nodes_fetched']) == set(result['nodes_consolidated']):
            print(f"  ✓ Consolidation is COMPLETE")
        else:
            missing = set(result['nodes_fetched']) - set(result['nodes_consolidated'])
            extra = set(result['nodes_consolidated']) - set(result['nodes_fetched'])
            print(f"  ✗ Consolidation is INCOMPLETE")
            if missing:
                print(f"    Missing nodes: {sorted(missing)}")
            if extra:
                print(f"    Extra nodes: {sorted(extra)}")

src/connectors/mysql_connector.py
@@ -289,12 +289,17 @@ class MySQLConnector:
 
             # If we have a buffered group, prepend it to continue
             if buffered_group:
-                logger.debug(
-                    f"Resuming buffered group: key={last_buffered_key}, "
-                    f"prev_buffered_rows={len(buffered_group)}, new_rows={len(rows)}"
+                logger.info(
+                    f"[CONSOLIDATION DEBUG] Resuming buffered group: key={last_buffered_key}, "
+                    f"buffered_rows={len(buffered_group)}, new_rows={len(rows)}, "
+                    f"buffered_nodes={sorted([r.get('NodeNum') for r in buffered_group])}"
                 )
                 sorted_rows = buffered_group + sorted_rows
                 buffered_group = []
+                logger.info(
+                    f"[CONSOLIDATION DEBUG] After prepending buffer: total_rows={len(sorted_rows)}, "
+                    f"nodes={sorted([r.get('NodeNum') for r in sorted_rows])}"
+                )
 
             # Group rows by consolidation key (UnitName, ToolNameID, EventDate, EventTime)
             current_group = []
@@ -327,6 +332,10 @@ class MySQLConnector:
             if is_final_batch:
                 # This is the last batch - yield the remaining group
                 if current_group:
+                    logger.info(
+                        f"[CONSOLIDATION DEBUG] Final batch - yielding group: key={current_key}, "
+                        f"rows={len(current_group)}, nodes={sorted([r.get('NodeNum') for r in current_group])}"
+                    )
                     yield current_group
                     logger.debug(
                         f"Final group yielded: key={current_key}, "
@@ -338,6 +347,10 @@ class MySQLConnector:
                 if current_group:
                     buffered_group = current_group
                     last_buffered_key = current_key
+                    logger.info(
+                        f"[CONSOLIDATION DEBUG] Buffering group at boundary: key={current_key}, "
+                        f"rows={len(current_group)}, nodes={sorted([r.get('NodeNum') for r in current_group])}"
+                    )
                     logger.debug(
                         f"Group buffered at boundary: key={current_key}, "
                         f"rows_in_group={len(current_group)}, "
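For context on what these debug messages trace: fetch_consolidation_groups_from_partition yields complete consolidation groups even when a group straddles a fetch-batch boundary, by holding the trailing group back and prepending it to the next batch. The following is a simplified standalone sketch of that pattern with hypothetical names; it assumes rows of a group are adjacent in the fetch order and is not the connector's actual implementation.

# Simplified sketch of boundary buffering; hypothetical names, not the connector's actual code.
from itertools import islice
from typing import Dict, Iterable, Iterator, List, Optional, Tuple

Key = Tuple[object, object, object, object]


def iter_groups(rows: Iterable[Dict], batch_size: int = 5000) -> Iterator[List[Dict]]:
    """Yield complete groups of adjacent rows sharing the consolidation key."""
    it = iter(rows)
    buffered_group: List[Dict] = []

    while True:
        batch = list(islice(it, batch_size))
        is_final_batch = len(batch) < batch_size

        # Prepend the group buffered at the previous batch boundary.
        pending = buffered_group + batch
        buffered_group = []

        current_group: List[Dict] = []
        current_key: Optional[Key] = None
        for row in pending:
            key = (row["UnitName"], row["ToolNameID"], row["EventDate"], row["EventTime"])
            if current_group and key != current_key:
                yield current_group  # previous group is complete
                current_group = []
            current_key = key
            current_group.append(row)

        if is_final_batch:
            if current_group:
                yield current_group  # last group of the last batch
            return

        # The trailing group may continue in the next batch: hold it back.
        buffered_group = current_group

The [CONSOLIDATION DEBUG] messages added above log exactly these two moments: when a trailing group is buffered at a boundary and when it is prepended to the next batch.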

src/migrator/full_migration.py
@@ -166,9 +166,9 @@ class FullMigrator:
                 batch_count += 1
                 progress.update(fetched_in_buffer)
                 # Update migration state after every batch flush
+                # Do NOT set last_completed_partition yet - partition is still being processed
                 self._update_migration_state(
-                    pg_conn, migrated, None, migration_start_time,
-                    last_partition=partition
+                    pg_conn, migrated, None, migration_start_time
                 )
                 logger.debug(
                     f"Partition {partition}: flushed {inserted} rows, "
@@ -185,16 +185,21 @@ class FullMigrator:
                 migrated += inserted
                 batch_count += 1
                 progress.update(fetched_in_buffer)
+                # Still don't set last_completed_partition - partition is still being finalized
                 self._update_migration_state(
-                    pg_conn, migrated, None, migration_start_time,
-                    last_partition=partition
+                    pg_conn, migrated, None, migration_start_time
                 )
                 logger.debug(
                     f"Partition {partition} final flush: {inserted} rows, "
                     f"total migrated: {migrated}"
                 )
 
+            # NOW partition is complete - update with completed partition
             logger.info(f"Partition {partition} complete: {partition_group_count} groups consolidated")
+            self._update_migration_state(
+                pg_conn, migrated, None, migration_start_time,
+                last_partition=partition
+            )
 
         # Get final actual count from PostgreSQL
         final_count = pg_conn.get_row_count(pg_table)
test_d10_migration.py (new file, +36)
@@ -0,0 +1,36 @@
#!/usr/bin/env python3
"""Test migration of just d10 partition with consolidation debugging."""
import sys
from src.migrator.full_migration import FullMigrator
from src.utils.logger import setup_logger, get_logger
from src.connectors.postgres_connector import PostgreSQLConnector

setup_logger(__name__)
logger = get_logger(__name__)

print("\n" + "="*80)
print("Testing ELABDATADISP migration for partition d10 with debugging")
print("="*80 + "\n")

# Clear the target table first
print("Clearing target table...")
with PostgreSQLConnector() as pg_conn:
    with pg_conn.connection.cursor() as cursor:
        cursor.execute("DELETE FROM elabdatadisp")
    pg_conn.connection.commit()
print("Target table cleared.")

# Now run migration
print("\nStarting migration...")
try:
    migrator = FullMigrator("ELABDATADISP")
    result = migrator.migrate(dry_run=False, resume=False)
    print(f"\nMigration result: {result} rows")
except Exception as e:
    logger.error(f"Migration error: {e}", exc_info=True)
    print(f"Migration error: {e}")
    sys.exit(1)

print("\n" + "="*80)
print("Migration complete - check logs for [CONSOLIDATION DEBUG] messages")
print("="*80 + "\n")