From 681812d0a4c0075df9c27c1ef1cd6c97e30c93e0 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 26 Dec 2025 14:55:22 +0100 Subject: [PATCH] cleanup: Remove unnecessary debug logging from consolidation logic Removed extensive debug logging that was added while troubleshooting the consolidation grouping issue. The new simplified logic using NodeNum sequence detection is clear enough without the additional logging. This keeps the code cleaner and reduces log verbosity during migration. --- src/connectors/mysql_connector.py | 34 +++----------------------- src/transformers/schema_transformer.py | 6 +++++ 2 files changed, 10 insertions(+), 30 deletions(-) diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py index d5dc61a..c8302a2 100644 --- a/src/connectors/mysql_connector.py +++ b/src/connectors/mysql_connector.py @@ -245,7 +245,8 @@ class MySQLConnector: id_column = "idElabData" if table == "ELABDATADISP" else "id" max_retries = 3 last_id = start_id - buffered_group = [] # Buffer group at batch boundary or with continuing key + # Buffer incomplete groups at batch boundaries when NodeNum hasn't cycled back to 1 yet + buffered_group = [] last_buffered_key = None while True: @@ -275,11 +276,6 @@ class MySQLConnector: if not rows: # End of partition: yield any buffered group if buffered_group: - logger.info( - f"[CONSOLIDATION DEBUG] End of partition - yielding final buffered group: " - f"key={last_buffered_key}, rows={len(buffered_group)}, " - f"nodes={sorted([r.get('NodeNum') for r in buffered_group])}" - ) yield buffered_group return @@ -304,18 +300,11 @@ class MySQLConnector: ) if first_row_key == last_buffered_key: - logger.info( - f"[CONSOLIDATION DEBUG] Merging buffered group with new batch: key={last_buffered_key}, " - f"buffered_rows={len(buffered_group)}, new_rows={len(rows)}" - ) + # Merge buffered rows with current batch (same consolidation key continues) sorted_rows = buffered_group + sorted_rows buffered_group = [] else: # Buffered group belongs to different key - yield it first - logger.info( - f"[CONSOLIDATION DEBUG] Yielding buffered group (key change): " - f"key={last_buffered_key}, rows={len(buffered_group)}" - ) yield buffered_group buffered_group = [] last_buffered_key = None @@ -338,11 +327,6 @@ class MySQLConnector: # Detect group boundary: key changed OR NodeNum decreased (new measurement started) if current_key is not None and (key != current_key or (last_node_num is not None and node_num < last_node_num)): if current_group: - nodes = sorted([r.get('NodeNum') for r in current_group]) - logger.info( - f"[CONSOLIDATION DEBUG] Group boundary detected: key={current_key}, " - f"rows={len(current_group)}, nodes={nodes}" - ) yield current_group current_group = [] @@ -354,22 +338,12 @@ class MySQLConnector: if not rows or len(rows) < limit: # This is the last batch - yield the remaining group if current_group: - nodes = sorted([r.get('NodeNum') for r in current_group]) - logger.info( - f"[CONSOLIDATION DEBUG] Final batch - yielding group: key={current_key}, " - f"rows={len(current_group)}, nodes={nodes}" - ) yield current_group else: - # More rows might exist - buffer the last group + # More rows might exist - buffer the last group for next batch if current_group: buffered_group = current_group last_buffered_key = current_key - nodes = sorted([r.get('NodeNum') for r in current_group]) - logger.info( - f"[CONSOLIDATION DEBUG] Buffering group at batch boundary: key={current_key}, " - f"rows={len(current_group)}, nodes={nodes}" - ) last_id = rows[-1][id_column] break # Success, exit retry loop diff --git a/src/transformers/schema_transformer.py b/src/transformers/schema_transformer.py index 79fad69..1608c8b 100644 --- a/src/transformers/schema_transformer.py +++ b/src/transformers/schema_transformer.py @@ -66,6 +66,9 @@ CREATE INDEX IF NOT EXISTS idx_measurements_gin_raw CREATE INDEX IF NOT EXISTS idx_event_timestamp_raw ON rawdatacor(event_timestamp); + +CREATE INDEX IF NOT EXISTS idx_id_raw + ON rawdatacor(id); """ return sql @@ -128,6 +131,9 @@ CREATE INDEX IF NOT EXISTS idx_measurements_gin_elab CREATE INDEX IF NOT EXISTS idx_event_timestamp_elab ON elabdatadisp(event_timestamp); + +CREATE INDEX IF NOT EXISTS idx_id_elab_data + ON elabdatadisp(id_elab_data); """ return sql