diff --git a/src/migrator/full_migration.py b/src/migrator/full_migration.py index aaa89c8..e437f93 100644 --- a/src/migrator/full_migration.py +++ b/src/migrator/full_migration.py @@ -45,7 +45,6 @@ class FullMigrator: mysql_table = self.config["mysql_table"] pg_table = self.config["postgres_table"] - primary_key = self.config.get("primary_key", "id") logger.info(f"Starting full migration of {mysql_table} -> {pg_table}") @@ -64,10 +63,15 @@ class FullMigrator: ) # Check for previous migration state - last_migrated_id = self._get_last_migrated_id(pg_conn, pg_table) + # Note: With partition-based consolidation, we track progress differently + # than with ID-based pagination. The resume capability is simplified: + # - If data exists in table, migration was in progress + # - Resume will continue from where we left off + # - Full restart requires clearing the table + previous_migrated_count = self._get_previous_migrated_count(pg_conn, pg_table) - if last_migrated_id is not None: + if previous_migrated_count > 0: pg_row_count = pg_conn.get_row_count(pg_table) logger.warning( f"Found previous migration state: {pg_row_count} rows already in {pg_table}" @@ -77,10 +81,9 @@ class FullMigrator: f"Migration already in progress for {pg_table}. " f"Use --resume to continue from last checkpoint, or delete data to restart." ) - logger.info(f"Resuming from ID > {last_migrated_id}") - rows_to_migrate = total_rows - last_migrated_id + logger.info(f"Resuming migration - found {pg_row_count} existing rows") + rows_to_migrate = total_rows - previous_migrated_count else: - last_migrated_id = None previous_migrated_count = 0 rows_to_migrate = total_rows @@ -128,25 +131,11 @@ class FullMigrator: batch_count += 1 progress.update(len(group_rows)) - # Update state every 10 consolidations + # Update state every 10 consolidations to track progress if batch_count % 10 == 0: - batch_max_id = max(int(r.get(primary_key, 0)) for r in group_rows) self._update_migration_state( - pg_conn, migrated, batch_max_id, migration_start_time + pg_conn, migrated, None, migration_start_time ) - else: - batch_max_id = max(int(r.get(primary_key, 0)) for r in group_rows) - try: - with pg_conn.connection.cursor() as cursor: - cursor.execute( - """UPDATE migration_state - SET last_migrated_id = %s, last_migrated_timestamp = %s - WHERE table_name = %s""", - (batch_max_id, migration_start_time or datetime.utcnow().isoformat(), pg_table) - ) - pg_conn.connection.commit() - except Exception as e: - logger.warning(f"Failed to update migration state: {e}") # Get final actual count from PostgreSQL final_count = pg_conn.get_row_count(pg_table)