From 32f90fdd4776614c8f9022d466cacc10fd5ceb48 Mon Sep 17 00:00:00 2001
From: alex
Date: Fri, 26 Dec 2025 00:20:46 +0100
Subject: [PATCH] refactor: Remove legacy consolidation methods from
 MySQLConnector
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Remove unused fetch_all_rows() and fetch_rows_ordered_for_consolidation()
methods. These were part of the old migration strategy before
partition-based consolidation. The current implementation uses
fetch_consolidation_groups_from_partition(), which handles keyset
pagination and consolidation group buffering more efficiently.

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5
---
 src/connectors/mysql_connector.py | 123 ------------------------------
 1 file changed, 123 deletions(-)

diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py
index a4e7350..c65a6df 100644
--- a/src/connectors/mysql_connector.py
+++ b/src/connectors/mysql_connector.py
@@ -71,57 +71,6 @@ class MySQLConnector:
             logger.error(f"Failed to get row count for {table}: {e}")
             raise
 
-    def fetch_all_rows(
-        self,
-        table: str,
-        batch_size: Optional[int] = None
-    ) -> Generator[List[Dict[str, Any]], None, None]:
-        """Fetch all rows from a table in batches.
-
-        Args:
-            table: Table name
-            batch_size: Number of rows per batch (uses config default if None)
-
-        Yields:
-            Batches of row dictionaries
-        """
-        if batch_size is None:
-            batch_size = self.settings.migration.batch_size
-
-        offset = 0
-        max_retries = 3
-
-        while True:
-            retries = 0
-            while retries < max_retries:
-                try:
-                    with self.connection.cursor() as cursor:
-                        query = f"SELECT * FROM `{table}` LIMIT %s OFFSET %s"
-                        cursor.execute(query, (batch_size, offset))
-                        rows = cursor.fetchall()
-
-                        if not rows:
-                            return
-
-                        yield rows
-                        offset += len(rows)
-                        break  # Success, exit retry loop
-
-                except pymysql.Error as e:
-                    retries += 1
-                    if retries >= max_retries:
-                        logger.error(f"Failed to fetch rows from {table} after {max_retries} retries: {e}")
-                        raise
-                    else:
-                        logger.warning(f"Fetch failed (retry {retries}/{max_retries}): {e}")
-                        # Reconnect and retry
-                        try:
-                            self.disconnect()
-                            self.connect()
-                        except Exception as reconnect_error:
-                            logger.error(f"Failed to reconnect: {reconnect_error}")
-                            raise
-
     def fetch_rows_since(
         self,
         table: str,
@@ -219,78 +168,6 @@ class MySQLConnector:
             logger.error(f"Failed to fetch rows from {table}: {e}")
             raise
 
-    def fetch_rows_ordered_for_consolidation(
-        self,
-        table: str,
-        start_id: Optional[int] = None,
-        batch_size: Optional[int] = None
-    ) -> Generator[List[Dict[str, Any]], None, None]:
-        """Fetch rows using keyset pagination for efficient consolidation.
-
-        Fetches rows by ID for fast, efficient keyset pagination.
-        Rows are then sorted by consolidation key in Python after fetching to ensure
-        all nodes with the same (Unit, Tool, Date, Time) stay together in the same batch.
-
-        Args:
-            table: Table name ('RAWDATACOR' or 'ELABDATADISP')
-            start_id: Resume from this ID (fetch id > start_id). If None, starts from beginning
-            batch_size: Number of rows per batch (uses config default if None)
-
-        Yields:
-            Batches of row dictionaries
-        """
-        if table not in ("RAWDATACOR", "ELABDATADISP"):
-            raise ValueError(f"Consolidation ordering only supported for RAWDATACOR and ELABDATADISP, got {table}")
-
-        if batch_size is None:
-            batch_size = self.settings.migration.batch_size
-
-        # Determine the ID column name based on table
-        id_column = "idElabData" if table == "ELABDATADISP" else "id"
-
-        last_id = start_id
-        max_retries = 3
-
-        while True:
-            retries = 0
-            while retries < max_retries:
-                try:
-                    with self.connection.cursor() as cursor:
-                        # Fetch by ID using keyset pagination (fast, uses primary key)
-                        # Python will sort by consolidation key after fetching
-
-                        if last_id is None:
-                            query = f"SELECT * FROM `{table}` ORDER BY `{id_column}` ASC LIMIT %s"
-                            cursor.execute(query, (batch_size,))
-                        else:
-                            # For resume: fetch rows with id > last_id
-                            query = f"SELECT * FROM `{table}` WHERE `{id_column}` > %s ORDER BY `{id_column}` ASC LIMIT %s"
-                            cursor.execute(query, (last_id, batch_size))
-
-                        rows = cursor.fetchall()
-
-                        if not rows:
-                            return
-
-                        yield rows
-                        last_id = rows[-1][id_column]
-                        break  # Success, exit retry loop
-
-                except pymysql.Error as e:
-                    retries += 1
-                    if retries >= max_retries:
-                        logger.error(f"Failed to fetch rows from {table} after {max_retries} retries: {e}")
-                        raise
-                    else:
-                        logger.warning(f"Fetch failed (retry {retries}/{max_retries}): {e}")
-                        # Reconnect and retry
-                        try:
-                            self.disconnect()
-                            self.connect()
-                        except Exception as reconnect_error:
-                            logger.error(f"Failed to reconnect: {reconnect_error}")
-                            raise
-
     def get_table_structure(self, table: str) -> Dict[str, Any]:
         """Get table structure (column info).
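
A note on the replacement path: this patch only removes code, so
fetch_consolidation_groups_from_partition() itself is not visible here. The
sketch below is a minimal illustration of the two techniques the commit
message credits it with, keyset pagination and consolidation group
buffering. It assumes rows sharing a (Unit, Tool, Date, Time) key are
adjacent in id order, as the removed docstring implies; the helper name,
column names, and DictCursor setup are illustrative assumptions, not the
project's actual API. Retry/reconnect handling is omitted for brevity.

# Sketch only: names and columns below are assumptions, not this repo's API.
from itertools import groupby
from typing import Any, Dict, Generator, List, Optional, Tuple

import pymysql
import pymysql.cursors


def fetch_groups_keyset(
    connection: pymysql.connections.Connection,
    table: str,
    id_column: str = "id",
    batch_size: int = 1000,
    start_id: Optional[int] = None,
) -> Generator[List[Dict[str, Any]], None, None]:
    """Yield complete consolidation groups via keyset pagination.

    Assumes rows sharing a (Unit, Tool, Date, Time) key have adjacent ids.
    The trailing group of each batch is buffered until the next fetch shows
    its boundary has been crossed, so no group is split across yields.
    """
    def group_key(row: Dict[str, Any]) -> Tuple[Any, ...]:
        # Assumed column names, taken from the removed docstring's key.
        return (row["Unit"], row["Tool"], row["Date"], row["Time"])

    last_id = start_id
    pending: List[Dict[str, Any]] = []  # possibly-incomplete trailing group

    while True:
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            if last_id is None:
                cursor.execute(
                    f"SELECT * FROM `{table}` ORDER BY `{id_column}` LIMIT %s",
                    (batch_size,),
                )
            else:
                # Keyset pagination: seek past the last-seen id instead of
                # using OFFSET, which degrades linearly on large tables.
                cursor.execute(
                    f"SELECT * FROM `{table}` WHERE `{id_column}` > %s "
                    f"ORDER BY `{id_column}` LIMIT %s",
                    (last_id, batch_size),
                )
            rows = cursor.fetchall()

        if not rows:
            if pending:  # table exhausted: the buffered group is complete
                yield pending
            return

        last_id = rows[-1][id_column]
        # pending rows precede this batch in id order, so equal keys stay
        # adjacent and groupby merges a group that straddles the boundary.
        groups = [list(g) for _, g in groupby(pending + rows, key=group_key)]
        pending = groups.pop()  # may continue into the next batch
        yield from groups

Buffering the trailing group is what presumably replaces the removed
method's sort-in-Python step: a batch boundary can cut a group in half, so
the last group of every fetch is held back until the following fetch proves
its key has changed, and each yielded list is therefore always one complete
consolidation group.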