diff --git a/find_incomplete_consolidations.py b/find_incomplete_consolidations.py
new file mode 100644
index 0000000..cc8416d
--- /dev/null
+++ b/find_incomplete_consolidations.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3
+"""Find and fix incomplete consolidations in PostgreSQL."""
+import sys
+from src.connectors.postgres_connector import PostgreSQLConnector
+from src.utils.logger import setup_logger, get_logger
+
+setup_logger(__name__)
+logger = get_logger(__name__)
+
+print("\n" + "="*80)
+print("Finding incomplete consolidations in elabdatadisp")
+print("="*80 + "\n")
+
+with PostgreSQLConnector() as pg_conn:
+    # Find consolidation keys that appear multiple times
+    print("Querying for duplicate consolidation keys...")
+    query = """
+        SELECT
+            unit_name,
+            tool_name_id,
+            event_timestamp,
+            COUNT(*) as row_count
+        FROM elabdatadisp
+        GROUP BY unit_name, tool_name_id, event_timestamp
+        HAVING COUNT(*) > 1
+        ORDER BY row_count DESC
+        LIMIT 20
+    """
+
+    with pg_conn.connection.cursor() as cursor:
+        cursor.execute(query)
+        results = cursor.fetchall()
+
+    print(f"\nFound {len(results)} consolidation keys with multiple rows:\n")
+
+    for row in results:
+        unit_name, tool_name_id, event_timestamp, row_count = row
+        print(f"Unit: {unit_name}, Tool: {tool_name_id}, Timestamp: {event_timestamp}")
+        print(f"  Row count: {row_count}")
+        print()
+
+    # Now let's get all the rows for the first incomplete consolidation
+    if results:
+        unit_name, tool_name_id, event_timestamp, _ = results[0]
+
+        print("="*80)
+        print(f"Analyzing first incomplete consolidation:")
+        print(f"Unit: {unit_name}, Tool: {tool_name_id}, Timestamp: {event_timestamp}")
+        print("="*80 + "\n")
+
+        # Get all rows for this key
+        detail_query = """
+            SELECT
+                id_elab_data,
+                unit_name,
+                tool_name_id,
+                event_timestamp,
+                measurements
+            FROM elabdatadisp
+            WHERE unit_name = %s
+              AND tool_name_id = %s
+              AND event_timestamp = %s
+            ORDER BY id_elab_data
+        """
+
+        with pg_conn.connection.cursor() as cursor:
+            cursor.execute(detail_query, (unit_name, tool_name_id, event_timestamp))
+            detail_rows = cursor.fetchall()
+
+        print(f"Total rows for this key: {len(detail_rows)}\n")
+
+        all_nodes = set()
+        for detail_row in detail_rows:
+            id_, unit, tool, ts, measurements = detail_row
+            if measurements:
+                # Get node numbers from the JSONB keys
+                import json
+                measurements_dict = measurements if isinstance(measurements, dict) else json.loads(measurements)
+                node_list = sorted([int(k) for k in measurements_dict.keys()])
+                print(f"ID: {id_}")
+                print(f"  Nodes: {node_list}")
+                all_nodes.update(str(n) for n in node_list)
+
+        print(f"\nAll nodes across all rows: {sorted(all_nodes, key=lambda x: int(x))}")
+        print(f"Total unique nodes: {len(all_nodes)}")
diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py
index 97e32e5..53ae041 100644
--- a/src/connectors/mysql_connector.py
+++ b/src/connectors/mysql_connector.py
@@ -341,6 +341,11 @@ class MySQLConnector:
             # If key changed, yield previous group and start new one
             if current_key is not None and key != current_key:
                 if current_group:
+                    nodes = sorted([r.get('NodeNum') for r in current_group])
+                    logger.info(
+                        f"[CONSOLIDATION DEBUG] Key change within batch - yielding: key={current_key}, "
+                        f"rows={len(current_group)}, nodes={nodes}"
+                    )
                     yield current_group
                     logger.debug(
                         f"Group yielded: key={current_key}, "