diff --git a/config.py b/config.py index 713e807..e854308 100644 --- a/config.py +++ b/config.py @@ -49,7 +49,7 @@ class MigrationSettings(BaseSettings): ) batch_size: int = 10000 - consolidation_group_limit: int = 100000 + consolidation_group_limit: int = 10000 log_level: str = "INFO" dry_run: bool = False diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py index dd2323a..8b59250 100644 --- a/src/connectors/mysql_connector.py +++ b/src/connectors/mysql_connector.py @@ -341,13 +341,16 @@ class MySQLConnector: ) -> Generator[List[Dict[str, Any]], None, None]: """Fetch consolidation groups from a partition. - Reads unique combinations of (UnitName, ToolNameID, EventDate, EventTime, NodeNum) - and fetches all rows for each group. This ensures proper consolidation. + Reads consolidation keys from the partition one page at a time, sorted by key. + Yields rows grouped by (UnitName, ToolNameID, EventDate, EventTime). + + Each page of keys is read with a single GROUP BY query (LIMIT/OFFSET); + the rows for each key are then fetched with a separate MySQL query per group. 
Args: table: Table name partition: Partition name - limit: Maximum number of groups per query (uses config default if None) + limit: Maximum number of consolidation keys fetched per query (uses config default if None) offset: Starting offset for pagination Yields: @@ -367,26 +370,27 @@ class MySQLConnector: while retries < max_retries: try: with self.connection.cursor() as cursor: - # Get unique consolidation groups from partition - # First, get the distinct consolidation keys - group_query = f""" - SELECT DISTINCT UnitName, ToolNameID, EventDate, EventTime + # First: get one page of consolidation keys from the partition using GROUP BY + # NOTE: grouping is meant to follow the index (UnitName, ToolNameID, NodeNum, EventDate, EventTime); NodeNum is grouped but not selected, so the same key can be returned once per NodeNum + group_keys_query = f""" + SELECT UnitName, ToolNameID, EventDate, EventTime FROM `{table}` PARTITION (`{partition}`) - ORDER BY UnitName, ToolNameID, EventDate, EventTime + GROUP BY UnitName, ToolNameID, NodeNum, EventDate, EventTime + ORDER BY UnitName, ToolNameID, NodeNum, EventDate, EventTime LIMIT %s OFFSET %s """ - cursor.execute(group_query, (limit, current_offset)) - groups = cursor.fetchall() + cursor.execute(group_keys_query, (limit, current_offset)) + group_keys = cursor.fetchall() - if not groups: + if not group_keys: return - # For each group, fetch all rows - for group in groups: - unit_name = group.get("UnitName") - tool_name_id = group.get("ToolNameID") - event_date = group.get("EventDate") - event_time = group.get("EventTime") + # For each consolidation key, fetch all matching rows + for group_key in group_keys: + unit_name = group_key.get("UnitName") + tool_name_id = group_key.get("ToolNameID") + event_date = group_key.get("EventDate") + event_time = group_key.get("EventTime") rows_query = f""" SELECT * FROM `{table}` PARTITION (`{partition}`)