Implement comprehensive migration solution with:
- Full and incremental migration modes
- JSONB schema transformation for RAWDATACOR and ELABDATADISP tables
- Native PostgreSQL partitioning (2014-2031)
- Optimized GIN indexes for JSONB queries
- Rich logging with progress tracking
- Complete benchmark system for MySQL vs PostgreSQL comparison
- CLI interface with multiple commands (setup, migrate, benchmark)
- Configuration management via .env file
- Error handling and retry logic
- Batch processing for performance (configurable batch size)

Database transformations:
- RAWDATACOR: 16 Val columns + units → single JSONB measurements
- ELABDATADISP: 25+ measurement fields → structured JSONB with categories

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
198 lines
5.8 KiB
Python
198 lines
5.8 KiB
Python
"""MySQL to PostgreSQL migration tool CLI."""
|
||
import click
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
from config import get_settings
|
||
from src.utils.logger import setup_logger, get_logger
|
||
from src.transformers.schema_transformer import get_full_schema_script
|
||
from src.migrator.full_migration import run_full_migration
|
||
from src.migrator.incremental_migration import run_incremental_migration
|
||
from src.benchmark.performance_test import run_benchmark
|
||
from src.connectors.postgres_connector import PostgreSQLConnector
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
@click.group()
@click.pass_context
def cli(ctx):
    """MySQL to PostgreSQL migration tool with performance benchmarking."""
    # NOTE: click renders the docstring above as the --help text, so it is
    # kept verbatim; implementation notes live in comments instead.

    # Root group callback: click invokes it before the selected subcommand.
    # Logging is configured first.
    setup_logger(__name__)

    # Guarantee ctx.obj is a dict even when the caller did not pass obj=...
    ctx.ensure_object(dict)
|
||
|
||
|
||
@cli.command()
@click.option(
    "--create-schema",
    is_flag=True,
    help="Create PostgreSQL schema and partitions",
)
def setup(create_schema):
    """Setup PostgreSQL database."""
    # NOTE: the docstring is the command's --help text; keep it unchanged.
    setup_logger(__name__)

    if create_schema:
        try:
            # The connector is used as a context manager for the whole
            # schema-creation step.
            with PostgreSQLConnector() as connection:
                logger.info("Creating PostgreSQL schema...")
                connection.execute_script(get_full_schema_script())
                logger.info("✓ Schema creation complete")
                click.echo("✓ PostgreSQL schema created successfully")
        except Exception as exc:
            # Top-level CLI boundary: report the failure in the log and on
            # stderr, then exit with a non-zero status.
            failure = f"Setup failed: {exc}"
            logger.error(failure)
            click.echo(f"✗ {failure}", err=True)
            sys.exit(1)
        return

    # Without --create-schema there is nothing to do: print a usage hint.
    for hint in (
        "Usage: python main.py setup --create-schema",
        "Create PostgreSQL schema and partitions",
    ):
        click.echo(hint)
|
||
|
||
|
||
@cli.group()
def migrate():
    """Migrate data from MySQL to PostgreSQL."""
    # Pure container group for the migration subcommands registered with
    # @migrate.command(); the docstring (shown by --help) is the entire body.
|
||
|
||
|
||
@migrate.command()
@click.option(
    "--table",
    type=click.Choice(["RAWDATACOR", "ELABDATADISP", "all"]),
    default="all",
    help="Table to migrate (default: all)",
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Show what would be done without modifying data",
)
def full(table, dry_run):
    """Perform full migration of all data."""
    # NOTE: the docstring is the command's --help text; keep it unchanged.
    setup_logger(__name__)

    # "all" expands to both known tables; otherwise process just the one named.
    selected = [table] if table != "all" else ["RAWDATACOR", "ELABDATADISP"]

    try:
        grand_total = 0

        for name in selected:
            click.echo(f"\nMigrating {name}...")
            row_count = run_full_migration(name, dry_run=dry_run)
            # Accumulate before reporting so a non-numeric return value fails
            # here and is handled by the except clause below.
            grand_total += row_count
            click.echo(f"✓ {name}: {row_count} rows migrated")

        click.echo(f"\n✓ Full migration complete: {grand_total} total rows migrated")

    except Exception as exc:
        # Top-level CLI boundary: log, report on stderr, exit non-zero.
        failure = f"Migration failed: {exc}"
        logger.error(failure)
        click.echo(f"✗ {failure}", err=True)
        sys.exit(1)
|
||
|
||
|
||
@migrate.command()
@click.option(
    "--table",
    type=click.Choice(["RAWDATACOR", "ELABDATADISP", "all"]),
    default="all",
    help="Table to migrate (default: all)",
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Show what would be done without modifying data",
)
@click.option(
    "--state-file",
    default="migration_state.json",
    help="Path to migration state file",
)
def incremental(table, dry_run, state_file):
    """Perform incremental migration since last sync."""
    # NOTE: the docstring is the command's --help text; keep it unchanged.
    setup_logger(__name__)

    # "all" expands to both known tables; otherwise process just the one named.
    selected = [table] if table != "all" else ["RAWDATACOR", "ELABDATADISP"]

    try:
        grand_total = 0

        for name in selected:
            click.echo(f"\nIncremental migration for {name}...")
            row_count = run_incremental_migration(
                name,
                dry_run=dry_run,
                state_file=state_file,
            )
            grand_total += row_count

            # Per-table outcome: a count when rows moved, a notice otherwise.
            outcome = (
                f"✓ {name}: {row_count} rows migrated"
                if row_count > 0
                else f"ℹ {name}: No new rows to migrate"
            )
            click.echo(outcome)

        # Overall summary across every processed table.
        summary = (
            "\nℹ No rows to migrate"
            if grand_total == 0
            else f"\n✓ Incremental migration complete: {grand_total} total rows migrated"
        )
        click.echo(summary)

    except Exception as exc:
        # Top-level CLI boundary: log, report on stderr, exit non-zero.
        failure = f"Incremental migration failed: {exc}"
        logger.error(failure)
        click.echo(f"✗ {failure}", err=True)
        sys.exit(1)
|
||
|
||
|
||
@cli.command()
@click.option(
    "--iterations",
    # An iteration count below 1 is meaningless; IntRange makes click reject
    # it at parse time with a usage error instead of handing it to the
    # benchmark runner. Omitting the option still yields None.
    type=click.IntRange(min=1),
    default=None,
    help="Number of iterations per query (default from config)",
)
@click.option(
    "--output",
    type=click.Path(),
    default=None,
    help="Output file path (default: benchmark_results/benchmark_TIMESTAMP.json)",
)
def benchmark(iterations, output):
    """Run performance benchmarks comparing MySQL and PostgreSQL."""
    # NOTE: the docstring is the command's --help text; keep it unchanged.
    #
    # iterations: positive int, or None when --iterations is omitted
    #             (presumably run_benchmark then falls back to the configured
    #             value, per the option's help text - confirm in run_benchmark).
    # output:     destination path, or None to let run_benchmark choose one.
    # Exits with status 1 if the benchmark run raises.
    setup_logger(__name__)

    try:
        click.echo("Running performance benchmarks...")
        # run_benchmark returns the location of the results it wrote.
        output_file = run_benchmark(iterations=iterations, output_file=output)
        click.echo(f"✓ Benchmark complete: results saved to {output_file}")

    except Exception as e:
        # Top-level CLI boundary: log, report on stderr, exit non-zero.
        logger.error(f"Benchmark failed: {e}")
        click.echo(f"✗ Benchmark failed: {e}", err=True)
        sys.exit(1)
|
||
|
||
|
||
@cli.command()
def info():
    """Show configuration information."""
    # NOTE: the docstring is the command's --help text; keep it unchanged.
    setup_logger(__name__)

    # Resolve every displayed value up front so that a configuration problem
    # (get_settings() raising, or a missing settings attribute) is reported
    # the same way the other commands report failures - a short message on
    # stderr and exit status 1 - rather than as a raw traceback after partial
    # output.
    try:
        settings = get_settings()
        sections = [
            (
                "MySQL Configuration",
                [
                    f"  Host: {settings.mysql.host}:{settings.mysql.port}",
                    f"  Database: {settings.mysql.database}",
                    f"  User: {settings.mysql.user}",
                ],
            ),
            (
                "PostgreSQL Configuration",
                [
                    f"  Host: {settings.postgres.host}:{settings.postgres.port}",
                    f"  Database: {settings.postgres.database}",
                    f"  User: {settings.postgres.user}",
                ],
            ),
            (
                "Migration Settings",
                [
                    f"  Batch Size: {settings.migration.batch_size}",
                    f"  Log Level: {settings.migration.log_level}",
                    f"  Dry Run: {settings.migration.dry_run}",
                ],
            ),
            (
                "Benchmark Settings",
                [
                    f"  Output Directory: {settings.benchmark.output_dir}",
                    f"  Iterations: {settings.benchmark.iterations}",
                ],
            ),
        ]
    except Exception as e:
        logger.error(f"Failed to load configuration: {e}")
        click.echo(f"✗ Failed to load configuration: {e}", err=True)
        sys.exit(1)

    # Each section prints as a blank line, a bracketed title, then its
    # indented entries. Only host, port, database and user are shown for the
    # two databases.
    for title, entries in sections:
        click.echo(f"\n[{title}]")
        for entry in entries:
            click.echo(entry)
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: run the root click group, seeding the context
    # object with an empty dict.
    cli(obj={})
|