Files
mysql2postgres/main.py
alex 62577d3200 feat: Add MySQL to PostgreSQL migration tool with JSONB transformation
Implement comprehensive migration solution with:
- Full and incremental migration modes
- JSONB schema transformation for RAWDATACOR and ELABDATADISP tables
- Native PostgreSQL partitioning (2014-2031)
- Optimized GIN indexes for JSONB queries
- Rich logging with progress tracking
- Complete benchmark system for MySQL vs PostgreSQL comparison
- CLI interface with multiple commands (setup, migrate, benchmark)
- Configuration management via .env file
- Error handling and retry logic
- Batch processing for performance (configurable batch size)

Database transformations:
- RAWDATACOR: 16 Val columns + units → single JSONB measurements
- ELABDATADISP: 25+ measurement fields → structured JSONB with categories

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2025-12-10 19:57:11 +01:00

198 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MySQL to PostgreSQL migration tool CLI."""
import click
import sys
from pathlib import Path
from config import get_settings
from src.utils.logger import setup_logger, get_logger
from src.transformers.schema_transformer import get_full_schema_script
from src.migrator.full_migration import run_full_migration
from src.migrator.incremental_migration import run_incremental_migration
from src.benchmark.performance_test import run_benchmark
from src.connectors.postgres_connector import PostgreSQLConnector
# Module-level logger; the setup/migrate/benchmark commands below use it to
# record progress and failures.
logger = get_logger(__name__)
@click.group()
@click.pass_context
def cli(ctx):
    """MySQL to PostgreSQL migration tool with performance benchmarking."""
    # Root command group. The docstring above is what Click prints as the
    # top-level help text, so it is kept short and user-facing.
    setup_logger(__name__)

    # Guarantee that ``ctx.obj`` is a dict, even when the group is invoked
    # without an explicit ``obj=`` argument.
    ctx.ensure_object(dict)
@cli.command()
@click.option(
    "--create-schema",
    is_flag=True,
    help="Create PostgreSQL schema and partitions",
)
def setup(create_schema):
    """Setup PostgreSQL database."""
    # create_schema: True when --create-schema was passed on the command line.
    # Exits with status 1 if anything raises while the schema is being created.
    setup_logger(__name__)

    if create_schema:
        try:
            with PostgreSQLConnector() as pg_conn:
                logger.info("Creating PostgreSQL schema...")
                # Fetch the schema script and run it on the open connection.
                pg_conn.execute_script(get_full_schema_script())
                logger.info("✓ Schema creation complete")
                click.echo("✓ PostgreSQL schema created successfully")
        except Exception as e:
            # CLI boundary: log, tell the user on stderr, signal failure.
            logger.error(f"Setup failed: {e}")
            click.echo(f"✗ Setup failed: {e}", err=True)
            sys.exit(1)
    else:
        # Without the flag nothing is executed; only a usage hint is printed.
        click.echo("Usage: python main.py setup --create-schema")
        click.echo("Create PostgreSQL schema and partitions")
@cli.group()
def migrate():
    """Migrate data from MySQL to PostgreSQL."""
    # Pure container group: the "full" and "incremental" subcommands are
    # registered on it; the group callback itself does nothing.
@migrate.command()
@click.option(
    "--table",
    default="all",
    type=click.Choice(["RAWDATACOR", "ELABDATADISP", "all"]),
    help="Table to migrate (default: all)",
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Show what would be done without modifying data",
)
def full(table, dry_run):
    """Perform full migration of all data."""
    # table:   one of the --table choices; "all" expands to both tables.
    # dry_run: forwarded unchanged to run_full_migration.
    # Exits with status 1 if any table's migration raises.
    setup_logger(__name__)

    if table == "all":
        tables = ["RAWDATACOR", "ELABDATADISP"]
    else:
        tables = [table]

    try:
        total_migrated = 0
        for tbl in tables:
            click.echo(f"\nMigrating {tbl}...")
            migrated = run_full_migration(tbl, dry_run=dry_run)
            total_migrated += migrated
            click.echo(f"{tbl}: {migrated} rows migrated")

        click.echo(f"\n✓ Full migration complete: {total_migrated} total rows migrated")
    except Exception as e:
        # CLI boundary: log, tell the user on stderr, signal failure.
        logger.error(f"Migration failed: {e}")
        click.echo(f"✗ Migration failed: {e}", err=True)
        sys.exit(1)
@migrate.command()
@click.option(
    "--table",
    default="all",
    type=click.Choice(["RAWDATACOR", "ELABDATADISP", "all"]),
    help="Table to migrate (default: all)",
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Show what would be done without modifying data",
)
@click.option(
    "--state-file",
    default="migration_state.json",
    help="Path to migration state file",
)
def incremental(table, dry_run, state_file):
    """Perform incremental migration since last sync."""
    # table:      one of the --table choices; "all" expands to both tables.
    # dry_run:    forwarded unchanged to run_incremental_migration.
    # state_file: forwarded unchanged to run_incremental_migration.
    # Exits with status 1 if any table's migration raises.
    setup_logger(__name__)

    tables = [table] if table != "all" else ["RAWDATACOR", "ELABDATADISP"]

    try:
        total_migrated = 0
        for tbl in tables:
            click.echo(f"\nIncremental migration for {tbl}...")
            migrated = run_incremental_migration(
                tbl,
                dry_run=dry_run,
                state_file=state_file,
            )
            total_migrated += migrated

            # One status line per table, depending on whether rows were moved.
            click.echo(
                f"{tbl}: {migrated} rows migrated"
                if migrated > 0
                else f" {tbl}: No new rows to migrate"
            )

        # Overall summary once every requested table has been processed.
        if total_migrated != 0:
            click.echo(f"\n✓ Incremental migration complete: {total_migrated} total rows migrated")
        else:
            click.echo("\n No rows to migrate")
    except Exception as e:
        # CLI boundary: log, tell the user on stderr, signal failure.
        logger.error(f"Incremental migration failed: {e}")
        click.echo(f"✗ Incremental migration failed: {e}", err=True)
        sys.exit(1)
@cli.command()
@click.option(
    "--iterations",
    default=None,
    type=int,
    help="Number of iterations per query (default from config)",
)
@click.option(
    "--output",
    default=None,
    type=click.Path(),
    help="Output file path (default: benchmark_results/benchmark_TIMESTAMP.json)",
)
def benchmark(iterations, output):
    """Run performance benchmarks comparing MySQL and PostgreSQL."""
    # iterations: optional int passed to run_benchmark as ``iterations``.
    # output:     optional path passed to run_benchmark as ``output_file``.
    # Both default to None and are forwarded as-is.
    # Exits with status 1 if the benchmark raises.
    setup_logger(__name__)

    try:
        click.echo("Running performance benchmarks...")
        # run_benchmark's return value is reported as the results location.
        output_file = run_benchmark(
            iterations=iterations,
            output_file=output,
        )
        click.echo(f"✓ Benchmark complete: results saved to {output_file}")
    except Exception as e:
        # CLI boundary: log, tell the user on stderr, signal failure.
        logger.error(f"Benchmark failed: {e}")
        click.echo(f"✗ Benchmark failed: {e}", err=True)
        sys.exit(1)
@cli.command()
def info():
    """Show configuration information."""
    # Prints host/port/database/user for MySQL and PostgreSQL, followed by the
    # migration and benchmark settings. Only the fields listed below are
    # printed; no password attribute is read.
    #
    # Like the other commands in this file, a failure is reported as a single
    # "✗ ..." line on stderr and exit status 1 instead of a raw traceback.
    setup_logger(__name__)

    try:
        settings = get_settings()
        # Render every line before printing anything, so a missing or invalid
        # setting fails cleanly instead of after partial output.
        lines = [
            "\n[MySQL Configuration]",
            f" Host: {settings.mysql.host}:{settings.mysql.port}",
            f" Database: {settings.mysql.database}",
            f" User: {settings.mysql.user}",
            "\n[PostgreSQL Configuration]",
            f" Host: {settings.postgres.host}:{settings.postgres.port}",
            f" Database: {settings.postgres.database}",
            f" User: {settings.postgres.user}",
            "\n[Migration Settings]",
            f" Batch Size: {settings.migration.batch_size}",
            f" Log Level: {settings.migration.log_level}",
            f" Dry Run: {settings.migration.dry_run}",
            "\n[Benchmark Settings]",
            f" Output Directory: {settings.benchmark.output_dir}",
            f" Iterations: {settings.benchmark.iterations}",
        ]
    except Exception as e:
        # CLI boundary: log, tell the user on stderr, signal failure.
        logger.error(f"Failed to load configuration: {e}")
        click.echo(f"✗ Failed to load configuration: {e}", err=True)
        sys.exit(1)

    for line in lines:
        click.echo(line)
if __name__ == "__main__":
    # Script entry point: run the root Click group with an empty dict as the
    # initial context object.
    cli(obj={})