Files
mysql2postgres/main.py
alex 58624c0866 feat: Add --partition flag to migrate only specific partition
Allows testing/debugging by migrating only a single partition instead of
the entire table. Useful for:
- Testing consolidation on specific partitions
- Quick verification of fixes without full migration
- Targeted debugging

Usage:
    python3 main.py migrate full --table ELABDATADISP --partition d10
    python3 main.py migrate full --table RAWDATACOR --partition d11 --resume

Changes:
- Add partition parameter to FullMigrator.migrate()
- Filter partitions list to only specified partition if provided
- Validate partition exists in available partitions
- Add --partition CLI option to migrate full command
- Update message to show partition in progress

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2025-12-27 09:05:11 +01:00

209 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MySQL to PostgreSQL migration tool CLI."""
import click
import sys
from pathlib import Path
from config import get_settings
from src.utils.logger import setup_logger, get_logger
from src.transformers.schema_transformer import get_full_schema_script
from src.migrator.full_migration import run_full_migration
from src.migrator.incremental_migration import run_incremental_migration
from src.benchmark.performance_test import run_benchmark
from src.connectors.postgres_connector import PostgreSQLConnector
# Module-level logger used by the command handlers below to report failures.
logger = get_logger(__name__)
# Root command group. The click context is passed in so that its ``obj``
# can be guaranteed to be a dict before any subcommand runs.
@click.group()
@click.pass_context
def cli(ctx):
    """MySQL to PostgreSQL migration tool with performance benchmarking."""
    # NOTE: the docstring above is emitted by click as the --help text,
    # so it is kept verbatim; implementation notes live in comments.
    setup_logger(__name__)

    # Create ctx.obj as an empty dict when the caller did not supply one.
    ctx.ensure_object(dict)
# `setup` subcommand: with --create-schema it runs the full schema script
# against PostgreSQL; without the flag it only prints a usage hint.
# Any exception is logged, echoed to stderr, and turned into exit status 1.
@cli.command()
@click.option(
    "--create-schema",
    is_flag=True,
    help="Create PostgreSQL schema and partitions",
)
def setup(create_schema):
    """Setup PostgreSQL database."""
    setup_logger(__name__)

    if create_schema:
        try:
            with PostgreSQLConnector() as pg_conn:
                logger.info("Creating PostgreSQL schema...")
                # Build the script and hand it straight to the connector.
                pg_conn.execute_script(get_full_schema_script())
                logger.info("✓ Schema creation complete")
                click.echo("✓ PostgreSQL schema created successfully")
        except Exception as exc:
            logger.error(f"Setup failed: {exc}")
            click.echo(f"✗ Setup failed: {exc}", err=True)
            sys.exit(1)
    else:
        # Nothing to do without the flag: show how to invoke the command.
        click.echo("Usage: python main.py setup --create-schema")
        click.echo("Create PostgreSQL schema and partitions")
# Sub-group that only serves as a namespace for the `full` and
# `incremental` migration commands registered on it below.
@cli.group()
def migrate():
    """Migrate data from MySQL to PostgreSQL."""
# `migrate full` subcommand: calls run_full_migration once per selected
# table, forwarding the dry-run / resume / partition options, and echoes
# per-table and overall row counts. Any exception is logged, echoed to
# stderr, and turned into exit status 1.
@migrate.command()
@click.option(
    "--table",
    type=click.Choice(["RAWDATACOR", "ELABDATADISP", "all"]),
    default="all",
    help="Table to migrate (default: all)",
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Show what would be done without modifying data",
)
@click.option(
    "--resume",
    is_flag=True,
    help="Resume from last checkpoint if migration was interrupted",
)
@click.option(
    "--partition",
    type=str,
    default=None,
    help="Only migrate this partition (for testing/debugging)",
)
def full(table, dry_run, resume, partition):
    """Perform full migration of all data."""
    setup_logger(__name__)

    # "all" expands to both known tables; otherwise migrate just the one.
    if table == "all":
        targets = ["RAWDATACOR", "ELABDATADISP"]
    else:
        targets = [table]

    # Shown in the progress line only when a partition filter is active.
    partition_note = f" (partition {partition})" if partition else ""

    try:
        row_total = 0
        for name in targets:
            click.echo(f"\nMigrating {name}{partition_note}...")
            count = run_full_migration(
                name,
                dry_run=dry_run,
                resume=resume,
                partition=partition,
            )
            row_total += count
            click.echo(f"{name}: {count} rows migrated")
        click.echo(f"\n✓ Full migration complete: {row_total} total rows migrated")
    except Exception as exc:
        logger.error(f"Migration failed: {exc}")
        click.echo(f"✗ Migration failed: {exc}", err=True)
        sys.exit(1)
# `migrate incremental` subcommand: calls run_incremental_migration once
# per selected table with the given state file, echoes a per-table result
# line and a final summary. Any exception is logged, echoed to stderr, and
# turned into exit status 1.
@migrate.command()
@click.option(
    "--table",
    type=click.Choice(["RAWDATACOR", "ELABDATADISP", "all"]),
    default="all",
    help="Table to migrate (default: all)",
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Show what would be done without modifying data",
)
@click.option(
    "--state-file",
    default="migration_state.json",
    help="Path to migration state file",
)
def incremental(table, dry_run, state_file):
    """Perform incremental migration since last sync."""
    setup_logger(__name__)

    # "all" expands to both known tables; otherwise migrate just the one.
    if table == "all":
        targets = ["RAWDATACOR", "ELABDATADISP"]
    else:
        targets = [table]

    try:
        row_total = 0
        for name in targets:
            click.echo(f"\nIncremental migration for {name}...")
            count = run_incremental_migration(
                name,
                dry_run=dry_run,
                state_file=state_file,
            )
            row_total += count

            # Pick the per-table line first, then emit it once.
            if count > 0:
                table_line = f"{name}: {count} rows migrated"
            else:
                table_line = f" {name}: No new rows to migrate"
            click.echo(table_line)

        # Overall summary across every table processed above.
        if row_total == 0:
            summary = "\n No rows to migrate"
        else:
            summary = f"\n✓ Incremental migration complete: {row_total} total rows migrated"
        click.echo(summary)
    except Exception as exc:
        logger.error(f"Incremental migration failed: {exc}")
        click.echo(f"✗ Incremental migration failed: {exc}", err=True)
        sys.exit(1)
# `benchmark` subcommand: delegates to run_benchmark and reports the value
# it returns as the location of the saved results. Any exception is logged,
# echoed to stderr, and turned into exit status 1.
@cli.command()
@click.option(
    "--iterations",
    type=int,
    default=None,
    help="Number of iterations per query (default from config)",
)
@click.option(
    "--output",
    type=click.Path(),
    default=None,
    help="Output file path (default: benchmark_results/benchmark_TIMESTAMP.json)",
)
def benchmark(iterations, output):
    """Run performance benchmarks comparing MySQL and PostgreSQL."""
    setup_logger(__name__)

    try:
        click.echo("Running performance benchmarks...")
        # Both options may be None; they are forwarded unchanged.
        results_path = run_benchmark(iterations=iterations, output_file=output)
        click.echo(f"✓ Benchmark complete: results saved to {results_path}")
    except Exception as exc:
        logger.error(f"Benchmark failed: {exc}")
        click.echo(f"✗ Benchmark failed: {exc}", err=True)
        sys.exit(1)
# `info` subcommand: prints selected fields of the object returned by
# get_settings(), grouped under MySQL / PostgreSQL / Migration / Benchmark
# headings.
@cli.command()
def info():
    """Show configuration information."""
    setup_logger(__name__)
    settings = get_settings()

    def _report_lines():
        # Generator so each line is formatted only right before it is
        # echoed, keeping attribute access and output interleaved.
        yield "\n[MySQL Configuration]"
        yield f" Host: {settings.mysql.host}:{settings.mysql.port}"
        yield f" Database: {settings.mysql.database}"
        yield f" User: {settings.mysql.user}"

        yield "\n[PostgreSQL Configuration]"
        yield f" Host: {settings.postgres.host}:{settings.postgres.port}"
        yield f" Database: {settings.postgres.database}"
        yield f" User: {settings.postgres.user}"

        yield "\n[Migration Settings]"
        yield f" Batch Size: {settings.migration.batch_size}"
        yield f" Log Level: {settings.migration.log_level}"
        yield f" Dry Run: {settings.migration.dry_run}"

        yield "\n[Benchmark Settings]"
        yield f" Output Directory: {settings.benchmark.output_dir}"
        yield f" Iterations: {settings.benchmark.iterations}"

    for text in _report_lines():
        click.echo(text)
# Script entry point: invoke the root click group with a fresh, empty
# dict as the context object.
if __name__ == "__main__":
    cli(obj=dict())