diff --git a/src/migrator/full_migration.py b/src/migrator/full_migration.py
index 84f1ca3..7563795 100644
--- a/src/migrator/full_migration.py
+++ b/src/migrator/full_migration.py
@@ -90,6 +90,7 @@ class FullMigrator:
 
         migrated = previous_migrated_count
         migration_start_time = datetime.utcnow().isoformat()
+        batch_count = 0
 
         with ProgressTracker(
             rows_to_migrate,
@@ -127,16 +128,42 @@ class FullMigrator:
                 # (not PostgreSQL rows inserted, since consolidation reduces count)
                 progress.update(batch_size)
 
-                # Update state after each batch for resume capability
-                # Use MAX id of the batch (represents last MySQL id processed)
-                batch_max_id = max(
-                    int(row.get("id", 0)) for row in batch
-                )
-                # Get actual row count from PostgreSQL for accuracy
-                actual_count = pg_conn.get_row_count(pg_table)
-                self._update_migration_state(
-                    pg_conn, actual_count, batch_max_id, migration_start_time
-                )
+                # Accumulate the inserted count locally instead of issuing a
+                # COUNT(*) against PostgreSQL after every batch.
+                migrated += inserted
+                batch_count += 1
+
+                # Last MySQL id processed in this batch -- needed in both
+                # branches below, so compute it once.
+                batch_max_id = max(
+                    int(row.get("id", 0)) for row in batch
+                )
+
+                if batch_count % 10 == 0:
+                    # Periodically persist the full state (accumulated count +
+                    # last id).  The authoritative row count is re-read from
+                    # PostgreSQL once, after the loop completes (final_count).
+                    self._update_migration_state(
+                        pg_conn, migrated, batch_max_id, migration_start_time
+                    )
+                else:
+                    # Cheap per-batch update: advance last_migrated_id for
+                    # resume capability without rewriting the total count
+                    # (which may therefore lag by up to 9 batches on a crash).
+                    try:
+                        with pg_conn.connection.cursor() as cursor:
+                            cursor.execute(
+                                """UPDATE migration_state
+                                SET last_migrated_id = %s, last_migrated_timestamp = %s
+                                WHERE table_name = %s""",
+                                (batch_max_id, migration_start_time, pg_table)
+                            )
+                        pg_conn.connection.commit()
+                    except Exception as e:
+                        logger.warning("Failed to update migration state: %s", e)
+                        # Roll back so an aborted transaction doesn't poison
+                        # later statements on this connection.
+                        pg_conn.connection.rollback()
 
         # Get final actual count from PostgreSQL
         final_count = pg_conn.get_row_count(pg_table)