Files
mysql2postgres/debug_consolidation.py
alex 6ca97f0ba4 fix: Only update last_completed_partition when partition is fully processed
Previously, last_completed_partition was updated during batch flushes while
the partition was still being processed. This caused resume to skip partitions
that were only partially completed.

Now, last_completed_partition is only updated AFTER all consolidation groups
in a partition have been processed and the final buffer flush is complete.

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2025-12-26 00:49:14 +01:00

135 lines
5.2 KiB
Python

#!/usr/bin/env python3
"""Debug script to trace consolidation for one specific ELABDATADISP group.

Given a single consolidation key (UnitName, ToolNameID, EventDate, EventTime)
and a MySQL partition, this script:

1. Fetches every matching raw row directly from MySQL and lists the nodes.
2. Replays the batched group fetching that the real migration performs via
   ``fetch_consolidation_groups_from_partition`` and locates the target group.
3. Runs ``DataTransformer.consolidate_elabdatadisp_batch`` on that group and
   compares the nodes present in the consolidated JSONB ``measurements``
   payload against the nodes actually fetched, reporting whether the
   consolidation is complete or which nodes went missing.

Intended for manual debugging only; it connects to the live MySQL source.
"""
from datetime import date, time

from src.connectors.mysql_connector import MySQLConnector
from src.transformers.data_transformer import DataTransformer
from src.utils.logger import setup_logger, get_logger

setup_logger(__name__)
logger = get_logger(__name__)

# Consolidation key under investigation.
UNIT_NAME = "M1_ID0246"
TOOL_NAME_ID = "DT0001"
EVENT_DATE = date(2023, 6, 26)
EVENT_TIME = time(10, 43, 59)
PARTITION = "d10"

print(f"\n{'='*80}")
print("Tracing consolidation for:")
print(f" Unit: {UNIT_NAME}")
print(f" Tool: {TOOL_NAME_ID}")
print(f" Date: {EVENT_DATE}")
print(f" Time: {EVENT_TIME}")
print(f" Partition: {PARTITION}")
print(f"{'='*80}\n")

with MySQLConnector() as mysql_conn:
    # Step 1: fetch all raw rows for the target key straight from MySQL.
    # The partition name is interpolated (it cannot be a bind parameter);
    # it comes from the trusted constant above, not user input.
    query = f"""
        SELECT * FROM `ELABDATADISP` PARTITION (`{PARTITION}`)
        WHERE UnitName = %s AND ToolNameID = %s
        AND EventDate = %s AND EventTime = %s
        ORDER BY idElabData ASC
    """
    with mysql_conn.connection.cursor() as cursor:
        cursor.execute(query, (UNIT_NAME, TOOL_NAME_ID, EVENT_DATE, EVENT_TIME))
        all_rows = cursor.fetchall()

    print(f"Total rows found in MySQL: {len(all_rows)}")
    print("\nNodes present (sorted by idElabData):")
    for row in all_rows:
        print(f" NodeNum={row['NodeNum']:2d}, idElabData={row['idElabData']:10d}")

    # Step 2: simulate what fetch_consolidation_groups_from_partition does
    # during the real migration, collecting every yielded group.
    print(f"\n{'='*80}")
    print("Simulating batch fetching with consolidation grouping:")
    print(f"{'='*80}\n")

    target_key = (UNIT_NAME, TOOL_NAME_ID, EVENT_DATE, EVENT_TIME)
    all_groups_fetched = []
    for group_rows in mysql_conn.fetch_consolidation_groups_from_partition(
        "ELABDATADISP",
        PARTITION,
        limit=5000,  # Default batch size used by the real migration.
    ):
        all_groups_fetched.append(group_rows)
        # Check whether this yielded group is our target group.
        if group_rows:
            key = (
                group_rows[0].get("UnitName"),
                group_rows[0].get("ToolNameID"),
                group_rows[0].get("EventDate"),
                group_rows[0].get("EventTime"),
            )
            if key == target_key:
                print("Found target group!")
                print(f" Group size: {len(group_rows)} rows")
                print(f" Nodes in group: {sorted([r['NodeNum'] for r in group_rows])}")
                print(f" idElabData range: {min(r['idElabData'] for r in group_rows)} - {max(r['idElabData'] for r in group_rows)}")

    # Step 3: run the consolidation transformer on the target group(s) and
    # compare fetched nodes vs. nodes that made it into the JSONB payload.
    print(f"\n{'='*80}")
    print("Testing consolidation logic:")
    print(f"{'='*80}\n")

    consolidated_results = {}
    for group_rows in all_groups_fetched:
        if not group_rows:
            continue
        key = (
            group_rows[0].get("UnitName"),
            group_rows[0].get("ToolNameID"),
            group_rows[0].get("EventDate"),
            group_rows[0].get("EventTime"),
        )
        if key != target_key:
            continue

        print("\nGroup received by consolidate_elabdatadisp_batch():")
        print(f" Rows: {len(group_rows)}")
        print(f" Nodes: {sorted([r['NodeNum'] for r in group_rows])}")

        consolidated = DataTransformer.consolidate_elabdatadisp_batch(group_rows)
        print("\nAfter consolidation:")
        print(f" Consolidated rows: {len(consolidated)}")

        # Accumulate nodes across ALL consolidated rows. Initializing the set
        # here (per group) fixes two bugs in the earlier version: a NameError
        # when no consolidated row carried "measurements", and the summary
        # silently recording only the LAST row's nodes when several rows did.
        all_consolidated_nodes = set()
        for cons_row in consolidated:
            if "measurements" in cons_row:
                nodes_in_measurements = sorted([int(k) for k in cons_row["measurements"].keys()])
                print(f" Nodes in JSONB measurements: {nodes_in_measurements}")
                all_consolidated_nodes.update(nodes_in_measurements)

        consolidated_results[key] = {
            "rows": len(group_rows),
            "nodes_fetched": sorted([r['NodeNum'] for r in group_rows]),
            "nodes_consolidated": sorted(all_consolidated_nodes),
        }

if not consolidated_results:
    print("\n⚠️ Target consolidation key NOT found in any group!")
else:
    print(f"\n{'='*80}")
    print("Summary:")
    print(f"{'='*80}")
    for key, result in consolidated_results.items():
        print(f"\nKey: {key}")
        print(f" MySQL rows fetched: {result['rows']}")
        print(f" Nodes in fetched rows: {result['nodes_fetched']}")
        print(f" Nodes in consolidated JSONB: {result['nodes_consolidated']}")
        if set(result['nodes_fetched']) == set(result['nodes_consolidated']):
            print(" ✓ Consolidation is COMPLETE")
        else:
            missing = set(result['nodes_fetched']) - set(result['nodes_consolidated'])
            extra = set(result['nodes_consolidated']) - set(result['nodes_fetched'])
            print(" ✗ Consolidation is INCOMPLETE")
            if missing:
                print(f" Missing nodes: {sorted(missing)}")
            if extra:
                print(f" Extra nodes: {sorted(extra)}")