diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py
index 3c08559..7129f11 100644
--- a/src/connectors/mysql_connector.py
+++ b/src/connectors/mysql_connector.py
@@ -227,8 +227,9 @@ class MySQLConnector:
     ) -> Generator[List[Dict[str, Any]], None, None]:
         """Fetch rows using keyset pagination for efficient consolidation.
 
-        Uses keyset pagination (id-based) to avoid expensive re-sorting with OFFSET.
-        Consolidation happens within each batch in Python.
+        Fetches rows in primary-key (id) order using fast keyset pagination.
+        The caller then sorts each batch by consolidation key and buffers rows across
+        batches so all nodes with the same (Unit, Tool, Date, Time) are consolidated together.
 
         Args:
             table: Table name ('RAWDATACOR' or 'ELABDATADISP')
@@ -255,17 +256,15 @@ class MySQLConnector:
             while retries < max_retries:
                 try:
                     with self.connection.cursor() as cursor:
-                        # Use keyset pagination: fetch by id > last_id
-                        # This is much more efficient than OFFSET for large tables
-                        # Order by id first for pagination, then by consolidation key to keep
-                        # related nodes together in the same batch
-                        order_clause = f"`{id_column}` ASC, `UnitName` ASC, `ToolNameID` ASC, `EventDate` ASC, `EventTime` ASC, `NodeNum` ASC"
+                        # Fetch by ID using keyset pagination (fast, uses the primary key)
+                        # The caller sorts and consolidates by key after fetching
                         if last_id is None:
-                            query = f"SELECT * FROM `{table}` ORDER BY {order_clause} LIMIT %s"
+                            query = f"SELECT * FROM `{table}` ORDER BY `{id_column}` ASC LIMIT %s"
                             cursor.execute(query, (batch_size,))
                         else:
-                            query = f"SELECT * FROM `{table}` WHERE `{id_column}` > %s ORDER BY {order_clause} LIMIT %s"
+                            # For resume: fetch rows with id > last_id
+                            query = f"SELECT * FROM `{table}` WHERE `{id_column}` > %s ORDER BY `{id_column}` ASC LIMIT %s"
                            cursor.execute(query, (last_id, batch_size))
 
                         rows = cursor.fetchall()
 
diff --git a/src/migrator/full_migration.py b/src/migrator/full_migration.py
index cf965a3..061313e 100644
--- a/src/migrator/full_migration.py
+++ b/src/migrator/full_migration.py
@@ -96,6 +96,13 @@ class FullMigrator:
                 rows_to_migrate,
                 f"Migrating {mysql_table}"
             ) as progress:
+                # Consolidate across batches by buffering rows with the same consolidation key
+                # This ensures all nodes of the same (unit, tool, timestamp) are consolidated together
+                row_buffer = []
+                last_consolidation_key = None
+                columns = DataTransformer.get_column_order(pg_table)
+                total_mysql_rows = 0
+
                 # Fetch and migrate rows in batches
                 # Use ordered fetching for node consolidation with resume support
                 for batch in mysql_conn.fetch_rows_ordered_for_consolidation(
@@ -105,59 +112,82 @@ class FullMigrator:
                 ):
                     if not batch:
                         break
-                    # Track MySQL rows processed for progress (before consolidation)
-                    batch_size = len(batch)
+                    # Sort batch by consolidation key
+                    sorted_batch = sorted(batch, key=lambda r: (
+                        r.get("UnitName") or "",
+                        r.get("ToolNameID") or "",
+                        str(r.get("EventDate") or ""),
+                        str(r.get("EventTime") or ""),
+                        int(r.get("NodeNum") or 0)
+                    ))
 
-                    # Transform batch with consolidation enabled
+                    # Process each row, consolidating when the consolidation key changes
+                    for row in sorted_batch:
+                        # Extract consolidation key
+                        consolidation_key = (
+                            row.get("UnitName"),
+                            row.get("ToolNameID"),
+                            row.get("EventDate"),
+                            row.get("EventTime")
+                        )
+
+                        # If consolidation key changed, consolidate the buffer
+                        if last_consolidation_key is not None and consolidation_key != last_consolidation_key:
+                            # Consolidate buffered rows
+                            transformed = DataTransformer.transform_batch(
+                                mysql_table,
+                                row_buffer,
+                                consolidate=True
+                            )
+
+                            # Insert consolidated rows
+                            inserted = pg_conn.insert_batch(pg_table, transformed, columns)
+                            if inserted > 0:
+                                migrated += inserted
+                                batch_count += 1
+                                progress.update(len(row_buffer))
+                                total_mysql_rows += len(row_buffer)
+
+                                # Update state every 10 inserts
+                                if batch_count % 10 == 0:
+                                    batch_max_id = max(int(r.get(primary_key, 0)) for r in row_buffer)
+                                    self._update_migration_state(
+                                        pg_conn, migrated, batch_max_id, migration_start_time
+                                    )
+                                else:
+                                    batch_max_id = max(int(r.get(primary_key, 0)) for r in row_buffer)
+                                    try:
+                                        with pg_conn.connection.cursor() as cursor:
+                                            cursor.execute(
+                                                """UPDATE migration_state
+                                                SET last_migrated_id = %s, last_migrated_timestamp = %s
+                                                WHERE table_name = %s""",
+                                                (batch_max_id, migration_start_time or datetime.utcnow().isoformat(), pg_table)
+                                            )
+                                        pg_conn.connection.commit()
+                                    except Exception as e:
+                                        logger.warning(f"Failed to update migration state: {e}")
+
+                            # Reset buffer
+                            row_buffer = []
+
+                        # Add row to buffer
+                        row_buffer.append(row)
+                        last_consolidation_key = consolidation_key
+
+                # Consolidate any remaining rows in buffer
+                if row_buffer:
                     transformed = DataTransformer.transform_batch(
                         mysql_table,
-                        batch,
+                        row_buffer,
                         consolidate=True
                     )
-
-                    # Insert batch
-                    columns = DataTransformer.get_column_order(pg_table)
-                    inserted = pg_conn.insert_batch(
-                        pg_table,
-                        transformed,
-                        columns
-                    )
-
+                    inserted = pg_conn.insert_batch(pg_table, transformed, columns)
                     if inserted > 0:
-                        # Update progress based on MySQL rows processed
-                        # (not PostgreSQL rows inserted, since consolidation reduces count)
-                        progress.update(batch_size)
-
-                        # Accumulate inserted count locally
                         migrated += inserted
                         batch_count += 1
-
-                        # Update state periodically (every 10 batches) to avoid expensive COUNT(*) queries
-                        # Always update on last batch (will be detected when loop ends)
-                        if batch_count % 10 == 0:
-                            batch_max_id = max(
-                                int(row.get("id", 0)) for row in batch
-                            )
-                            # Update with accumulated local count (cheaper than COUNT(*))
-                            self._update_migration_state(
-                                pg_conn, migrated, batch_max_id, migration_start_time
-                            )
-                        else:
-                            # Still update last_migrated_id for resume, but not total count
-                            batch_max_id = max(
-                                int(row.get("id", 0)) for row in batch
-                            )
-                            try:
-                                with pg_conn.connection.cursor() as cursor:
-                                    cursor.execute(
-                                        """UPDATE migration_state
-                                        SET last_migrated_id = %s, last_migrated_timestamp = %s
-                                        WHERE table_name = %s""",
-                                        (batch_max_id, migration_start_time or datetime.utcnow().isoformat(), pg_table)
-                                    )
-                                pg_conn.connection.commit()
-                            except Exception as e:
-                                logger.warning(f"Failed to update migration state: {e}")
+                        progress.update(len(row_buffer))
+                        total_mysql_rows += len(row_buffer)
 
                 # Get final actual count from PostgreSQL
                 final_count = pg_conn.get_row_count(pg_table)
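
Note on the consolidation strategy above: because row_buffer survives across batch boundaries and is only flushed when the consolidation key changes (or after the last batch), a (UnitName, ToolNameID, EventDate, EventTime) group that happens to be split over two fetches is still consolidated into a single insert. The following is a minimal, self-contained sketch of that buffering logic under those assumptions; the helper names (iter_consolidation_groups, consolidation_key) are illustrative only and are not part of the codebase.

# Illustrative sketch only -- not repository code. It reproduces the
# cross-batch buffering idea from full_migration.py with hypothetical names
# so the control flow can be read in isolation.
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple

Row = Dict[str, Any]
Key = Tuple[Any, Any, Any, Any]


def consolidation_key(row: Row) -> Key:
    # The four fields the migrator groups on: unit, tool, date, time.
    return (row.get("UnitName"), row.get("ToolNameID"),
            row.get("EventDate"), row.get("EventTime"))


def iter_consolidation_groups(batches: Iterable[List[Row]]) -> Iterator[List[Row]]:
    """Yield lists of rows sharing one consolidation key, across batch boundaries."""
    buffer: List[Row] = []
    last_key: Optional[Key] = None
    for batch in batches:
        # Sort within the batch so rows with the same key become adjacent.
        for row in sorted(batch, key=lambda r: (
            r.get("UnitName") or "",
            r.get("ToolNameID") or "",
            str(r.get("EventDate") or ""),
            str(r.get("EventTime") or ""),
            int(r.get("NodeNum") or 0),
        )):
            key = consolidation_key(row)
            if last_key is not None and key != last_key:
                yield buffer  # key changed: the buffered group is complete
                buffer = []
            buffer.append(row)
            last_key = key
    if buffer:
        yield buffer  # final flush after the last batch


if __name__ == "__main__":
    # The ("U1", "T1", "2024-01-01", "00:00") group spans two batches but is
    # still yielded as a single group of two nodes.
    batches = [
        [{"UnitName": "U1", "ToolNameID": "T1", "EventDate": "2024-01-01",
          "EventTime": "00:00", "NodeNum": 1}],
        [{"UnitName": "U1", "ToolNameID": "T1", "EventDate": "2024-01-01",
          "EventTime": "00:00", "NodeNum": 2},
         {"UnitName": "U1", "ToolNameID": "T1", "EventDate": "2024-01-01",
          "EventTime": "00:10", "NodeNum": 1}],
    ]
    for group in iter_consolidation_groups(batches):
        print(len(group), consolidation_key(group[0]))

As in the migrator, a group is flushed as soon as the key changes, so correct grouping relies on rows arriving in roughly key-sorted order across batches (here, primary-key order); a key that reappears later would start a new group.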