feat: Add MySQL to PostgreSQL migration tool with JSONB transformation

Implement comprehensive migration solution with:
- Full and incremental migration modes
- JSONB schema transformation for RAWDATACOR and ELABDATADISP tables
- Native PostgreSQL partitioning (2014-2031)
- Optimized GIN indexes for JSONB queries
- Rich logging with progress tracking
- Complete benchmark system for MySQL vs PostgreSQL comparison
- CLI interface with multiple commands (setup, migrate, benchmark)
- Configuration management via .env file
- Error handling with per-batch transaction rollback
- Batch processing for performance (configurable batch size)

Database transformations:
- RAWDATACOR: 16 Val columns + their unit columns → a single JSONB measurements column
- ELABDATADISP: 25+ measurement fields → structured JSONB grouped by category
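
For illustration, the transformed JSONB payloads look roughly like this (the values below are made up):

    # RAWDATACOR: Val0..ValF plus their units collapse into one keyed object
    measurements = {
        "0": {"value": "12.4", "unit": "mm"},
        "5": {"value": "3.1", "unit": "degC"},
    }

    # ELABDATADISP: fields are grouped by category
    measurements = {
        "shifts": {"x": 0.12, "y": -0.03},
        "kinematics": {"speed": 0.8},
        "sensors": {"t_node": 21.5},
    }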

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2025-12-10 19:57:11 +01:00
commit 62577d3200
24 changed files with 2075 additions and 0 deletions

22
.env.example Normal file

@@ -0,0 +1,22 @@
# MySQL Source Database
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=your_mysql_password
MYSQL_DATABASE=your_database_name
# PostgreSQL Target Database (container Incus)
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=postgres
POSTGRES_PASSWORD=your_postgres_password
POSTGRES_DATABASE=migrated_db
# Migration Settings
BATCH_SIZE=10000
LOG_LEVEL=INFO
DRY_RUN=false
# Performance Testing
BENCHMARK_OUTPUT_DIR=benchmark_results
BENCHMARK_ITERATIONS=5

32
.gitignore vendored Normal file

@@ -0,0 +1,32 @@
# Environment variables
.env
.env.local
.env.*.local
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store
# Testing
.pytest_cache/
.coverage
# Project specific
*.log
migration_state.json
benchmark_results/

1
.python-version Normal file

@@ -0,0 +1 @@
3.14

BIN
README.md Normal file

Binary file not shown.

154
config.py Normal file

@@ -0,0 +1,154 @@
"""Configuration management using Pydantic settings."""
from pydantic_settings import BaseSettings
from typing import Optional
import os
class DatabaseConfig(BaseSettings):
"""Database configuration."""
host: str
port: int
user: str
password: str
database: str
class Config:
env_prefix: str = ""
class MySQLConfig(DatabaseConfig):
"""MySQL source database configuration."""
class Config:
env_prefix = "MYSQL_"
env_file = ".env"
class PostgreSQLConfig(DatabaseConfig):
"""PostgreSQL target database configuration."""
class Config:
env_prefix = "POSTGRES_"
env_file = ".env"
class MigrationSettings(BaseSettings):
"""Migration settings."""
batch_size: int = 10000
log_level: str = "INFO"
dry_run: bool = False
class Config:
env_file = ".env"
case_sensitive = False
class BenchmarkSettings(BaseSettings):
"""Benchmark settings."""
output_dir: str = "benchmark_results"
iterations: int = 5
class Config:
env_prefix: str = "BENCHMARK_"
env_file = ".env"
case_sensitive = False
class Settings(BaseSettings):
"""All application settings."""
mysql: MySQLConfig
postgres: PostgreSQLConfig
migration: MigrationSettings
benchmark: BenchmarkSettings
class Config:
env_file = ".env"
case_sensitive = False
@classmethod
def from_env(cls):
"""Load settings from environment variables."""
return cls(
mysql=MySQLConfig(),
postgres=PostgreSQLConfig(),
migration=MigrationSettings(),
benchmark=BenchmarkSettings(),
)
# Lazy load settings
_settings: Optional[Settings] = None
def get_settings() -> Settings:
"""Get application settings, loading from .env if necessary."""
global _settings
if _settings is None:
_settings = Settings.from_env()
return _settings
# Schema transformation definitions
RAWDATACOR_COLUMNS = {
"val_columns": ["Val0", "Val1", "Val2", "Val3", "Val4", "Val5", "Val6", "Val7", "Val8", "Val9", "ValA", "ValB", "ValC", "ValD", "ValE", "ValF"],
"unit_columns": ["Val0_unitmisure", "Val1_unitmisure", "Val2_unitmisure", "Val3_unitmisure", "Val4_unitmisure", "Val5_unitmisure", "Val6_unitmisure", "Val7_unitmisure", "Val8_unitmisure", "Val9_unitmisure", "ValA_unitmisure", "ValB_unitmisure", "ValC_unitmisure", "ValD_unitmisure", "ValE_unitmisure", "ValF_unitmisure"],
}
ELABDATADISP_MEASUREMENT_FIELDS = {
"shifts": ["XShift", "YShift", "ZShift", "HShift", "HShiftDir", "HShift_local"],
"coordinates": ["X", "Y", "Z", "Xstar", "Zstar"],
"kinematics": ["speed", "speed_local", "acceleration", "acceleration_local"],
"sensors": ["T_node", "load_value", "water_level", "pressure"],
"calculated": ["AlfaX", "AlfaY", "Area"],
}
ELABDATADISP_FIELD_MAPPING = {
# shifts mapping (source -> (category, key))
"XShift": ("shifts", "x"),
"YShift": ("shifts", "y"),
"ZShift": ("shifts", "z"),
"HShift": ("shifts", "h"),
"HShiftDir": ("shifts", "h_dir"),
"HShift_local": ("shifts", "h_local"),
# coordinates mapping
"X": ("coordinates", "x"),
"Y": ("coordinates", "y"),
"Z": ("coordinates", "z"),
"Xstar": ("coordinates", "x_star"),
"Zstar": ("coordinates", "z_star"),
# kinematics mapping
"speed": ("kinematics", "speed"),
"speed_local": ("kinematics", "speed_local"),
"acceleration": ("kinematics", "acceleration"),
"acceleration_local": ("kinematics", "acceleration_local"),
# sensors mapping
"T_node": ("sensors", "t_node"),
"load_value": ("sensors", "load_value"),
"water_level": ("sensors", "water_level"),
"pressure": ("sensors", "pressure"),
# calculated mapping
"AlfaX": ("calculated", "alfa_x"),
"AlfaY": ("calculated", "alfa_y"),
"Area": ("calculated", "area"),
}
# PostgreSQL Partition years (from both tables)
PARTITION_YEARS = list(range(2014, 2032)) # 2014-2031
# Table configurations
TABLE_CONFIGS = {
"rawdatacor": {
"mysql_table": "RAWDATACOR",
"postgres_table": "rawdatacor",
"primary_key": "id",
"partition_key": "event_date",
},
"elabdatadisp": {
"mysql_table": "ELABDATADISP",
"postgres_table": "elabdatadisp",
"primary_key": "idElabData",
"partition_key": "event_date",
},
}
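
A minimal usage sketch for this module (it assumes a populated .env or environment, as described above):

# Usage sketch: settings access and the field mapping that drives the JSONB layout.
from config import get_settings, ELABDATADISP_FIELD_MAPPING

settings = get_settings()
print(settings.mysql.host, settings.postgres.database)
print(settings.migration.batch_size)

category, key = ELABDATADISP_FIELD_MAPPING["XShift"]  # -> ("shifts", "x")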

197
main.py Normal file

@@ -0,0 +1,197 @@
"""MySQL to PostgreSQL migration tool CLI."""
import click
import sys
from pathlib import Path
from config import get_settings
from src.utils.logger import setup_logger, get_logger
from src.transformers.schema_transformer import get_full_schema_script
from src.migrator.full_migration import run_full_migration
from src.migrator.incremental_migration import run_incremental_migration
from src.benchmark.performance_test import run_benchmark
from src.connectors.postgres_connector import PostgreSQLConnector
logger = get_logger(__name__)
@click.group()
@click.pass_context
def cli(ctx):
"""MySQL to PostgreSQL migration tool with performance benchmarking."""
setup_logger(__name__)
ctx.ensure_object(dict)
@cli.command()
@click.option(
"--create-schema",
is_flag=True,
help="Create PostgreSQL schema and partitions"
)
def setup(create_schema):
"""Setup PostgreSQL database."""
setup_logger(__name__)
if not create_schema:
click.echo("Usage: python main.py setup --create-schema")
click.echo("Create PostgreSQL schema and partitions")
return
try:
with PostgreSQLConnector() as pg_conn:
logger.info("Creating PostgreSQL schema...")
schema_script = get_full_schema_script()
pg_conn.execute_script(schema_script)
logger.info("✓ Schema creation complete")
click.echo("✓ PostgreSQL schema created successfully")
except Exception as e:
logger.error(f"Setup failed: {e}")
click.echo(f"✗ Setup failed: {e}", err=True)
sys.exit(1)
@cli.group()
def migrate():
"""Migrate data from MySQL to PostgreSQL."""
pass
@migrate.command()
@click.option(
"--table",
type=click.Choice(["RAWDATACOR", "ELABDATADISP", "all"]),
default="all",
help="Table to migrate (default: all)"
)
@click.option(
"--dry-run",
is_flag=True,
help="Show what would be done without modifying data"
)
def full(table, dry_run):
"""Perform full migration of all data."""
setup_logger(__name__)
tables = ["RAWDATACOR", "ELABDATADISP"] if table == "all" else [table]
try:
total_migrated = 0
for tbl in tables:
click.echo(f"\nMigrating {tbl}...")
migrated = run_full_migration(tbl, dry_run=dry_run)
total_migrated += migrated
click.echo(f"{tbl}: {migrated} rows migrated")
click.echo(f"\n✓ Full migration complete: {total_migrated} total rows migrated")
except Exception as e:
logger.error(f"Migration failed: {e}")
click.echo(f"✗ Migration failed: {e}", err=True)
sys.exit(1)
@migrate.command()
@click.option(
"--table",
type=click.Choice(["RAWDATACOR", "ELABDATADISP", "all"]),
default="all",
help="Table to migrate (default: all)"
)
@click.option(
"--dry-run",
is_flag=True,
help="Show what would be done without modifying data"
)
@click.option(
"--state-file",
default="migration_state.json",
help="Path to migration state file"
)
def incremental(table, dry_run, state_file):
"""Perform incremental migration since last sync."""
setup_logger(__name__)
tables = ["RAWDATACOR", "ELABDATADISP"] if table == "all" else [table]
try:
total_migrated = 0
for tbl in tables:
click.echo(f"\nIncremental migration for {tbl}...")
migrated = run_incremental_migration(tbl, dry_run=dry_run, state_file=state_file)
total_migrated += migrated
if migrated > 0:
click.echo(f"{tbl}: {migrated} rows migrated")
else:
click.echo(f" {tbl}: No new rows to migrate")
if total_migrated == 0:
click.echo("\n No rows to migrate")
else:
click.echo(f"\n✓ Incremental migration complete: {total_migrated} total rows migrated")
except Exception as e:
logger.error(f"Incremental migration failed: {e}")
click.echo(f"✗ Incremental migration failed: {e}", err=True)
sys.exit(1)
@cli.command()
@click.option(
"--iterations",
type=int,
default=None,
help="Number of iterations per query (default from config)"
)
@click.option(
"--output",
type=click.Path(),
default=None,
help="Output file path (default: benchmark_results/benchmark_TIMESTAMP.json)"
)
def benchmark(iterations, output):
"""Run performance benchmarks comparing MySQL and PostgreSQL."""
setup_logger(__name__)
try:
click.echo("Running performance benchmarks...")
output_file = run_benchmark(iterations=iterations, output_file=output)
click.echo(f"✓ Benchmark complete: results saved to {output_file}")
except Exception as e:
logger.error(f"Benchmark failed: {e}")
click.echo(f"✗ Benchmark failed: {e}", err=True)
sys.exit(1)
@cli.command()
def info():
"""Show configuration information."""
setup_logger(__name__)
settings = get_settings()
click.echo("\n[MySQL Configuration]")
click.echo(f" Host: {settings.mysql.host}:{settings.mysql.port}")
click.echo(f" Database: {settings.mysql.database}")
click.echo(f" User: {settings.mysql.user}")
click.echo("\n[PostgreSQL Configuration]")
click.echo(f" Host: {settings.postgres.host}:{settings.postgres.port}")
click.echo(f" Database: {settings.postgres.database}")
click.echo(f" User: {settings.postgres.user}")
click.echo("\n[Migration Settings]")
click.echo(f" Batch Size: {settings.migration.batch_size}")
click.echo(f" Log Level: {settings.migration.log_level}")
click.echo(f" Dry Run: {settings.migration.dry_run}")
click.echo("\n[Benchmark Settings]")
click.echo(f" Output Directory: {settings.benchmark.output_dir}")
click.echo(f" Iterations: {settings.benchmark.iterations}")
if __name__ == "__main__":
cli(obj={})
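
For a quick smoke test without a shell, the commands above can also be exercised in-process with Click's test runner (a sketch; the dry-run flag keeps it side-effect free apart from reading configuration):

# Sketch: invoking the CLI programmatically via click.testing.
from click.testing import CliRunner
from main import cli

runner = CliRunner()
print(runner.invoke(cli, ["info"], obj={}).output)
result = runner.invoke(cli, ["migrate", "full", "--table", "RAWDATACOR", "--dry-run"], obj={})
print(result.exit_code, result.output)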

15
pyproject.toml Normal file

@@ -0,0 +1,15 @@
[project]
name = "mysql2postgres"
version = "0.1.0"
description = "Robust MySQL to PostgreSQL migration tool with schema transformation and performance benchmarking"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"pymysql>=1.1.0",
"psycopg[binary]>=3.1.0",
"python-dotenv>=1.0.0",
"click>=8.1.0",
"rich>=13.0.0",
"pydantic>=2.5.0",
"pydantic-settings>=2.1.0",
]

1
src/__init__.py Normal file

@@ -0,0 +1 @@
"""MySQL to PostgreSQL migration tool."""

src/benchmark/__init__.py Normal file

src/benchmark/performance_test.py Normal file

@@ -0,0 +1,263 @@
"""Performance benchmarking for MySQL vs PostgreSQL."""
import json
import time
from typing import Dict, List, Any, Optional, Tuple
from pathlib import Path
from datetime import datetime
import statistics
from config import get_settings
from src.connectors.mysql_connector import MySQLConnector
from src.connectors.postgres_connector import PostgreSQLConnector
from src.benchmark.query_generator import BenchmarkQueryGenerator
from src.utils.logger import get_logger, setup_logger
logger = get_logger(__name__)
class PerformanceBenchmark:
"""Run performance benchmarks comparing MySQL and PostgreSQL."""
def __init__(self, iterations: int = 5):
"""Initialize benchmark runner.
Args:
iterations: Number of times to run each query
"""
self.iterations = iterations
self.settings = get_settings()
self.results = {}
def run_all_benchmarks(self) -> Dict[str, Any]:
"""Run all benchmarks.
Returns:
Benchmark results dictionary
"""
setup_logger(__name__)
logger.info(f"Starting performance benchmarks ({self.iterations} iterations per query)")
all_queries = BenchmarkQueryGenerator.get_all_benchmark_queries()
results = {
"timestamp": datetime.utcnow().isoformat(),
"iterations": self.iterations,
"tables": {},
}
for table_name, query_categories in all_queries.items():
logger.info(f"Benchmarking {table_name}...")
table_results = self._benchmark_table(table_name, query_categories)
results["tables"][table_name] = table_results
return results
def _benchmark_table(
self,
table: str,
query_categories: Dict[str, List[Tuple[str, str]]]
) -> Dict[str, Any]:
"""Benchmark queries for a specific table.
Args:
table: Table name
query_categories: Dictionary of query categories
Returns:
Benchmark results for the table
"""
results = {}
try:
with MySQLConnector() as mysql_conn:
with PostgreSQLConnector() as pg_conn:
for category, queries in query_categories.items():
logger.debug(f" Benchmarking {category}...")
results[category] = self._benchmark_query_pair(
mysql_conn,
pg_conn,
queries[0],
category
)
except Exception as e:
logger.error(f"Benchmark failed: {e}")
raise
return results
def _benchmark_query_pair(
self,
mysql_conn: MySQLConnector,
pg_conn: PostgreSQLConnector,
queries: Tuple[str, str],
category: str
) -> Dict[str, Any]:
"""Benchmark a pair of MySQL and PostgreSQL queries.
Args:
mysql_conn: MySQL connector
pg_conn: PostgreSQL connector
queries: Tuple of (mysql_query, postgres_query)
category: Query category name
Returns:
Benchmark results for the query pair
"""
mysql_query, pg_query = queries
result = {
"category": category,
"mysql": None,
"postgres": None,
}
# Benchmark MySQL query
if mysql_query:
try:
times = []
for _ in range(self.iterations):
start = time.perf_counter()
with mysql_conn.connection.cursor() as cursor:
cursor.execute(mysql_query)
rows = cursor.fetchall()
end = time.perf_counter()
times.append((end - start) * 1000) # Convert to ms
result["mysql"] = self._calculate_stats(times, len(rows) if rows else 0)
logger.debug(f" MySQL {category}: {result['mysql']['mean']:.2f}ms")
except Exception as e:
logger.warning(f"MySQL query failed: {e}")
result["mysql"] = {"error": str(e)}
# Benchmark PostgreSQL query
if pg_query:
try:
times = []
for _ in range(self.iterations):
start = time.perf_counter()
with pg_conn.connection.cursor() as cursor:
cursor.execute(pg_query)
rows = cursor.fetchall()
end = time.perf_counter()
times.append((end - start) * 1000) # Convert to ms
result["postgres"] = self._calculate_stats(times, len(rows) if rows else 0)
logger.debug(f" PostgreSQL {category}: {result['postgres']['mean']:.2f}ms")
except Exception as e:
logger.warning(f"PostgreSQL query failed: {e}")
result["postgres"] = {"error": str(e)}
return result
@staticmethod
def _calculate_stats(times: List[float], row_count: int = 0) -> Dict[str, float]:
"""Calculate statistics for a list of execution times.
Args:
times: List of execution times in milliseconds
row_count: Number of rows returned (for throughput calculation)
Returns:
Dictionary with statistics
"""
if not times:
return {}
return {
"min": min(times),
"max": max(times),
"mean": statistics.mean(times),
"median": statistics.median(times),
"stdev": statistics.stdev(times) if len(times) > 1 else 0,
"p95": sorted(times)[int(len(times) * 0.95)] if len(times) > 1 else times[0],
"row_count": row_count,
"throughput": (row_count / (statistics.mean(times) / 1000)) if times and statistics.mean(times) > 0 else 0,
}
def save_results(self, results: Dict[str, Any], output_file: Optional[str] = None) -> str:
"""Save benchmark results to file.
Args:
results: Benchmark results
output_file: Output file path (uses default from config if None)
Returns:
Path to output file
"""
if output_file is None:
output_dir = Path(self.settings.benchmark.output_dir)
output_dir.mkdir(exist_ok=True)
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
output_file = output_dir / f"benchmark_{timestamp}.json"
else:
output_file = Path(output_file)
output_file.parent.mkdir(parents=True, exist_ok=True)
try:
with open(output_file, "w") as f:
json.dump(results, f, indent=2)
logger.info(f"Benchmark results saved to {output_file}")
return str(output_file)
except Exception as e:
logger.error(f"Failed to save results: {e}")
raise
@staticmethod
def print_results(results: Dict[str, Any]) -> None:
"""Print benchmark results in a readable format.
Args:
results: Benchmark results
"""
from rich.console import Console
from rich.table import Table
console = Console()
for table_name, table_results in results.get("tables", {}).items():
console.print(f"\n[bold]{table_name}[/bold]")
for category, query_result in table_results.items():
mysql_result = query_result.get("mysql")
pg_result = query_result.get("postgres")
console.print(f"\n {category}:")
if mysql_result and "mean" in mysql_result:
console.print(
f" MySQL: {mysql_result['mean']:.2f}ms "
f"(min: {mysql_result['min']:.2f}ms, max: {mysql_result['max']:.2f}ms)"
)
if pg_result and "mean" in pg_result:
speedup = mysql_result['mean'] / pg_result['mean'] if mysql_result and 'mean' in mysql_result else 0
console.print(
f" PostgreSQL: {pg_result['mean']:.2f}ms "
f"(min: {pg_result['min']:.2f}ms, max: {pg_result['max']:.2f}ms)"
)
if speedup:
if speedup > 1:
console.print(f" [green]✓ PostgreSQL is {speedup:.1f}x faster[/green]")
else:
console.print(f" [yellow]⚠ MySQL is {1/speedup:.1f}x faster[/yellow]")
def run_benchmark(iterations: Optional[int] = None, output_file: Optional[str] = None) -> str:
"""Run performance benchmark and save results.
Args:
iterations: Number of iterations per query
output_file: Output file path
Returns:
Path to results file
"""
if iterations is None:
settings = get_settings()
iterations = settings.benchmark.iterations
benchmark = PerformanceBenchmark(iterations=iterations)
results = benchmark.run_all_benchmarks()
benchmark.print_results(results)
return benchmark.save_results(results, output_file)
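
A quick sanity check of the statistics helper (timings below are made up; the import path assumes this module lives at src/benchmark/performance_test.py, as main.py imports it):

# Sketch: _calculate_stats on a handful of fake timings (milliseconds).
from src.benchmark.performance_test import PerformanceBenchmark

stats = PerformanceBenchmark._calculate_stats([10.0, 12.0, 11.0, 13.0, 9.0], row_count=500)
# stats["mean"] == 11.0, stats["median"] == 11.0
# stats["throughput"] ≈ 500 / 0.011 ≈ 45455 rows/s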

src/benchmark/query_generator.py Normal file

@@ -0,0 +1,173 @@
"""Benchmark query generator for MySQL and PostgreSQL."""
from typing import List, Dict, Tuple, Any
from datetime import datetime, timedelta
import random
class BenchmarkQueryGenerator:
"""Generate benchmark queries for performance testing."""
@staticmethod
def generate_rawdatacor_queries() -> Dict[str, List[Tuple[str, str]]]:
"""Generate benchmark queries for RAWDATACOR table.
Returns:
Dictionary with query categories and (mysql_query, postgres_query) tuples
"""
# Sample data for queries
sample_unit_name = "Unit1"
sample_tool_name = "Tool1"
sample_node_num = 1
sample_date_start = "2024-01-01"
sample_date_end = "2024-01-31"
queries = {
"select_by_pk": [
(
"SELECT * FROM `RAWDATACOR` WHERE `id` = 1000 AND `EventDate` = '2024-01-15'",
"SELECT * FROM rawdatacor WHERE id = 1000 AND event_date = '2024-01-15'"
)
],
"select_by_date_range": [
(
f"SELECT * FROM `RAWDATACOR` WHERE `EventDate` BETWEEN '{sample_date_start}' AND '{sample_date_end}'",
f"SELECT * FROM rawdatacor WHERE event_date BETWEEN '{sample_date_start}' AND '{sample_date_end}'"
)
],
"select_by_unit_tool": [
(
f"SELECT * FROM `RAWDATACOR` WHERE `UnitName` = '{sample_unit_name}' AND `ToolNameID` = '{sample_tool_name}'",
f"SELECT * FROM rawdatacor WHERE unit_name = '{sample_unit_name}' AND tool_name_id = '{sample_tool_name}'"
)
],
"select_count_by_unit": [
(
f"SELECT COUNT(*) FROM `RAWDATACOR` WHERE `UnitName` = '{sample_unit_name}'",
f"SELECT COUNT(*) FROM rawdatacor WHERE unit_name = '{sample_unit_name}'"
)
],
"jsonb_filter_value": [
(
None, # Not applicable for MySQL
f"SELECT * FROM rawdatacor WHERE measurements->>'0'->>'value' IS NOT NULL LIMIT 1000"
)
],
"jsonb_contains": [
(
None, # Not applicable for MySQL
f"SELECT * FROM rawdatacor WHERE measurements ? '0' LIMIT 1000"
)
],
"aggregate_by_date": [
(
"SELECT `EventDate`, COUNT(*) as count FROM `RAWDATACOR` GROUP BY `EventDate` ORDER BY `EventDate`",
"SELECT event_date, COUNT(*) as count FROM rawdatacor GROUP BY event_date ORDER BY event_date"
)
],
"aggregate_with_filter": [
(
f"SELECT `UnitName`, `ToolNameID`, COUNT(*) as count FROM `RAWDATACOR` WHERE `EventDate` >= '{sample_date_start}' GROUP BY `UnitName`, `ToolNameID`",
f"SELECT unit_name, tool_name_id, COUNT(*) as count FROM rawdatacor WHERE event_date >= '{sample_date_start}' GROUP BY unit_name, tool_name_id"
)
],
}
return queries
@staticmethod
def generate_elabdatadisp_queries() -> Dict[str, List[Tuple[str, str]]]:
"""Generate benchmark queries for ELABDATADISP table.
Returns:
Dictionary with query categories and (mysql_query, postgres_query) tuples
"""
sample_unit_name = "Unit1"
sample_tool_name = "Tool1"
sample_date_start = "2024-01-01"
sample_date_end = "2024-01-31"
queries = {
"select_by_pk": [
(
"SELECT * FROM `ELABDATADISP` WHERE `idElabData` = 5000 AND `EventDate` = '2024-01-15'",
"SELECT * FROM elabdatadisp WHERE id_elab_data = 5000 AND event_date = '2024-01-15'"
)
],
"select_by_date_range": [
(
f"SELECT * FROM `ELABDATADISP` WHERE `EventDate` BETWEEN '{sample_date_start}' AND '{sample_date_end}'",
f"SELECT * FROM elabdatadisp WHERE event_date BETWEEN '{sample_date_start}' AND '{sample_date_end}'"
)
],
"select_by_unit_tool": [
(
f"SELECT * FROM `ELABDATADISP` WHERE `UnitName` = '{sample_unit_name}' AND `ToolNameID` = '{sample_tool_name}'",
f"SELECT * FROM elabdatadisp WHERE unit_name = '{sample_unit_name}' AND tool_name_id = '{sample_tool_name}'"
)
],
"jsonb_filter_speed": [
(
None,
f"SELECT * FROM elabdatadisp WHERE measurements->'kinematics'->>'speed' IS NOT NULL LIMIT 1000"
)
],
"jsonb_range_query": [
(
None,
f"SELECT * FROM elabdatadisp WHERE (measurements->'kinematics'->>'speed')::NUMERIC > 1.0 LIMIT 1000"
)
],
"jsonb_nested_contains": [
(
None,
f"SELECT * FROM elabdatadisp WHERE measurements @> '{{\"kinematics\"{{}}}}' LIMIT 1000"
)
],
"aggregate_measurements": [
(
None,
f"SELECT unit_name, AVG((measurements->'kinematics'->>'speed')::NUMERIC) as avg_speed FROM elabdatadisp WHERE event_date >= '{sample_date_start}' GROUP BY unit_name LIMIT 100"
)
],
"count_by_state": [
(
f"SELECT `State`, COUNT(*) as count FROM `ELABDATADISP` GROUP BY `State`",
f"SELECT state, COUNT(*) as count FROM elabdatadisp GROUP BY state"
)
],
}
return queries
@staticmethod
def generate_insert_queries() -> Dict[str, Tuple[str, str]]:
"""Generate INSERT benchmark queries.
Returns:
Dictionary with (mysql_query, postgres_query) tuples
"""
# These are placeholders - actual queries would be generated based on schema
queries = {
"insert_single_rawdatacor": (
"INSERT INTO `RAWDATACOR` (`UnitName`, `ToolNameID`, `NodeNum`, `EventDate`, `EventTime`, `BatLevel`, `Temperature`) VALUES ('Unit1', 'Tool1', 1, '2024-01-01', '12:00:00', 3.5, 25.5)",
"INSERT INTO rawdatacor (unit_name, tool_name_id, node_num, event_date, event_time, bat_level, temperature, measurements) VALUES ('Unit1', 'Tool1', 1, '2024-01-01', '12:00:00', 3.5, 25.5, '{}')"
),
"insert_single_elabdatadisp": (
"INSERT INTO `ELABDATADISP` (`UnitName`, `ToolNameID`, `NodeNum`, `EventDate`, `EventTime`) VALUES ('Unit1', 'Tool1', 1, '2024-01-01', '12:00:00')",
"INSERT INTO elabdatadisp (unit_name, tool_name_id, node_num, event_date, event_time, measurements) VALUES ('Unit1', 'Tool1', 1, '2024-01-01', '12:00:00', '{}')"
),
}
return queries
@staticmethod
def get_all_benchmark_queries() -> Dict[str, Dict[str, List[Tuple[str, str]]]]:
"""Get all benchmark queries organized by table.
Returns:
Dictionary with table names as keys and query dictionaries as values
"""
return {
"RAWDATACOR": BenchmarkQueryGenerator.generate_rawdatacor_queries(),
"ELABDATADISP": BenchmarkQueryGenerator.generate_elabdatadisp_queries(),
}

src/connectors/__init__.py Normal file

src/connectors/mysql_connector.py Normal file

@@ -0,0 +1,166 @@
"""MySQL database connector."""
import pymysql
from typing import List, Dict, Any, Optional, Generator
from config import get_settings
from src.utils.logger import get_logger
logger = get_logger(__name__)
class MySQLConnector:
"""Connector for MySQL database."""
def __init__(self):
"""Initialize MySQL connector with settings."""
self.settings = get_settings()
self.connection = None
def connect(self) -> None:
"""Establish connection to MySQL database."""
try:
self.connection = pymysql.connect(
host=self.settings.mysql.host,
port=self.settings.mysql.port,
user=self.settings.mysql.user,
password=self.settings.mysql.password,
database=self.settings.mysql.database,
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
)
logger.info(
f"Connected to MySQL: {self.settings.mysql.host}:"
f"{self.settings.mysql.port}/{self.settings.mysql.database}"
)
except pymysql.Error as e:
logger.error(f"Failed to connect to MySQL: {e}")
raise
def disconnect(self) -> None:
"""Close connection to MySQL database."""
if self.connection:
self.connection.close()
logger.info("Disconnected from MySQL")
def __enter__(self):
"""Context manager entry."""
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.disconnect()
def get_row_count(self, table: str) -> int:
"""Get total row count for a table.
Args:
table: Table name
Returns:
Number of rows in the table
"""
try:
with self.connection.cursor() as cursor:
cursor.execute(f"SELECT COUNT(*) as count FROM `{table}`")
result = cursor.fetchone()
return result["count"]
except pymysql.Error as e:
logger.error(f"Failed to get row count for {table}: {e}")
raise
def fetch_all_rows(
self,
table: str,
batch_size: Optional[int] = None
) -> Generator[List[Dict[str, Any]], None, None]:
"""Fetch all rows from a table in batches.
Args:
table: Table name
batch_size: Number of rows per batch (uses config default if None)
Yields:
Batches of row dictionaries
"""
if batch_size is None:
batch_size = self.settings.migration.batch_size
offset = 0
while True:
try:
with self.connection.cursor() as cursor:
query = f"SELECT * FROM `{table}` LIMIT %s OFFSET %s"
cursor.execute(query, (batch_size, offset))
rows = cursor.fetchall()
if not rows:
break
yield rows
offset += len(rows)
except pymysql.Error as e:
logger.error(f"Failed to fetch rows from {table}: {e}")
raise
def fetch_rows_since(
self,
table: str,
since_timestamp: str,
batch_size: Optional[int] = None
) -> Generator[List[Dict[str, Any]], None, None]:
"""Fetch rows modified since a timestamp.
Args:
table: Table name
since_timestamp: ISO format timestamp (e.g., '2024-01-01T00:00:00')
batch_size: Number of rows per batch (uses config default if None)
Yields:
Batches of row dictionaries
"""
if batch_size is None:
batch_size = self.settings.migration.batch_size
offset = 0
timestamp_col = "updated_at" if table == "ELABDATADISP" else "created_at"
while True:
try:
with self.connection.cursor() as cursor:
query = (
f"SELECT * FROM `{table}` "
f"WHERE `{timestamp_col}` > %s "
f"ORDER BY `{timestamp_col}` ASC "
f"LIMIT %s OFFSET %s"
)
cursor.execute(query, (since_timestamp, batch_size, offset))
rows = cursor.fetchall()
if not rows:
break
yield rows
offset += len(rows)
except pymysql.Error as e:
logger.error(f"Failed to fetch rows from {table}: {e}")
raise
def get_table_structure(self, table: str) -> Dict[str, Any]:
"""Get table structure (column info).
Args:
table: Table name
Returns:
Dictionary with column information
"""
try:
with self.connection.cursor() as cursor:
cursor.execute(f"DESCRIBE `{table}`")
columns = cursor.fetchall()
return {col["Field"]: col for col in columns}
except pymysql.Error as e:
logger.error(f"Failed to get structure for {table}: {e}")
raise

src/connectors/postgres_connector.py Normal file

@@ -0,0 +1,200 @@
"""PostgreSQL database connector."""
import psycopg
from typing import List, Dict, Any, Optional, Iterator
from psycopg import sql
import json
from config import get_settings
from src.utils.logger import get_logger
logger = get_logger(__name__)
class PostgreSQLConnector:
"""Connector for PostgreSQL database."""
def __init__(self):
"""Initialize PostgreSQL connector with settings."""
self.settings = get_settings()
self.connection = None
def connect(self) -> None:
"""Establish connection to PostgreSQL database."""
try:
self.connection = psycopg.connect(
host=self.settings.postgres.host,
port=self.settings.postgres.port,
user=self.settings.postgres.user,
password=self.settings.postgres.password,
dbname=self.settings.postgres.database,
autocommit=False,
)
logger.info(
f"Connected to PostgreSQL: {self.settings.postgres.host}:"
f"{self.settings.postgres.port}/{self.settings.postgres.database}"
)
except psycopg.Error as e:
logger.error(f"Failed to connect to PostgreSQL: {e}")
raise
def disconnect(self) -> None:
"""Close connection to PostgreSQL database."""
if self.connection:
self.connection.close()
logger.info("Disconnected from PostgreSQL")
def __enter__(self):
"""Context manager entry."""
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.disconnect()
def execute(self, query: str, params: Optional[tuple] = None) -> None:
"""Execute a query without returning results.
Args:
query: SQL query
params: Query parameters
"""
try:
with self.connection.cursor() as cursor:
cursor.execute(query, params)
self.connection.commit()
except psycopg.Error as e:
self.connection.rollback()
logger.error(f"Query execution failed: {e}\nQuery: {query}")
raise
def execute_script(self, script: str) -> None:
"""Execute multiple SQL statements (script).
Args:
script: SQL script with multiple statements
"""
try:
with self.connection.cursor() as cursor:
cursor.execute(script)
self.connection.commit()
logger.debug("Script executed successfully")
except psycopg.Error as e:
self.connection.rollback()
logger.error(f"Script execution failed: {e}")
raise
def insert_batch(
self,
table: str,
rows: List[Dict[str, Any]],
columns: List[str]
) -> int:
"""Insert a batch of rows using COPY (fast bulk insert).
Args:
table: Table name
rows: List of row dictionaries
columns: Column names in order
Returns:
Number of rows inserted
"""
if not rows:
return 0
try:
with self.connection.cursor() as cursor:
# Prepare COPY data
copy_data = []
for row in rows:
values = []
for col in columns:
val = row.get(col)
if val is None:
values.append("\\N") # NULL representation
elif isinstance(val, (dict, list)):
values.append(json.dumps(val))
elif isinstance(val, str):
# Escape special characters
val = val.replace("\\", "\\\\").replace("\n", "\\n").replace("\t", "\\t")
values.append(val)
else:
values.append(str(val))
copy_data.append("\t".join(values))
# Use COPY for fast insert; in psycopg 3, cursor.copy() returns a context
# manager that the prepared text-format rows are written into
copy_sql = f"COPY {table} ({','.join(columns)}) FROM STDIN"
with cursor.copy(copy_sql) as copy:
    copy.write("\n".join(copy_data))
self.connection.commit()
logger.debug(f"Inserted {len(rows)} rows into {table}")
return len(rows)
except psycopg.Error as e:
self.connection.rollback()
logger.error(f"Batch insert failed: {e}")
raise
def table_exists(self, table: str) -> bool:
"""Check if a table exists.
Args:
table: Table name
Returns:
True if table exists, False otherwise
"""
try:
with self.connection.cursor() as cursor:
cursor.execute(
"SELECT EXISTS("
" SELECT 1 FROM information_schema.tables "
" WHERE table_name = %s"
")",
(table,)
)
return cursor.fetchone()[0]
except psycopg.Error as e:
logger.error(f"Failed to check if table exists: {e}")
raise
def get_max_timestamp(
self,
table: str,
timestamp_col: str = "created_at"
) -> Optional[str]:
"""Get the maximum timestamp from a table.
Args:
table: Table name
timestamp_col: Timestamp column name
Returns:
ISO format timestamp or None if table is empty
"""
try:
with self.connection.cursor() as cursor:
query = f"SELECT MAX({timestamp_col})::text FROM {table}"
cursor.execute(query)
result = cursor.fetchone()
return result[0] if result and result[0] else None
except psycopg.Error as e:
logger.error(f"Failed to get max timestamp: {e}")
raise
def get_row_count(self, table: str) -> int:
"""Get row count for a table.
Args:
table: Table name
Returns:
Number of rows in the table
"""
try:
with self.connection.cursor() as cursor:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
return cursor.fetchone()[0]
except psycopg.Error as e:
logger.error(f"Failed to get row count: {e}")
raise
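
For reference, a minimal sketch of the psycopg 3 COPY interface that insert_batch() relies on ("demo_table" and its columns are hypothetical, the connection string is an example):

# Sketch: writing rows through cursor.copy(); write_row() handles quoting and NULLs.
import psycopg

with psycopg.connect("dbname=migrated_db user=postgres") as conn:
    with conn.cursor() as cur:
        with cur.copy("COPY demo_table (id, payload) FROM STDIN") as copy:
            for row in [(1, '{"k": "v"}'), (2, None)]:
                copy.write_row(row)
    # the connection context manager commits on a clean exit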

0
src/migrator/__init__.py Normal file

src/migrator/full_migration.py Normal file

@@ -0,0 +1,149 @@
"""Full migration from MySQL to PostgreSQL."""
from typing import Optional
from datetime import datetime
import json
from config import get_settings, TABLE_CONFIGS
from src.connectors.mysql_connector import MySQLConnector
from src.connectors.postgres_connector import PostgreSQLConnector
from src.transformers.data_transformer import DataTransformer
from src.utils.logger import get_logger, setup_logger
from src.utils.progress import ProgressTracker
logger = get_logger(__name__)
class FullMigrator:
"""Perform full migration of a table from MySQL to PostgreSQL."""
def __init__(self, table: str):
"""Initialize migrator for a table.
Args:
table: Table name to migrate ('RAWDATACOR' or 'ELABDATADISP')
"""
if table not in TABLE_CONFIGS:
raise ValueError(f"Unknown table: {table}")
self.table = table
self.config = TABLE_CONFIGS[table]
self.settings = get_settings()
def migrate(self, dry_run: bool = False) -> int:
"""Perform full migration of the table.
Args:
dry_run: If True, log what would be done but don't modify data
Returns:
Total number of rows migrated
"""
setup_logger(__name__)
mysql_table = self.config["mysql_table"]
pg_table = self.config["postgres_table"]
logger.info(f"Starting full migration of {mysql_table} -> {pg_table}")
try:
with MySQLConnector() as mysql_conn:
# Get total row count
total_rows = mysql_conn.get_row_count(mysql_table)
logger.info(f"Total rows to migrate: {total_rows}")
if dry_run:
logger.info("[DRY RUN] Would migrate all rows")
return total_rows
with PostgreSQLConnector() as pg_conn:
# Check if table exists
if not pg_conn.table_exists(pg_table):
raise ValueError(
f"PostgreSQL table {pg_table} does not exist. "
"Run 'setup --create-schema' first."
)
migrated = 0
with ProgressTracker(
total_rows,
f"Migrating {mysql_table}"
) as progress:
# Fetch and migrate rows in batches
for batch in mysql_conn.fetch_all_rows(mysql_table):
# Transform batch
transformed = DataTransformer.transform_batch(
mysql_table,
batch
)
# Insert batch
columns = DataTransformer.get_column_order(pg_table)
inserted = pg_conn.insert_batch(
pg_table,
transformed,
columns
)
migrated += inserted
progress.update(inserted)
logger.info(
f"✓ Migration complete: {migrated} rows migrated "
f"to {pg_table}"
)
# Update migration state
self._update_migration_state(pg_conn, migrated)
return migrated
except Exception as e:
logger.error(f"Migration failed: {e}")
raise
def _update_migration_state(
self,
pg_conn: PostgreSQLConnector,
rows_migrated: int
) -> None:
"""Update migration state tracking table.
Args:
pg_conn: PostgreSQL connection
rows_migrated: Number of rows migrated
"""
try:
pg_table = self.config["postgres_table"]
query = f"""
INSERT INTO migration_state
(table_name, last_migrated_timestamp, total_rows_migrated, migration_completed_at, status)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (table_name) DO UPDATE SET
last_migrated_timestamp = EXCLUDED.last_migrated_timestamp,
total_rows_migrated = EXCLUDED.total_rows_migrated,
migration_completed_at = EXCLUDED.migration_completed_at,
status = EXCLUDED.status
"""
now = datetime.utcnow()
pg_conn.execute(query, (pg_table, now, rows_migrated, now, "completed"))
logger.debug("Migration state updated")
except Exception as e:
logger.warning(f"Failed to update migration state: {e}")
def run_full_migration(
table: str,
dry_run: bool = False
) -> int:
"""Run full migration for a table.
Args:
table: Table name to migrate
dry_run: If True, show what would be done without modifying data
Returns:
Number of rows migrated
"""
migrator = FullMigrator(table)
return migrator.migrate(dry_run=dry_run)

src/migrator/incremental_migration.py Normal file

@@ -0,0 +1,155 @@
"""Incremental migration from MySQL to PostgreSQL based on timestamps."""
from datetime import datetime
from typing import Optional
from config import get_settings, TABLE_CONFIGS
from src.connectors.mysql_connector import MySQLConnector
from src.connectors.postgres_connector import PostgreSQLConnector
from src.transformers.data_transformer import DataTransformer
from src.utils.logger import get_logger, setup_logger
from src.utils.progress import ProgressTracker
from src.migrator.state import MigrationState
logger = get_logger(__name__)
class IncrementalMigrator:
"""Perform incremental migration based on timestamps."""
def __init__(self, table: str, state_file: str = "migration_state.json"):
"""Initialize incremental migrator.
Args:
table: Table name to migrate
state_file: Path to migration state file
"""
if table not in TABLE_CONFIGS:
raise ValueError(f"Unknown table: {table}")
self.table = table
self.config = TABLE_CONFIGS[table]
self.settings = get_settings()
self.state = MigrationState(state_file)
def migrate(self, dry_run: bool = False) -> int:
"""Perform incremental migration since last sync.
Args:
dry_run: If True, log what would be done but don't modify data
Returns:
Number of rows migrated
"""
setup_logger(__name__)
mysql_table = self.config["mysql_table"]
pg_table = self.config["postgres_table"]
# Get last migration timestamp
last_timestamp = self.state.get_last_timestamp(pg_table)
if last_timestamp is None:
logger.info(
f"No previous migration found for {pg_table}. "
"Use 'migrate --full' for initial migration."
)
return 0
logger.info(
f"Starting incremental migration of {mysql_table} -> {pg_table} "
f"since {last_timestamp}"
)
try:
with MySQLConnector() as mysql_conn:
# Count rows to migrate
timestamp_col = "updated_at" if mysql_table == "ELABDATADISP" else "created_at"
with PostgreSQLConnector() as pg_conn:
# Get max timestamp from PostgreSQL
pg_max_timestamp = pg_conn.get_max_timestamp(
pg_table,
timestamp_col
)
logger.info(f"Last timestamp in PostgreSQL: {pg_max_timestamp}")
if dry_run:
logger.info("[DRY RUN] Would migrate rows after timestamp")
return 0
migrated = 0
migration_start_time = datetime.utcnow().isoformat()
# Fetch and migrate rows in batches
batch_count = 0
for batch in mysql_conn.fetch_rows_since(
mysql_table,
last_timestamp
):
batch_count += 1
if batch_count == 1:
# Create progress tracker with unknown total
progress = ProgressTracker(
len(batch),
f"Migrating {mysql_table} (incremental)"
)
progress.__enter__()
# Transform batch
transformed = DataTransformer.transform_batch(
mysql_table,
batch
)
# Insert batch
columns = DataTransformer.get_column_order(pg_table)
inserted = pg_conn.insert_batch(
pg_table,
transformed,
columns
)
migrated += inserted
progress.update(inserted)
if batch_count == 0:
logger.info(f"No new rows to migrate for {mysql_table}")
return 0
progress.__exit__(None, None, None)
# Update migration state
self.state.set_last_timestamp(pg_table, migration_start_time)
self.state.increment_migration_count(pg_table, migrated)
logger.info(
f"✓ Incremental migration complete: {migrated} rows migrated "
f"to {pg_table}"
)
return migrated
except Exception as e:
logger.error(f"Incremental migration failed: {e}")
raise
def run_incremental_migration(
table: str,
dry_run: bool = False,
state_file: str = "migration_state.json"
) -> int:
"""Run incremental migration for a table.
Args:
table: Table name to migrate
dry_run: If True, show what would be done without modifying data
state_file: Path to migration state file
Returns:
Number of rows migrated
"""
migrator = IncrementalMigrator(table, state_file)
return migrator.migrate(dry_run=dry_run)

105
src/migrator/state.py Normal file

@@ -0,0 +1,105 @@
"""Migration state management."""
import json
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, Any
from src.utils.logger import get_logger
logger = get_logger(__name__)
class MigrationState:
"""Manage migration state for incremental migrations."""
DEFAULT_STATE_FILE = "migration_state.json"
def __init__(self, state_file: str = DEFAULT_STATE_FILE):
"""Initialize migration state.
Args:
state_file: Path to state file
"""
self.state_file = Path(state_file)
self.state = self._load_state()
def _load_state(self) -> Dict[str, Any]:
"""Load state from file."""
if self.state_file.exists():
try:
with open(self.state_file, "r") as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load state file: {e}")
return {}
return {}
def _save_state(self) -> None:
"""Save state to file."""
try:
with open(self.state_file, "w") as f:
json.dump(self.state, f, indent=2)
except Exception as e:
logger.error(f"Failed to save state file: {e}")
raise
def get_last_timestamp(self, table: str) -> Optional[str]:
"""Get last migration timestamp for a table.
Args:
table: Table name
Returns:
ISO format timestamp or None if not found
"""
return self.state.get(table, {}).get("last_timestamp")
def set_last_timestamp(self, table: str, timestamp: str) -> None:
"""Set last migration timestamp for a table.
Args:
table: Table name
timestamp: ISO format timestamp
"""
if table not in self.state:
self.state[table] = {}
self.state[table]["last_timestamp"] = timestamp
self.state[table]["last_updated"] = datetime.utcnow().isoformat()
self._save_state()
def get_migration_count(self, table: str) -> int:
"""Get total migration count for a table.
Args:
table: Table name
Returns:
Total rows migrated
"""
return self.state.get(table, {}).get("total_migrated", 0)
def increment_migration_count(self, table: str, count: int) -> None:
"""Increment migration count for a table.
Args:
table: Table name
count: Number of rows to add
"""
if table not in self.state:
self.state[table] = {}
current = self.state[table].get("total_migrated", 0)
self.state[table]["total_migrated"] = current + count
self._save_state()
def reset(self, table: Optional[str] = None) -> None:
"""Reset migration state.
Args:
table: Table name to reset, or None to reset all
"""
if table:
self.state[table] = {}
else:
self.state = {}
self._save_state()
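
A short usage sketch, together with the shape of the JSON file it produces (timestamps are illustrative):

# Sketch: recording incremental-migration progress.
from src.migrator.state import MigrationState

state = MigrationState("migration_state.json")
state.set_last_timestamp("elabdatadisp", "2024-01-31T23:59:59")
state.increment_migration_count("elabdatadisp", 1250)

# migration_state.json then looks roughly like:
# {
#   "elabdatadisp": {
#     "last_timestamp": "2024-01-31T23:59:59",
#     "last_updated": "2024-02-01T08:00:00.000000",
#     "total_migrated": 1250
#   }
# }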

src/transformers/__init__.py Normal file

src/transformers/data_transformer.py Normal file

@@ -0,0 +1,178 @@
"""Data transformation from MySQL to PostgreSQL format."""
from typing import Dict, Any, List
from datetime import datetime
from config import (
RAWDATACOR_COLUMNS,
ELABDATADISP_FIELD_MAPPING,
TABLE_CONFIGS,
)
from src.utils.logger import get_logger
logger = get_logger(__name__)
class DataTransformer:
"""Transform MySQL data to PostgreSQL format."""
@staticmethod
def transform_rawdatacor_row(mysql_row: Dict[str, Any]) -> Dict[str, Any]:
"""Transform a RAWDATACOR row from MySQL to PostgreSQL format.
Args:
mysql_row: Row dictionary from MySQL
Returns:
Transformed row dictionary for PostgreSQL
"""
# Create measurements JSONB
measurements = {}
# Map Val0-ValF with their units
for i, val_col in enumerate(RAWDATACOR_COLUMNS["val_columns"]):
unit_col = RAWDATACOR_COLUMNS["unit_columns"][i]
value = mysql_row.get(val_col)
unit = mysql_row.get(unit_col)
# Only add to JSONB if value is not None
if value is not None:
measurements[str(i)] = {
"value": str(value),
"unit": unit if unit else None,
}
# Create PostgreSQL row
pg_row = {
"id": mysql_row["id"],
"unit_name": mysql_row.get("UnitName"),
"tool_name_id": mysql_row["ToolNameID"],
"node_num": mysql_row["NodeNum"],
"event_date": mysql_row["EventDate"],
"event_time": mysql_row["EventTime"],
"bat_level": mysql_row["BatLevel"],
"temperature": mysql_row["Temperature"],
"measurements": measurements,
"created_at": mysql_row.get("created_at"),
"bat_level_module": mysql_row.get("BatLevelModule"),
"temperature_module": mysql_row.get("TemperatureModule"),
"rssi_module": mysql_row.get("RssiModule"),
}
return pg_row
@staticmethod
def transform_elabdatadisp_row(mysql_row: Dict[str, Any]) -> Dict[str, Any]:
"""Transform an ELABDATADISP row from MySQL to PostgreSQL format.
Args:
mysql_row: Row dictionary from MySQL
Returns:
Transformed row dictionary for PostgreSQL
"""
# Create measurements JSONB with structured categories
measurements = {
"shifts": {},
"coordinates": {},
"kinematics": {},
"sensors": {},
"calculated": {},
}
# Map all measurement fields using the configuration
for mysql_col, (category, pg_key) in ELABDATADISP_FIELD_MAPPING.items():
value = mysql_row.get(mysql_col)
if value is not None:
measurements[category][pg_key] = float(value) if isinstance(value, str) else value
# Remove empty categories
measurements = {
k: v for k, v in measurements.items() if v
}
# Create PostgreSQL row
pg_row = {
"id_elab_data": mysql_row["idElabData"],
"unit_name": mysql_row.get("UnitName"),
"tool_name_id": mysql_row["ToolNameID"],
"node_num": mysql_row["NodeNum"],
"event_date": mysql_row["EventDate"],
"event_time": mysql_row["EventTime"],
"state": mysql_row.get("State"),
"calc_err": mysql_row.get("calcerr", 0),
"measurements": measurements,
"created_at": mysql_row.get("created_at"),
"updated_at": mysql_row.get("updated_at"),
}
return pg_row
@staticmethod
def transform_batch(
table: str,
rows: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""Transform a batch of rows from MySQL to PostgreSQL format.
Args:
table: Table name ('RAWDATACOR' or 'ELABDATADISP')
rows: List of row dictionaries from MySQL
Returns:
List of transformed row dictionaries for PostgreSQL
"""
if table == "RAWDATACOR":
return [
DataTransformer.transform_rawdatacor_row(row)
for row in rows
]
elif table == "ELABDATADISP":
return [
DataTransformer.transform_elabdatadisp_row(row)
for row in rows
]
else:
raise ValueError(f"Unknown table: {table}")
@staticmethod
def get_column_order(table: str) -> List[str]:
"""Get the column order for inserting into PostgreSQL.
Args:
table: PostgreSQL table name
Returns:
List of column names in order
"""
if table == "rawdatacor":
return [
"id",
"unit_name",
"tool_name_id",
"node_num",
"event_date",
"event_time",
"bat_level",
"temperature",
"measurements",
"created_at",
"bat_level_module",
"temperature_module",
"rssi_module",
]
elif table == "elabdatadisp":
return [
"id_elab_data",
"unit_name",
"tool_name_id",
"node_num",
"event_date",
"event_time",
"state",
"calc_err",
"measurements",
"created_at",
"updated_at",
]
else:
raise ValueError(f"Unknown table: {table}")
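
A worked example of the RAWDATACOR transformation (input values are illustrative; only the columns the transformer reads directly are required):

# Sketch: one MySQL row in, one PostgreSQL-ready row out.
from src.transformers.data_transformer import DataTransformer

mysql_row = {
    "id": 1000, "UnitName": "Unit1", "ToolNameID": "Tool1", "NodeNum": 1,
    "EventDate": "2024-01-15", "EventTime": "12:00:00",
    "BatLevel": 3.5, "Temperature": 25.5,
    "Val0": 12.4, "Val0_unitmisure": "mm",  # NULL Val columns are simply skipped
}
pg_row = DataTransformer.transform_rawdatacor_row(mysql_row)
# pg_row["measurements"] == {"0": {"value": "12.4", "unit": "mm"}}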

src/transformers/schema_transformer.py Normal file

@@ -0,0 +1,149 @@
"""PostgreSQL schema creation from MySQL structure."""
from config import PARTITION_YEARS
from src.utils.logger import get_logger
logger = get_logger(__name__)
def create_rawdatacor_schema() -> str:
"""Create PostgreSQL schema for RAWDATACOR table.
Returns:
SQL script to create the table with partitions
"""
sql = """
-- Create RAWDATACOR table with partitioning
CREATE TABLE IF NOT EXISTS rawdatacor (
id BIGSERIAL NOT NULL,
unit_name VARCHAR(32),
tool_name_id VARCHAR(32) NOT NULL,
node_num INTEGER NOT NULL,
event_date DATE NOT NULL,
event_time TIME NOT NULL,
bat_level NUMERIC(4,2) NOT NULL,
temperature NUMERIC(5,2) NOT NULL,
measurements JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
bat_level_module NUMERIC(4,2),
temperature_module NUMERIC(5,2),
rssi_module INTEGER,
PRIMARY KEY (id, event_date)
) PARTITION BY RANGE (event_date);
-- Create partitions for each year
"""
# Add partition creation statements
for year in PARTITION_YEARS:
next_year = year + 1
sql += f"""
CREATE TABLE IF NOT EXISTS rawdatacor_{year}
PARTITION OF rawdatacor
FOR VALUES FROM ('{year}-01-01') TO ('{next_year}-01-01');
"""
# Add indexes
sql += """
-- Create indexes
CREATE INDEX IF NOT EXISTS idx_unit_tool_node_datetime_raw
ON rawdatacor(unit_name, tool_name_id, node_num, event_date, event_time);
CREATE INDEX IF NOT EXISTS idx_unit_tool_raw
ON rawdatacor(unit_name, tool_name_id);
CREATE INDEX IF NOT EXISTS idx_measurements_gin_raw
ON rawdatacor USING GIN (measurements);
CREATE INDEX IF NOT EXISTS idx_event_date_raw
ON rawdatacor(event_date);
"""
return sql
def create_elabdatadisp_schema() -> str:
"""Create PostgreSQL schema for ELABDATADISP table.
Returns:
SQL script to create the table with partitions
"""
sql = """
-- Create ELABDATADISP table with partitioning
CREATE TABLE IF NOT EXISTS elabdatadisp (
id_elab_data BIGSERIAL NOT NULL,
unit_name VARCHAR(32),
tool_name_id VARCHAR(32) NOT NULL,
node_num INTEGER NOT NULL,
event_date DATE NOT NULL,
event_time TIME NOT NULL,
state VARCHAR(32),
calc_err INTEGER DEFAULT 0,
measurements JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id_elab_data, event_date)
) PARTITION BY RANGE (event_date);
-- Create partitions for each year
"""
# Add partition creation statements
for year in PARTITION_YEARS:
next_year = year + 1
sql += f"""
CREATE TABLE IF NOT EXISTS elabdatadisp_{year}
PARTITION OF elabdatadisp
FOR VALUES FROM ('{year}-01-01') TO ('{next_year}-01-01');
"""
# Add indexes
sql += """
-- Create indexes
CREATE INDEX IF NOT EXISTS idx_unit_tool_node_datetime_elab
ON elabdatadisp(unit_name, tool_name_id, node_num, event_date, event_time);
CREATE INDEX IF NOT EXISTS idx_unit_tool_elab
ON elabdatadisp(unit_name, tool_name_id);
CREATE INDEX IF NOT EXISTS idx_measurements_gin_elab
ON elabdatadisp USING GIN (measurements);
CREATE INDEX IF NOT EXISTS idx_event_date_elab
ON elabdatadisp(event_date);
"""
return sql
def create_migration_state_table() -> str:
"""Create table to track migration state.
Returns:
SQL to create migration_state table
"""
sql = """
-- Create table to track migration state
CREATE TABLE IF NOT EXISTS migration_state (
table_name VARCHAR(255) PRIMARY KEY,
last_migrated_timestamp TIMESTAMP,
last_migrated_id BIGINT,
migration_started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
migration_completed_at TIMESTAMP,
total_rows_migrated BIGINT DEFAULT 0,
status VARCHAR(32) DEFAULT 'pending'
);
"""
return sql
def get_full_schema_script() -> str:
"""Get complete schema creation script for PostgreSQL.
Returns:
Full SQL script to create all tables and indexes
"""
return (
create_rawdatacor_schema() +
"\n\n" +
create_elabdatadisp_schema() +
"\n\n" +
create_migration_state_table()
)
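
Once this schema is in place, JSONB containment queries can be served by the GIN indexes defined above; a sketch (connection string and sample values are illustrative):

# Sketch: containment query against the measurements JSONB column.
import psycopg
from psycopg.types.json import Jsonb

with psycopg.connect("dbname=migrated_db user=postgres") as conn:
    rows = conn.execute(
        "SELECT id, event_date FROM rawdatacor WHERE measurements @> %s LIMIT 10",
        [Jsonb({"0": {"unit": "mm"}})],
    ).fetchall()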

0
src/utils/__init__.py Normal file

42
src/utils/logger.py Normal file

@@ -0,0 +1,42 @@
"""Logging utility with Rich integration."""
import logging
import sys
from pathlib import Path
from rich.logging import RichHandler
from rich.console import Console
from config import get_settings
def setup_logger(name: str) -> logging.Logger:
"""Set up a logger with Rich handler."""
settings = get_settings()
logger = logging.getLogger(name)
logger.setLevel(getattr(logging, settings.migration.log_level))
# Remove existing handlers
logger.handlers.clear()
# Create console handler with Rich
handler = RichHandler(
console=Console(file=sys.stderr),
show_time=True,
show_level=True,
show_path=False,
)
handler.setLevel(getattr(logging, settings.migration.log_level))
# Create formatter
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def get_logger(name: str) -> logging.Logger:
"""Get or create a logger."""
return logging.getLogger(name)

73
src/utils/progress.py Normal file

@@ -0,0 +1,73 @@
"""Progress tracking utility."""
from rich.progress import (
Progress,
SpinnerColumn,
BarColumn,
TaskProgressColumn,
TimeRemainingColumn,
TimeElapsedColumn,
TransferSpeedColumn,
)
from rich.console import Console
import time
class ProgressTracker:
"""Track migration progress with Rich progress bar."""
def __init__(self, total: int, description: str = "Migrating"):
"""Initialize progress tracker.
Args:
total: Total number of items to process
description: Description of the task
"""
self.total = total
self.description = description
self.progress = Progress(
SpinnerColumn(),
BarColumn(),
TaskProgressColumn(),
TimeElapsedColumn(),
TimeRemainingColumn(),
TransferSpeedColumn(),
console=Console(),
)
self.task_id = None
self.start_time = None
self.processed = 0
def __enter__(self):
"""Context manager entry."""
self.progress.start()
self.task_id = self.progress.add_task(
self.description, total=self.total
)
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.progress.stop()
if exc_type is None:
elapsed = time.time() - self.start_time
rate = self.processed / elapsed if elapsed > 0 else 0
self.progress.console.print(
f"[green]✓ Completed: {self.processed}/{self.total} items "
f"in {elapsed:.2f}s ({rate:.0f} items/sec)[/green]"
)
def update(self, advance: int = 1):
"""Update progress.
Args:
advance: Number of items processed
"""
if self.task_id is not None:
self.progress.update(self.task_id, advance=advance)
self.processed += advance
def print_status(self, message: str):
"""Print a status message without interrupting progress bar."""
if self.task_id is not None:
self.progress.print(message)
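
A short usage sketch for ProgressTracker (the totals and batch size are illustrative):

# Sketch: tracking a batched copy with ProgressTracker.
from src.utils.progress import ProgressTracker

with ProgressTracker(total=1000, description="Copying rows") as progress:
    for _ in range(10):
        progress.update(100)  # advance by the number of rows just processed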