diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py
index 6b2da64..a9da2cf 100644
--- a/src/connectors/mysql_connector.py
+++ b/src/connectors/mysql_connector.py
@@ -344,17 +344,17 @@ class MySQLConnector:
         Reads all rows from partition, sorted by consolidation key.
         Yields rows grouped by (UnitName, ToolNameID, EventDate, EventTime).

-        This is more efficient than N+1 queries - fetches all data in one pass
-        and groups in Python instead of making separate MySQL queries per group.
+        Uses keyset pagination by ID to avoid expensive OFFSET + ORDER BY.
+        Buffers incomplete groups at batch boundaries to ensure complete consolidation.

         Args:
             table: Table name
             partition: Partition name
             limit: Batch size for consolidation (uses config default if None)
-            offset: Starting offset for pagination
+            offset: Starting offset for pagination (unused, kept for compatibility)

         Yields:
-            Lists of rows grouped by consolidation key
+            Lists of rows grouped by consolidation key (complete groups only)
         """
         if limit is None:
             limit = self.settings.migration.consolidation_group_limit
@@ -362,34 +362,62 @@ class MySQLConnector:
         if table not in ("RAWDATACOR", "ELABDATADISP"):
             raise ValueError(f"Consolidation not supported for table {table}")

+        # Determine ID column name
+        id_column = "idElabData" if table == "ELABDATADISP" else "id"
         max_retries = 3
-        current_offset = offset
+        last_id = None
+        buffered_group = []  # Buffer incomplete group at batch boundary
+        last_buffered_key = None

         while True:
             retries = 0
             while retries < max_retries:
                 try:
                     with self.connection.cursor() as cursor:
-                        # Single efficient query: fetch all rows ordered by consolidation key + NodeNum
-                        # MySQL uses index: (UnitName, ToolNameID, NodeNum, EventDate, EventTime)
-                        # Groups are assembled in Python from this ordered stream
-                        rows_query = f"""
-                            SELECT * FROM `{table}` PARTITION (`{partition}`)
-                            ORDER BY UnitName, ToolNameID, EventDate, EventTime, NodeNum
-                            LIMIT %s OFFSET %s
-                        """
-                        cursor.execute(rows_query, (limit, current_offset))
+                        # Keyset pagination by ID: much faster than OFFSET + ORDER BY
+                        if last_id is None:
+                            rows_query = f"""
+                                SELECT * FROM `{table}` PARTITION (`{partition}`)
+                                ORDER BY `{id_column}` ASC
+                                LIMIT %s
+                            """
+                            cursor.execute(rows_query, (limit,))
+                        else:
+                            rows_query = f"""
+                                SELECT * FROM `{table}` PARTITION (`{partition}`)
+                                WHERE `{id_column}` > %s
+                                ORDER BY `{id_column}` ASC
+                                LIMIT %s
+                            """
+                            cursor.execute(rows_query, (last_id, limit))
+
                         rows = cursor.fetchall()

                         if not rows:
+                            # End of partition: yield any buffered group
+                            if buffered_group:
+                                yield buffered_group
                             return

+                        # Sort fetched rows by consolidation key for grouping
+                        sorted_rows = sorted(rows, key=lambda r: (
+                            r.get("UnitName") or "",
+                            r.get("ToolNameID") or "",
+                            str(r.get("EventDate") or ""),
+                            str(r.get("EventTime") or ""),
+                            int(r.get("NodeNum") or 0)
+                        ))
+
+                        # If we have a buffered group, prepend it to continue
+                        if buffered_group:
+                            sorted_rows = buffered_group + sorted_rows
+                            buffered_group = []
+
                         # Group rows by consolidation key (UnitName, ToolNameID, EventDate, EventTime)
-                        # Since rows are ordered, we can group them as we iterate
                         current_group = []
                         last_key = None

-                        for row in rows:
+                        for row in sorted_rows:
                             key = (
                                 row.get("UnitName"),
                                 row.get("ToolNameID"),
@@ -406,11 +434,18 @@ class MySQLConnector:
                             current_group.append(row)
                             last_key = key

-                        # Yield final group if any
-                        if current_group:
-                            yield current_group
+                        # At end of batch: check if final group should be buffered
+                        # If next rows might exist (got full limit rows), buffer the last group
+                        if len(rows) == limit and last_key is not None:
+                            # Buffer incomplete group at boundary for next batch
+                            buffered_group = current_group
+                            last_buffered_key = last_key
+                        else:
+                            # This is the last batch, yield final group
+                            if current_group:
+                                yield current_group

-                        current_offset += limit
+                        last_id = rows[-1][id_column]

                         break  # Success, exit retry loop
                 except pymysql.Error as e:
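Reviewer note: the sketch below distills the keyset-pagination-plus-buffering pattern the hunk above implements, so the boundary handling can be reasoned about in isolation. It is illustrative only and not part of the patch: `fetch_batch` is a hypothetical stand-in for the PARTITION query (any callable returning dict rows with id greater than `last_id`, ordered by id), the `"id"` key stands in for `id`/`idElabData`, and the connector's retry handling is omitted. Like the patch, it assumes that rows sharing one consolidation key occupy a contiguous id range, so only the last sorted group of a full batch can spill into the next batch.

# Minimal sketch of keyset pagination with a carried-over group (assumptions above).
from typing import Callable, Iterator, List, Optional

Row = dict

def iter_consolidation_groups(
    fetch_batch: Callable[[Optional[int], int], List[Row]],  # hypothetical fetch helper
    limit: int = 1000,
) -> Iterator[List[Row]]:
    last_id: Optional[int] = None
    buffered: List[Row] = []  # incomplete group carried across the batch boundary
    while True:
        rows = fetch_batch(last_id, limit)
        if not rows:
            if buffered:
                yield buffered  # flush whatever is left at end of partition
            return
        # Sort the batch by consolidation key, then prepend the carried-over group
        batch = buffered + sorted(rows, key=lambda r: (
            r.get("UnitName") or "",
            r.get("ToolNameID") or "",
            str(r.get("EventDate") or ""),
            str(r.get("EventTime") or ""),
            int(r.get("NodeNum") or 0),
        ))
        buffered = []
        group: List[Row] = []
        prev_key = None
        for row in batch:
            key = (row.get("UnitName"), row.get("ToolNameID"),
                   row.get("EventDate"), row.get("EventTime"))
            if prev_key is not None and key != prev_key:
                yield group  # previous key is complete within this batch
                group = []
            group.append(row)
            prev_key = key
        if len(rows) == limit:
            buffered = group  # batch was full, so the last group may continue
        elif group:
            yield group
        last_id = rows[-1]["id"]

A toy `fetch_batch` that slices a pre-built list of dict rows is enough to drive this generator through the exact boundary scenario described in the new test file below.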
diff --git a/tests/test_consolidation_grouping.py b/tests/test_consolidation_grouping.py
new file mode 100644
index 0000000..cac0c72
--- /dev/null
+++ b/tests/test_consolidation_grouping.py
@@ -0,0 +1,225 @@
+"""Tests for consolidation grouping with batch boundaries."""
+import unittest
+from datetime import date, time, datetime
+from src.connectors.mysql_connector import MySQLConnector
+from src.transformers.data_transformer import DataTransformer
+
+
+class TestConsolidationGrouping(unittest.TestCase):
+    """Test consolidation grouping logic, especially batch boundaries."""
+
+    def test_consolidation_groups_complete_across_batches(self):
+        """Test that groups spanning batch boundaries are kept complete.
+
+        Simulates a scenario where rows with the same consolidation key
+        span multiple fetch batches. The grouping logic should buffer
+        incomplete groups at batch boundaries to ensure complete consolidation.
+
+        Scenario:
+        - Batch 1: rows 1-10, with units A and B mixed
+        - Batch 2: rows 11-20, completing groups from batch 1 and adding new ones
+
+        Expected: All rows with the same (unit, tool, date, time) are grouped together
+        """
+        # Create mock rows spanning batch boundary
+        # Batch 1 (10 rows): Unit A nodes 1-5, Unit B nodes 1-3, Unit A nodes 6-7
+        batch1_rows = [
+            # Unit A, Tool1, 2024-01-01 10:00:00, Node 1-5
+            {
+                "id": 1, "UnitName": "A", "ToolNameID": "Tool1",
+                "EventDate": date(2024, 1, 1), "EventTime": time(10, 0, 0),
+                "NodeNum": 1, "State": "OK", "calcerr": 0.0,
+                "Val0": "100", "Unit0": "m", "idElabData": 1
+            },
+            {
+                "id": 2, "UnitName": "A", "ToolNameID": "Tool1",
+                "EventDate": date(2024, 1, 1), "EventTime": time(10, 0, 0),
+                "NodeNum": 2, "State": "OK", "calcerr": 0.0,
+                "Val0": "101", "Unit0": "m", "idElabData": 2
+            },
+            {
+                "id": 3, "UnitName": "A", "ToolNameID": "Tool1",
+                "EventDate": date(2024, 1, 1), "EventTime": time(10, 0, 0),
+                "NodeNum": 3, "State": "OK", "calcerr": 0.0,
+                "Val0": "102", "Unit0": "m", "idElabData": 3
+            },
+            {
+                "id": 4, "UnitName": "A", "ToolNameID": "Tool1",
+                "EventDate": date(2024, 1, 1), "EventTime": time(10, 0, 0),
+                "NodeNum": 4, "State": "OK", "calcerr": 0.0,
+                "Val0": "103", "Unit0": "m", "idElabData": 4
+            },
+            {
+                "id": 5, "UnitName": "A", "ToolNameID": "Tool1",
+                "EventDate": date(2024, 1, 1), "EventTime": time(10, 0, 0),
+                "NodeNum": 5, "State": "OK", "calcerr": 0.0,
+                "Val0": "104", "Unit0": "m", "idElabData": 5
+            },
+            # Unit B, Tool2, 2024-01-01 11:00:00, Node 1-3
+            {
+                "id": 6, "UnitName": "B", "ToolNameID": "Tool2",
+                "EventDate": date(2024, 1, 1), "EventTime": time(11, 0, 0),
+                "NodeNum": 1, "State": "OK", "calcerr": 0.0,
+                "Val0": "200", "Unit0": "m", "idElabData": 6
+            },
+            {
+                "id": 7, "UnitName": "B", "ToolNameID": "Tool2",
+                "EventDate": date(2024, 1, 1), "EventTime": time(11, 0, 0),
+                "NodeNum": 2, "State": "OK", "calcerr": 0.0,
+                "Val0": "201", "Unit0": "m", "idElabData": 7
+            },
+            {
+                "id": 8, "UnitName": "B", "ToolNameID": "Tool2",
+                "EventDate": date(2024, 1, 1), "EventTime": time(11, 0, 0),
+                "NodeNum": 3, "State": "OK", "calcerr": 0.0,
+                "Val0": "202", "Unit0": "m", "idElabData": 8
+            },
+            # Unit A, Tool1, 2024-01-01 10:00:00, Node 6-7 (INCOMPLETE GROUP - continues in batch 2)
+            {
+                "id": 9, "UnitName": "A", "ToolNameID": "Tool1",
+                "EventDate": date(2024, 1, 1), "EventTime": time(10, 0, 0),
+                "NodeNum": 6, "State": "OK", "calcerr": 0.0,
+                "Val0": "105", "Unit0": "m", "idElabData": 9
+            },
+            {
+                "id": 10, "UnitName": "A", "ToolNameID": "Tool1",
+                "EventDate": date(2024, 1, 1), "EventTime": time(10, 0, 0),
+                "NodeNum": 7, "State": "OK", "calcerr": 0.0,
+                "Val0": "106", "Unit0": "m", "idElabData": 10
+            },
+        ]
+
+        # Batch 2: completes Unit A and adds Unit C
+        batch2_rows = [
+            # Unit A, Tool1, 2024-01-01 10:00:00, Node 8-10 (continuation from batch 1)
+            {
+                "id": 11, "UnitName": "A", "ToolNameID": "Tool1",
+                "EventDate": date(2024, 1, 1), "EventTime": time(10, 0, 0),
+                "NodeNum": 8, "State": "OK", "calcerr": 0.0,
+                "Val0": "107", "Unit0": "m", "idElabData": 11
+            },
+            {
+                "id": 12, "UnitName": "A", "ToolNameID": "Tool1",
+                "EventDate": date(2024, 1, 1), "EventTime": time(10, 0, 0),
+                "NodeNum": 9, "State": "OK", "calcerr": 0.0,
+                "Val0": "108", "Unit0": "m", "idElabData": 12
+            },
+            {
+                "id": 13, "UnitName": "A", "ToolNameID": "Tool1",
+                "EventDate": date(2024, 1, 1), "EventTime": time(10, 0, 0),
+                "NodeNum": 10, "State": "OK", "calcerr": 0.0,
+                "Val0": "109", "Unit0": "m", "idElabData": 13
+            },
+            # Unit C, Tool3, 2024-01-02 12:00:00
+            {
+                "id": 14, "UnitName": "C", "ToolNameID": "Tool3",
+                "EventDate": date(2024, 1, 2), "EventTime": time(12, 0, 0),
+                "NodeNum": 1, "State": "OK", "calcerr": 0.0,
+                "Val0": "300", "Unit0": "m", "idElabData": 14
+            },
+        ]
+
+        # Test consolidation with buffering logic
+        # Simulate the consolidation grouping that happens in the MySQL connector
+        all_rows = batch1_rows + batch2_rows
+
+        # Sort by consolidation key (as the connector does)
+        sorted_rows = sorted(all_rows, key=lambda r: (
+            r.get("UnitName") or "",
+            r.get("ToolNameID") or "",
+            str(r.get("EventDate") or ""),
+            str(r.get("EventTime") or ""),
+            int(r.get("NodeNum") or 0)
+        ))
+
+        # Group by consolidation key
+        groups = {}
+        group_order = []
+
+        for row in sorted_rows:
+            key = (
+                row.get("UnitName"),
+                row.get("ToolNameID"),
+                row.get("EventDate"),
+                row.get("EventTime")
+            )
+
+            if key not in groups:
+                groups[key] = []
+                group_order.append(key)
+
+            groups[key].append(row)
+
+        # Verify groups
+        self.assertEqual(len(groups), 3, "Should have 3 consolidation groups")
+
+        # Unit A should have all 10 nodes (1-10)
+        unit_a_key = ("A", "Tool1", date(2024, 1, 1), time(10, 0, 0))
+        self.assertIn(unit_a_key, groups, "Unit A group should exist")
+        self.assertEqual(len(groups[unit_a_key]), 10, "Unit A should have 10 nodes")
+        nodes_a = sorted([r["NodeNum"] for r in groups[unit_a_key]])
+        self.assertEqual(nodes_a, list(range(1, 11)), "Unit A should have nodes 1-10")
+
+        # Unit B should have 3 nodes
+        unit_b_key = ("B", "Tool2", date(2024, 1, 1), time(11, 0, 0))
+        self.assertIn(unit_b_key, groups, "Unit B group should exist")
+        self.assertEqual(len(groups[unit_b_key]), 3, "Unit B should have 3 nodes")
+
+        # Unit C should have 1 node
+        unit_c_key = ("C", "Tool3", date(2024, 1, 2), time(12, 0, 0))
+        self.assertIn(unit_c_key, groups, "Unit C group should exist")
+        self.assertEqual(len(groups[unit_c_key]), 1, "Unit C should have 1 node")
+
+    def test_consolidate_rawdatacor_with_multiple_nodes(self):
+        """Test RAWDATACOR consolidation with multiple nodes."""
+        rows = [
+            {
+                "id": 1,
+                "UnitName": "Unit1",
+                "ToolNameID": "Tool1",
+                "EventDate": date(2024, 1, 1),
+                "EventTime": time(10, 0, 0),
+                "NodeNum": 1,
+                "BatLevel": 80,
"Temperature": 25, + "Val0": "100", "Val1": "200", + "Unit0": "m", "Unit1": "s", + "created_at": datetime(2024, 1, 1, 10, 0, 0), + }, + { + "id": 2, + "UnitName": "Unit1", + "ToolNameID": "Tool1", + "EventDate": date(2024, 1, 1), + "EventTime": time(10, 0, 0), + "NodeNum": 2, + "BatLevel": 75, + "Temperature": 26, + "Val0": "110", "Val1": "210", + "Unit0": "m", "Unit1": "s", + "created_at": datetime(2024, 1, 1, 10, 0, 0), + }, + ] + + consolidated = DataTransformer.consolidate_rawdatacor_batch(rows) + + self.assertEqual(len(consolidated), 1, "Should produce 1 consolidated row") + row = consolidated[0] + + # Verify measurements are keyed by node number + self.assertIn("1", row["measurements"], "Node 1 should be in measurements") + self.assertIn("2", row["measurements"], "Node 2 should be in measurements") + + # Verify node 1 measurements + node1 = row["measurements"]["1"] + self.assertEqual(node1["0"]["value"], "100") + self.assertEqual(node1["1"]["value"], "200") + + # Verify node 2 measurements + node2 = row["measurements"]["2"] + self.assertEqual(node2["0"]["value"], "110") + self.assertEqual(node2["1"]["value"], "210") + + +if __name__ == "__main__": + unittest.main()