Files
mysql2postgres/main.py
2026-01-05 14:00:29 +01:00

224 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MySQL to PostgreSQL migration tool CLI."""
import click
import sys
from pathlib import Path
from config import get_settings
from src.utils.logger import setup_logger, get_logger
from src.transformers.schema_transformer import get_full_schema_script
from src.migrator.full_migrator import run_full_migration
from src.migrator.incremental_migrator import run_incremental_migration
from src.migrator.parallel_migrator import run_parallel_migration
from src.benchmark.performance_test import run_benchmark
from src.connectors.postgres_connector import PostgreSQLConnector
logger = get_logger(__name__)
@click.group()
@click.pass_context
def cli(ctx):
    """MySQL to PostgreSQL migration tool with performance benchmarking."""
    # The docstring above doubles as the top-level ``--help`` text, so it is
    # kept short; implementation notes live in comments instead.

    # Configure logging for this module before any sub-command runs.
    setup_logger(__name__)

    # Guarantee that ``ctx.obj`` is a dict even when the group is invoked
    # without an explicit ``obj=...`` argument.
    ctx.ensure_object(dict)
@cli.command()
@click.option(
    "--create-schema",
    is_flag=True,
    help="Create PostgreSQL schema and partitions"
)
def setup(create_schema):
    """Setup PostgreSQL database."""
    # Configure the root logger so log records from every module are shown.
    setup_logger("")

    if create_schema:
        try:
            # The connector is used as a context manager for the duration of
            # the schema script execution.
            with PostgreSQLConnector() as connection:
                logger.info("Creating PostgreSQL schema...")
                ddl = get_full_schema_script()
                connection.execute_script(ddl)
                logger.info("✓ Schema creation complete")
                click.echo("✓ PostgreSQL schema created successfully")
        except Exception as exc:
            # Top-level CLI boundary: report the failure on both the log and
            # stderr, then exit with a non-zero status.
            logger.error(f"Setup failed: {exc}")
            click.echo(f"✗ Setup failed: {exc}", err=True)
            sys.exit(1)
    else:
        # Without the flag there is nothing to do; print a usage hint.
        click.echo("Usage: python main.py setup --create-schema")
        click.echo("Create PostgreSQL schema and partitions")
# Sub-group that only namespaces the migration commands registered on it
# (``full`` and ``incremental``); it performs no work of its own.
@cli.group()
def migrate():
    """Migrate data from MySQL to PostgreSQL."""
@migrate.command()
@click.option(
    "--table",
    type=click.Choice(["RAWDATACOR", "ELABDATADISP", "all"]),
    default="all",
    help="Table to migrate (default: all)"
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Show what would be done without modifying data"
)
@click.option(
    "--resume",
    is_flag=True,
    help="Resume from last checkpoint if migration was interrupted"
)
@click.option(
    "--partition",
    type=str,
    default=None,
    help="Only migrate this partition (for testing/debugging)"
)
@click.option(
    "--parallel",
    type=int,
    default=None,
    help="Number of parallel workers (e.g., --parallel 5 for 5 workers)"
)
def full(table, dry_run, resume, partition, parallel):
    """Perform full migration of all data."""
    # Parameters (all supplied by click from the options above):
    #   table     -- "RAWDATACOR", "ELABDATADISP" or "all" (both tables).
    #   dry_run   -- forwarded unchanged to the migration runner.
    #   resume    -- forwarded unchanged to the migration runner.
    #   partition -- optional partition name; sequential mode only.
    #   parallel  -- optional worker count; selects the parallel runner.
    # Exits with status 1 on invalid option combinations or on any exception
    # raised by a migration runner.
    setup_logger("")  # Configure root logger to show all module logs

    tables = ["RAWDATACOR", "ELABDATADISP"] if table == "all" else [table]

    # Validate options.
    # Test against None explicitly: 0 is falsy, so a plain truthiness check
    # would silently fall back to the sequential path for ``--parallel 0``,
    # and a negative count would be handed to the parallel runner as-is.
    use_parallel = parallel is not None
    if use_parallel and parallel < 1:
        click.echo("✗ --parallel must be a positive integer", err=True)
        sys.exit(1)
    if use_parallel and partition:
        click.echo("✗ Cannot use --parallel with --partition", err=True)
        sys.exit(1)

    try:
        total_migrated = 0
        for tbl in tables:
            if use_parallel:
                # Parallel migration mode
                click.echo(f"\nMigrating {tbl} with {parallel} parallel workers...")
                migrated = run_parallel_migration(
                    tbl, num_workers=parallel, dry_run=dry_run, resume=resume
                )
            else:
                # Sequential migration mode
                click.echo(
                    f"\nMigrating {tbl}"
                    + (f" (partition {partition})" if partition else "")
                    + "..."
                )
                migrated = run_full_migration(
                    tbl, dry_run=dry_run, resume=resume, partition=partition
                )
            total_migrated += migrated
            click.echo(f"{tbl}: {migrated} rows migrated")
        click.echo(f"\n✓ Full migration complete: {total_migrated} total rows migrated")
    except Exception as e:
        # Top-level CLI boundary: report on the log and stderr, then exit
        # with a non-zero status so callers can detect the failure.
        logger.error(f"Migration failed: {e}")
        click.echo(f"✗ Migration failed: {e}", err=True)
        sys.exit(1)
@migrate.command()
@click.option(
    "--table",
    type=click.Choice(["RAWDATACOR", "ELABDATADISP", "all"]),
    default="all",
    help="Table to migrate (default: all)"
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Show what would be done without modifying data"
)
def incremental(table, dry_run):
    """Perform incremental migration since last sync (based on consolidation keys)."""
    # Set up the root logger so all child loggers work.
    setup_logger("")

    # "all" expands to both known tables; otherwise migrate just the one named.
    if table == "all":
        selected = ["RAWDATACOR", "ELABDATADISP"]
    else:
        selected = [table]

    try:
        grand_total = 0
        for name in selected:
            click.echo(f"\nIncremental migration for {name}...")
            row_count = run_incremental_migration(name, dry_run=dry_run)
            grand_total += row_count
            # Per-table summary line: a count when rows moved, a notice otherwise.
            summary = (
                f"{name}: {row_count} rows migrated"
                if row_count > 0
                else f" {name}: No new rows to migrate"
            )
            click.echo(summary)

        # Overall summary across every table processed.
        if grand_total != 0:
            click.echo(f"\n✓ Incremental migration complete: {grand_total} total rows migrated")
        else:
            click.echo("\n No rows to migrate")
    except Exception as exc:
        # Top-level CLI boundary: log, report on stderr, exit non-zero.
        logger.error(f"Incremental migration failed: {exc}")
        click.echo(f"✗ Incremental migration failed: {exc}", err=True)
        sys.exit(1)
@cli.command()
@click.option(
    "--iterations",
    type=int,
    default=None,
    help="Number of iterations per query (default from config)"
)
@click.option(
    "--output",
    type=click.Path(),
    default=None,
    help="Output file path (default: benchmark_results/benchmark_TIMESTAMP.json)"
)
def benchmark(iterations, output):
    """Run performance benchmarks comparing MySQL and PostgreSQL."""
    # Configure the root logger so log records from every module are shown.
    setup_logger("")

    try:
        click.echo("Running performance benchmarks...")
        # Both arguments may be None; they are forwarded unchanged and the
        # runner's return value is reported as the results location.
        results_path = run_benchmark(iterations=iterations, output_file=output)
        click.echo(f"✓ Benchmark complete: results saved to {results_path}")
    except Exception as exc:
        # Top-level CLI boundary: log, report on stderr, exit non-zero.
        logger.error(f"Benchmark failed: {exc}")
        click.echo(f"✗ Benchmark failed: {exc}", err=True)
        sys.exit(1)
@cli.command()
def info():
    """Show configuration information."""
    # Configure the root logger so log records from every module are shown.
    setup_logger("")

    settings = get_settings()
    emit = click.echo

    # For each database only host/port, database name and user are printed.
    emit("\n[MySQL Configuration]")
    mysql = settings.mysql
    emit(f" Host: {mysql.host}:{mysql.port}")
    emit(f" Database: {mysql.database}")
    emit(f" User: {mysql.user}")

    emit("\n[PostgreSQL Configuration]")
    postgres = settings.postgres
    emit(f" Host: {postgres.host}:{postgres.port}")
    emit(f" Database: {postgres.database}")
    emit(f" User: {postgres.user}")

    emit("\n[Migration Settings]")
    migration = settings.migration
    emit(f" Batch Size: {migration.batch_size}")
    emit(f" Consolidation Group Limit: {migration.consolidation_group_limit}")
    emit(f" Log Level: {migration.log_level}")
    emit(f" Dry Run: {migration.dry_run}")

    emit("\n[Benchmark Settings]")
    bench = settings.benchmark
    emit(f" Output Directory: {bench.output_dir}")
    emit(f" Iterations: {bench.iterations}")
# Script entry point: run the click group with an empty dict as the context
# object when this file is executed directly.
if __name__ == "__main__":
    cli(obj={})