feat: Add --partition flag to migrate only specific partition

Allows testing/debugging by migrating only a single partition instead of
the entire table. Useful for:
- Testing consolidation on specific partitions
- Quick verification of fixes without full migration
- Targeted debugging

Usage:
    python3 main.py migrate full --table ELABDATADISP --partition d10
    python3 main.py migrate full --table RAWDATACOR --partition d11 --resume

Changes:
- Add partition parameter to FullMigrator.migrate()
- Filter partitions list to only specified partition if provided
- Validate partition exists in available partitions
- Add --partition CLI option to migrate full command
- Update message to show partition in progress

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-27 09:05:11 +01:00
parent 287a7ffb51
commit 58624c0866
2 changed files with 22 additions and 6 deletions

12
main.py
View File

@@ -74,7 +74,13 @@ def migrate():
is_flag=True, is_flag=True,
help="Resume from last checkpoint if migration was interrupted" help="Resume from last checkpoint if migration was interrupted"
) )
def full(table, dry_run, resume): @click.option(
"--partition",
type=str,
default=None,
help="Only migrate this partition (for testing/debugging)"
)
def full(table, dry_run, resume, partition):
"""Perform full migration of all data.""" """Perform full migration of all data."""
setup_logger(__name__) setup_logger(__name__)
@@ -84,8 +90,8 @@ def full(table, dry_run, resume):
total_migrated = 0 total_migrated = 0
for tbl in tables: for tbl in tables:
click.echo(f"\nMigrating {tbl}...") click.echo(f"\nMigrating {tbl}" + (f" (partition {partition})" if partition else "") + "...")
migrated = run_full_migration(tbl, dry_run=dry_run, resume=resume) migrated = run_full_migration(tbl, dry_run=dry_run, resume=resume, partition=partition)
total_migrated += migrated total_migrated += migrated
click.echo(f"{tbl}: {migrated} rows migrated") click.echo(f"{tbl}: {migrated} rows migrated")

View File

@@ -31,12 +31,13 @@ class FullMigrator:
self.settings = get_settings() self.settings = get_settings()
self.state = MigrationState() self.state = MigrationState()
def migrate(self, dry_run: bool = False, resume: bool = False) -> int: def migrate(self, dry_run: bool = False, resume: bool = False, partition: str = None) -> int:
"""Perform full migration of the table with resume capability. """Perform full migration of the table with resume capability.
Args: Args:
dry_run: If True, log what would be done but don't modify data dry_run: If True, log what would be done but don't modify data
resume: If True, resume from last checkpoint; if False, check for conflicts resume: If True, resume from last checkpoint; if False, check for conflicts
partition: If specified, only migrate this partition (for testing/debugging)
Returns: Returns:
Total number of rows migrated in this run Total number of rows migrated in this run
@@ -112,6 +113,13 @@ class FullMigrator:
partitions = mysql_conn.get_table_partitions(mysql_table) partitions = mysql_conn.get_table_partitions(mysql_table)
logger.info(f"Found {len(partitions)} partitions for {mysql_table}") logger.info(f"Found {len(partitions)} partitions for {mysql_table}")
# If specific partition requested, filter to just that one
if partition:
if partition not in partitions:
raise ValueError(f"Partition {partition} not found. Available: {partitions}")
partitions = [partition]
logger.info(f"Filtering to only partition: {partition}")
for partition_idx, partition in enumerate(partitions, 1): for partition_idx, partition in enumerate(partitions, 1):
# Skip partitions already completed in previous run # Skip partitions already completed in previous run
if last_completed_partition and partition <= last_completed_partition: if last_completed_partition and partition <= last_completed_partition:
@@ -417,7 +425,8 @@ class FullMigrator:
def run_full_migration( def run_full_migration(
table: str, table: str,
dry_run: bool = False, dry_run: bool = False,
resume: bool = False resume: bool = False,
partition: str = None
) -> int: ) -> int:
"""Run full migration for a table. """Run full migration for a table.
@@ -425,9 +434,10 @@ def run_full_migration(
table: Table name to migrate table: Table name to migrate
dry_run: If True, show what would be done without modifying data dry_run: If True, show what would be done without modifying data
resume: If True, resume from last checkpoint instead of starting fresh resume: If True, resume from last checkpoint instead of starting fresh
partition: If specified, only migrate this partition (for testing/debugging)
Returns: Returns:
Number of rows migrated in this run Number of rows migrated in this run
""" """
migrator = FullMigrator(table) migrator = FullMigrator(table)
return migrator.migrate(dry_run=dry_run, resume=resume) return migrator.migrate(dry_run=dry_run, resume=resume, partition=partition)