Files
mysql2postgres/config.py
alex bb27f749a0 Implement partition-based consolidation for ELABDATADISP
Changed consolidation strategy to leverage MySQL partitioning:
- Added get_table_partitions() to list all partitions
- Added fetch_consolidation_groups_from_partition() to read groups by consolidation key
- Each group (UnitName, ToolNameID, EventDate, EventTime) is fetched completely
- All nodes of same group are consolidated into single row with JSONB measurements
- Process partitions sequentially for predictable memory usage

Key benefits:
- Guaranteed complete consolidation (no fragmentation across batches)
- Deterministic behavior - same group always consolidated together
- Better memory efficiency with partition limits (100k groups per query)
- Clear audit trail of which partition each row came from

Tested with partition d3: 6960 input rows → 100 consolidated rows (69.6:1 ratio)
with groups containing 24-72 nodes each.

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2025-12-25 21:49:30 +01:00

175 lines
5.0 KiB
Python

"""Configuration management using Pydantic settings."""
from pydantic_settings import BaseSettings
from pydantic import ConfigDict
from typing import Optional
import os
class MySQLConfig(BaseSettings):
"""MySQL source database configuration."""
model_config = ConfigDict(
env_prefix="MYSQL_",
case_sensitive=False,
extra="ignore",
env_file=".env",
env_file_encoding="utf-8"
)
host: str
port: int
user: str
password: str
database: str
class PostgreSQLConfig(BaseSettings):
"""PostgreSQL target database configuration."""
model_config = ConfigDict(
env_prefix="POSTGRES_",
case_sensitive=False,
extra="ignore",
env_file=".env",
env_file_encoding="utf-8"
)
host: str
port: int
user: str
password: str
database: str
class MigrationSettings(BaseSettings):
"""Migration settings."""
model_config = ConfigDict(
case_sensitive=False,
extra="ignore",
env_file=".env",
env_file_encoding="utf-8"
)
batch_size: int = 10000
consolidation_group_limit: int = 100000
log_level: str = "INFO"
dry_run: bool = False
class BenchmarkSettings(BaseSettings):
"""Benchmark settings."""
model_config = ConfigDict(
env_prefix="BENCHMARK_",
case_sensitive=False,
extra="ignore",
env_file=".env",
env_file_encoding="utf-8"
)
output_dir: str = "benchmark_results"
iterations: int = 5
class Settings(BaseSettings):
"""All application settings."""
model_config = ConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
extra="ignore"
)
mysql: MySQLConfig
postgres: PostgreSQLConfig
migration: MigrationSettings
benchmark: BenchmarkSettings
@classmethod
def from_env(cls):
"""Load settings from environment variables."""
return cls(
mysql=MySQLConfig(),
postgres=PostgreSQLConfig(),
migration=MigrationSettings(),
benchmark=BenchmarkSettings(),
)
# Lazy load settings
_settings: Optional[Settings] = None
def get_settings() -> Settings:
"""Get application settings, loading from .env if necessary."""
global _settings
if _settings is None:
_settings = Settings.from_env()
return _settings
# Schema transformation definitions
RAWDATACOR_COLUMNS = {
"val_columns": ["Val0", "Val1", "Val2", "Val3", "Val4", "Val5", "Val6", "Val7", "Val8", "Val9", "ValA", "ValB", "ValC", "ValD", "ValE", "ValF"],
"unit_columns": ["Val0_unitmisure", "Val1_unitmisure", "Val2_unitmisure", "Val3_unitmisure", "Val4_unitmisure", "Val5_unitmisure", "Val6_unitmisure", "Val7_unitmisure", "Val8_unitmisure", "Val9_unitmisure", "ValA_unitmisure", "ValB_unitmisure", "ValC_unitmisure", "ValD_unitmisure", "ValE_unitmisure", "ValF_unitmisure"],
}
ELABDATADISP_MEASUREMENT_FIELDS = {
"shifts": ["XShift", "YShift", "ZShift", "HShift", "HShiftDir", "HShift_local"],
"coordinates": ["X", "Y", "Z", "Xstar", "Zstar"],
"kinematics": ["speed", "speed_local", "acceleration", "acceleration_local"],
"sensors": ["T_node", "load_value", "water_level", "pressure"],
"calculated": ["AlfaX", "AlfaY", "Area"],
}
ELABDATADISP_FIELD_MAPPING = {
# shifts mapping (source -> (category, key))
"XShift": ("shifts", "x"),
"YShift": ("shifts", "y"),
"ZShift": ("shifts", "z"),
"HShift": ("shifts", "h"),
"HShiftDir": ("shifts", "h_dir"),
"HShift_local": ("shifts", "h_local"),
# coordinates mapping
"X": ("coordinates", "x"),
"Y": ("coordinates", "y"),
"Z": ("coordinates", "z"),
"Xstar": ("coordinates", "x_star"),
"Zstar": ("coordinates", "z_star"),
# kinematics mapping
"speed": ("kinematics", "speed"),
"speed_local": ("kinematics", "speed_local"),
"acceleration": ("kinematics", "acceleration"),
"acceleration_local": ("kinematics", "acceleration_local"),
# sensors mapping
"T_node": ("sensors", "t_node"),
"load_value": ("sensors", "load_value"),
"water_level": ("sensors", "water_level"),
"pressure": ("sensors", "pressure"),
# calculated mapping
"AlfaX": ("calculated", "alfa_x"),
"AlfaY": ("calculated", "alfa_y"),
"Area": ("calculated", "area"),
}
# PostgreSQL Partition years (from both tables)
PARTITION_YEARS = list(range(2014, 2032)) # 2014-2031
# Table configurations - support both uppercase and lowercase keys
_rawdatacor_config = {
"mysql_table": "RAWDATACOR",
"postgres_table": "rawdatacor",
"primary_key": "id",
"partition_key": "event_timestamp",
}
_elabdatadisp_config = {
"mysql_table": "ELABDATADISP",
"postgres_table": "elabdatadisp",
"primary_key": "idElabData",
"partition_key": "event_timestamp",
}
TABLE_CONFIGS = {
"rawdatacor": _rawdatacor_config,
"RAWDATACOR": _rawdatacor_config,
"elabdatadisp": _elabdatadisp_config,
"ELABDATADISP": _elabdatadisp_config,
}