diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py index d5dc61a..c8302a2 100644 --- a/src/connectors/mysql_connector.py +++ b/src/connectors/mysql_connector.py @@ -245,7 +245,8 @@ class MySQLConnector: id_column = "idElabData" if table == "ELABDATADISP" else "id" max_retries = 3 last_id = start_id - buffered_group = [] # Buffer group at batch boundary or with continuing key + # Buffer incomplete groups at batch boundaries when NodeNum hasn't cycled back to 1 yet + buffered_group = [] last_buffered_key = None while True: @@ -275,11 +276,6 @@ class MySQLConnector: if not rows: # End of partition: yield any buffered group if buffered_group: - logger.info( - f"[CONSOLIDATION DEBUG] End of partition - yielding final buffered group: " - f"key={last_buffered_key}, rows={len(buffered_group)}, " - f"nodes={sorted([r.get('NodeNum') for r in buffered_group])}" - ) yield buffered_group return @@ -304,18 +300,11 @@ class MySQLConnector: ) if first_row_key == last_buffered_key: - logger.info( - f"[CONSOLIDATION DEBUG] Merging buffered group with new batch: key={last_buffered_key}, " - f"buffered_rows={len(buffered_group)}, new_rows={len(rows)}" - ) + # Merge buffered rows with current batch (same consolidation key continues) sorted_rows = buffered_group + sorted_rows buffered_group = [] else: # Buffered group belongs to different key - yield it first - logger.info( - f"[CONSOLIDATION DEBUG] Yielding buffered group (key change): " - f"key={last_buffered_key}, rows={len(buffered_group)}" - ) yield buffered_group buffered_group = [] last_buffered_key = None @@ -338,11 +327,6 @@ class MySQLConnector: # Detect group boundary: key changed OR NodeNum decreased (new measurement started) if current_key is not None and (key != current_key or (last_node_num is not None and node_num < last_node_num)): if current_group: - nodes = sorted([r.get('NodeNum') for r in current_group]) - logger.info( - f"[CONSOLIDATION DEBUG] Group boundary detected: key={current_key}, " - f"rows={len(current_group)}, nodes={nodes}" - ) yield current_group current_group = [] @@ -354,22 +338,12 @@ class MySQLConnector: if not rows or len(rows) < limit: # This is the last batch - yield the remaining group if current_group: - nodes = sorted([r.get('NodeNum') for r in current_group]) - logger.info( - f"[CONSOLIDATION DEBUG] Final batch - yielding group: key={current_key}, " - f"rows={len(current_group)}, nodes={nodes}" - ) yield current_group else: - # More rows might exist - buffer the last group + # More rows might exist - buffer the last group for next batch if current_group: buffered_group = current_group last_buffered_key = current_key - nodes = sorted([r.get('NodeNum') for r in current_group]) - logger.info( - f"[CONSOLIDATION DEBUG] Buffering group at batch boundary: key={current_key}, " - f"rows={len(current_group)}, nodes={nodes}" - ) last_id = rows[-1][id_column] break # Success, exit retry loop diff --git a/src/transformers/schema_transformer.py b/src/transformers/schema_transformer.py index 79fad69..1608c8b 100644 --- a/src/transformers/schema_transformer.py +++ b/src/transformers/schema_transformer.py @@ -66,6 +66,9 @@ CREATE INDEX IF NOT EXISTS idx_measurements_gin_raw CREATE INDEX IF NOT EXISTS idx_event_timestamp_raw ON rawdatacor(event_timestamp); + +CREATE INDEX IF NOT EXISTS idx_id_raw + ON rawdatacor(id); """ return sql @@ -128,6 +131,9 @@ CREATE INDEX IF NOT EXISTS idx_measurements_gin_elab CREATE INDEX IF NOT EXISTS idx_event_timestamp_elab ON elabdatadisp(event_timestamp); + +CREATE INDEX IF NOT EXISTS idx_id_elab_data + ON elabdatadisp(id_elab_data); """ return sql