diff --git a/main.py b/main.py index 890b9be..df87fde 100644 --- a/main.py +++ b/main.py @@ -74,7 +74,13 @@ def migrate(): is_flag=True, help="Resume from last checkpoint if migration was interrupted" ) -def full(table, dry_run, resume): +@click.option( + "--partition", + type=str, + default=None, + help="Only migrate this partition (for testing/debugging)" +) +def full(table, dry_run, resume, partition): """Perform full migration of all data.""" setup_logger(__name__) @@ -84,8 +90,8 @@ def full(table, dry_run, resume): total_migrated = 0 for tbl in tables: - click.echo(f"\nMigrating {tbl}...") - migrated = run_full_migration(tbl, dry_run=dry_run, resume=resume) + click.echo(f"\nMigrating {tbl}" + (f" (partition {partition})" if partition else "") + "...") + migrated = run_full_migration(tbl, dry_run=dry_run, resume=resume, partition=partition) total_migrated += migrated click.echo(f"✓ {tbl}: {migrated} rows migrated") diff --git a/src/migrator/full_migration.py b/src/migrator/full_migration.py index a53652d..433f350 100644 --- a/src/migrator/full_migration.py +++ b/src/migrator/full_migration.py @@ -31,12 +31,13 @@ class FullMigrator: self.settings = get_settings() self.state = MigrationState() - def migrate(self, dry_run: bool = False, resume: bool = False) -> int: + def migrate(self, dry_run: bool = False, resume: bool = False, partition: str = None) -> int: """Perform full migration of the table with resume capability. Args: dry_run: If True, log what would be done but don't modify data resume: If True, resume from last checkpoint; if False, check for conflicts + partition: If specified, only migrate this partition (for testing/debugging) Returns: Total number of rows migrated in this run @@ -112,6 +113,13 @@ class FullMigrator: partitions = mysql_conn.get_table_partitions(mysql_table) logger.info(f"Found {len(partitions)} partitions for {mysql_table}") + # If specific partition requested, filter to just that one + if partition: + if partition not in partitions: + raise ValueError(f"Partition {partition} not found. Available: {partitions}") + partitions = [partition] + logger.info(f"Filtering to only partition: {partition}") + for partition_idx, partition in enumerate(partitions, 1): # Skip partitions already completed in previous run if last_completed_partition and partition <= last_completed_partition: @@ -417,7 +425,8 @@ class FullMigrator: def run_full_migration( table: str, dry_run: bool = False, - resume: bool = False + resume: bool = False, + partition: str = None ) -> int: """Run full migration for a table. @@ -425,9 +434,10 @@ def run_full_migration( table: Table name to migrate dry_run: If True, show what would be done without modifying data resume: If True, resume from last checkpoint instead of starting fresh + partition: If specified, only migrate this partition (for testing/debugging) Returns: Number of rows migrated in this run """ migrator = FullMigrator(table) - return migrator.migrate(dry_run=dry_run, resume=resume) + return migrator.migrate(dry_run=dry_run, resume=resume, partition=partition)