Implement comprehensive error handling and fix state management bug in incremental migration: Error Logging System: - Add validation for consolidation keys (NULL dates, empty IDs, corrupted Java strings) - Log invalid keys to dedicated error files with detailed reasons - Full migration: migration_errors_<table>_<partition>.log - Incremental migration: migration_errors_<table>_incremental_<timestamp>.log (timestamped to preserve history) - Report total count of skipped invalid keys at migration completion - Auto-delete empty error log files State Tracking Fix: - Fix critical bug where last_key wasn't updated after final buffer flush - Track last_processed_key throughout migration loop - Update state both during periodic flushes and after final flush - Ensures incremental migration correctly resumes from last migrated key Validation Checks: - EventDate IS NULL or EventDate = '0000-00-00' - EventTime IS NULL - ToolNameID IS NULL or empty string - UnitName IS NULL or empty string - UnitName starting with '[L' (corrupted Java strings) Documentation: - Update README.md with error logging behavior - Update MIGRATION_WORKFLOW.md with validation details - Update CHANGELOG.md with new features and fixes 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
14 KiB
MySQL to PostgreSQL Migration Tool
Un tool robusto per la migrazione di database MySQL a PostgreSQL con trasformazione di colonne multiple in JSONB, supporto per partizionamento nativo di PostgreSQL, e sistema completo di benchmark per confrontare le performance.
Caratteristiche
- Migrazione Completa: Trasferimento di tutti i dati da MySQL a PostgreSQL
- Migrazione Incrementale: Sincronizzazione periodica basata su consolidation keys
- Consolidamento Dati: Raggruppa multiple righe MySQL in singoli record PostgreSQL
- Trasformazione JSONB: Consolidamento automatico di colonne multiple in campi JSONB
- Partizionamento: Supporto per partizioni per anno (2014-2031)
- Indici Ottimizzati: GIN indexes per query efficienti su JSONB
- Performance Optimization: Usa
mysql_max_idper evitare full table scans - Progress Tracking: Barra di avanzamento in tempo reale con ETA
- Benchmark: Sistema completo per confrontare performance MySQL vs PostgreSQL
- Logging: Logging strutturato con Rich per output colorato
- Dry-Run Mode: Modalità test senza modificare i dati
- State Management: Tracking affidabile con tabella
migration_statein PostgreSQL
Setup
1. Requisiti
- Python 3.10+
- MySQL 5.7+
- PostgreSQL 13+
- pip
2. Installazione
# Clonare il repository
cd mysql2postgres
# Creare virtual environment
python -m venv venv
source venv/bin/activate # su Windows: venv\Scripts\activate
# Installare dipendenze
pip install -e .
3. Configurazione
Copiare .env.example a .env e configurare i dettagli di connessione:
cp .env.example .env
Modificare .env con i tuoi dettagli:
# MySQL Source Database
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=your_mysql_password
MYSQL_DATABASE=your_database_name
# PostgreSQL Target Database (container Incus)
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=postgres
POSTGRES_PASSWORD=your_postgres_password
POSTGRES_DATABASE=migrated_db
# Migration Settings
LOG_LEVEL=INFO
DRY_RUN=false
CONSOLIDATION_GROUP_LIMIT=40000
PROGRESS_LOG_INTERVAL=10000
# Performance Testing
BENCHMARK_OUTPUT_DIR=benchmark_results
BENCHMARK_ITERATIONS=5
Utilizzo
Comandi Disponibili
Info Configuration
python main.py info
Mostra la configurazione corrente di MySQL e PostgreSQL.
Setup Database
python main.py setup --create-schema
Crea lo schema PostgreSQL con:
- Tabelle
rawdatacoreelabdatadisppartizionate per anno - Indici ottimizzati per JSONB
- Tabella di tracking
migration_state
Migrazione Completa
# Migrare tutte le tabelle
python main.py migrate full
# Migrare una tabella specifica
python main.py migrate full --table RAWDATACOR
# Modalit<69> dry-run (senza modificare i dati)
python main.py migrate full --dry-run
Migrazione Incrementale
# Migrare solo i cambiamenti dal last sync
python main.py migrate incremental
# Per una tabella specifica
python main.py migrate incremental --table ELABDATADISP
# Dry-run per vedere cosa verrebbe migrato
python main.py migrate incremental --dry-run
Benchmark Performance
# Eseguire benchmark con iterations da config (default: 5)
python main.py benchmark
# Benchmark con numero specifico di iterazioni
python main.py benchmark --iterations 10
# Salvare risultati in file specifico
python main.py benchmark --output my_results.json
Come Funziona il Consolidamento
Il tool non migra le righe MySQL 1:1 in PostgreSQL. Invece, consolida multiple righe MySQL in singoli record PostgreSQL raggruppati per:
(UnitName, ToolNameID, EventDate, EventTime)
Validazione e Gestione Dati Corrotti
La migrazione valida automaticamente le chiavi di consolidamento e gestisce dati corrotti:
Validazioni applicate:
EventDate IS NULLoEventDate = '0000-00-00'ToolNameID IS NULLoToolNameID = ''(stringa vuota)UnitName IS NULLoUnitName = ''(stringa vuota)UnitNameche inizia con[L(stringhe Java corrotte come[Ljava.lang.String;@...)EventTime IS NULL
Comportamento:
- Le chiavi non valide vengono saltate automaticamente per evitare interruzioni
- Ogni chiave scartata viene loggata in file dedicati per tracciabilità
- Il numero totale di chiavi scartate viene riportato alla fine della migrazione
File di log degli errori:
- Full migration:
migration_errors_<table>_<partition>.log(es.migration_errors_rawdatacor_p2024.log) - Incremental migration:
migration_errors_<table>_incremental_<timestamp>.log(es.migration_errors_rawdatacor_incremental_20260101_194500.log)
Ogni esecuzione incrementale crea un nuovo file con timestamp per mantenere lo storico.
Questo approccio garantisce che la migrazione non si interrompa per dati corrotti, permettendo comunque di tracciare e analizzare le anomalie.
Perché Consolidare?
MySQL ha molte righe per lo stesso momento:
id | UnitName | ToolNameID | EventDate | EventTime | Val0 | Val1
100 | Unit1 | Tool1 | 2024-01-01 | 10:00:00 | 23.5 | 45.2
101 | Unit1 | Tool1 | 2024-01-01 | 10:00:00 | 23.6 | 45.3
102 | Unit1 | Tool1 | 2024-01-01 | 10:00:00 | 23.7 | 45.1
PostgreSQL ottiene 1 record consolidato con tutti i valori in JSONB:
{
"unit_name": "Unit1",
"tool_name_id": "Tool1",
"event_timestamp": "2024-01-01 10:00:00",
"mysql_max_id": 102,
"measurements": {
"0": [
{"value": 23.5, "unit": "°C"},
{"value": 23.6, "unit": "°C"},
{"value": 23.7, "unit": "°C"}
],
"1": [
{"value": 45.2, "unit": "bar"},
{"value": 45.3, "unit": "bar"},
{"value": 45.1, "unit": "bar"}
]
}
}
Vantaggi:
- 🔥 Meno righe → query più veloci
- 💾 Storage più efficiente (compressione JSONB)
- 📊 Analisi più semplici (tutti i valori in un posto)
- 🚀 Migrazione più veloce (meno transazioni)
Trasformazione Dati
RAWDATACOR
Da MySQL:
Val0, Val1, ..., ValF (16 colonne)
Val0_unitmisure, Val1_unitmisure, ..., ValF_unitmisure (16 colonne)
A PostgreSQL (JSONB measurements):
{
"0": {"value": "123.45", "unit": "<22>C"},
"1": {"value": "67.89", "unit": "bar"},
...
"F": {"value": "11.22", "unit": "m/s"}
}
ELABDATADISP
Da MySQL: 25+ colonne di misure e calcoli
A PostgreSQL (JSONB measurements):
{
"shifts": {
"x": 1.234567, "y": 2.345678, "z": 3.456789,
"h": 4.567890, "h_dir": 5.678901, "h_local": 6.789012
},
"coordinates": {
"x": 10.123456, "y": 20.234567, "z": 30.345678,
"x_star": 40.456789, "z_star": 50.567890
},
"kinematics": {
"speed": 1.111111, "speed_local": 2.222222,
"acceleration": 3.333333, "acceleration_local": 4.444444
},
"sensors": {
"t_node": 25.5, "load_value": 100.5, "water_level": 50.5, "pressure": 1.013
},
"calculated": {
"alfa_x": 0.123456, "alfa_y": 0.234567, "area": 100.5
}
}
Query su JSONB
Esempi di query su PostgreSQL
-- Filtrare per valore specifico in RAWDATACOR
SELECT * FROM rawdatacor
WHERE measurements->>'0'->>'value' IS NOT NULL;
-- Range query su ELABDATADISP
SELECT * FROM elabdatadisp
WHERE (measurements->'kinematics'->>'speed')::NUMERIC > 10.0;
-- Aggregazione su JSONB
SELECT unit_name, AVG((measurements->'kinematics'->>'speed')::NUMERIC) as avg_speed
FROM elabdatadisp
GROUP BY unit_name;
-- Containment check
SELECT * FROM elabdatadisp
WHERE measurements @> '{"kinematics":{}}';
-- GIN index scan (veloce)
SELECT * FROM rawdatacor
WHERE measurements ? '0'
LIMIT 1000;
Partizionamento
Entrambe le tabelle sono partizionate per anno usando la colonna event_year:
-- Partizioni create automaticamente per:
-- rawdatacor_2014, rawdatacor_2015, ..., rawdatacor_2031
-- elabdatadisp_2014, elabdatadisp_2015, ..., elabdatadisp_2031
-- Partizionamento basato su event_year (calcolato da event_timestamp durante insert)
CREATE TABLE rawdatacor_2024 PARTITION OF rawdatacor
FOR VALUES FROM (2024) TO (2025);
-- Query partizionata (constraint exclusion automatico)
SELECT * FROM rawdatacor
WHERE event_year = 2024;
-- PostgreSQL usa solo rawdatacor_2024
-- Oppure usando event_timestamp
SELECT * FROM rawdatacor
WHERE event_timestamp >= '2024-01-01' AND event_timestamp < '2025-01-01';
-- PostgreSQL usa solo rawdatacor_2024
Indici
RAWDATACOR
-- Primary key (necessario per tabelle partizionate)
rawdatacor_pkey -- UNIQUE (id, event_year)
-- Consolidation key (previene duplicati)
rawdatacor_consolidation_key_unique -- UNIQUE (unit_name, tool_name_id, event_timestamp, event_year)
-- Query optimization
idx_rawdatacor_unit_tool -- (unit_name, tool_name_id)
idx_rawdatacor_measurements_gin -- GIN (measurements) per query JSONB
ELABDATADISP
-- Primary key (necessario per tabelle partizionate)
elabdatadisp_pkey -- UNIQUE (id, event_year)
-- Consolidation key (previene duplicati)
elabdatadisp_consolidation_key_unique -- UNIQUE (unit_name, tool_name_id, event_timestamp, event_year)
-- Query optimization
idx_elabdatadisp_unit_tool -- (unit_name, tool_name_id)
idx_elabdatadisp_measurements_gin -- GIN (measurements) per query JSONB
Benchmark
Il benchmark confronta le performance tra MySQL e PostgreSQL su:
- SELECT semplici: By PK, date range, unit+tool
- Query JSONB: Filtri su campi, range query, containment checks
- Aggregazioni: Group by, AVG, COUNT
- JOIN: Tra le due tabelle
Risultati salvati in: benchmark_results/benchmark_TIMESTAMP.json
Formato risultati:
{
"timestamp": "2024-01-15T10:30:45.123456",
"iterations": 5,
"tables": {
"RAWDATACOR": {
"select_by_pk": {
"mysql": {
"min": 0.5,
"max": 0.8,
"mean": 0.65,
"median": 0.65,
"p95": 0.8
},
"postgres": {
"min": 0.3,
"max": 0.6,
"mean": 0.45,
"p95": 0.6
}
}
}
}
}
Struttura Progetto
mysql2postgres/
main.py # CLI entry point
config.py # Configurazione Pydantic
.env.example # Template configurazione
pyproject.toml # Dipendenze
README.md # Questo file
src/
connectors/
mysql_connector.py # Connector MySQL
postgres_connector.py # Connector PostgreSQL
transformers/
schema_transformer.py # Creazione schema PostgreSQL
data_transformer.py # Trasformazione JSONB
migrator/
full_migration.py # Migrazione completa
incremental_migration.py # Migrazione delta
state.py # Tracking stato
benchmark/
query_generator.py # Generatore query test
performance_test.py # Runner benchmark
utils/
logger.py # Logging con Rich
progress.py # Progress bar
Workflow Consigliato
-
Setup iniziale
# Configurare .env cp .env.example .env nano .env # Creare schema PostgreSQL python main.py setup --create-schema -
Prima migrazione (completa)
# Test con dry-run python main.py migrate full --dry-run # Migrazione effettiva python main.py migrate full -
Migrazioni periodiche (incrementali)
# Manuale python main.py migrate incremental # Oppure schedule con cron (daily at 2 AM) 0 2 * * * cd /path/to/mysql2postgres && python main.py migrate incremental >> /var/log/migration.log 2>&1 -
Sincronizzare stato migrazione (se necessario)
# Se migration_state non è sincronizzato con i dati python scripts/sync_migration_state.py -
Benchmark di performance
python main.py benchmark --iterations 10
Troubleshooting
Errore di connessione MySQL
- Verificare credenziali in
.env - Controllare che MySQL sia online:
mysql -h localhost -u root -p
Errore di connessione PostgreSQL
- Verificare che container Incus sia avviato
- Verificare credenziali:
psql -h localhost -U postgres
Timeout durante migrazione
- Aumentare
CONSOLIDATION_GROUP_LIMITin.env(default: 40000) - Verificare performance di rete tra MySQL e PostgreSQL
"No previous migration found" (incremental)
- Causa: Non è stata eseguita una migrazione completa prima
- Soluzione: Eseguire
python main.py migrate fullprima
Migrazione incrementale lenta
- Causa: Query
SELECT MAX(mysql_max_id)su PostgreSQL impiega ~60 secondi - Soluzione: Questo è normale ed è eseguito solo una volta all'inizio. Le query MySQL successive sono istantanee.
- Alternativa: Creare indice su
mysql_max_id(usa spazio disco):CREATE INDEX idx_rawdatacor_mysql_max_id ON rawdatacor (mysql_max_id DESC); CREATE INDEX idx_elabdatadisp_mysql_max_id ON elabdatadisp (mysql_max_id DESC);
migration_state non sincronizzato
- Causa: Dati inseriti manualmente o stato corrotto
- Soluzione: Eseguire
python scripts/sync_migration_state.py
JSONB con valori NULL
- Il tool esclude automaticamente valori NULL da JSONB (solo valori non-NULL vengono aggiunti)
Performance Tips
-
Migration
- Aumentare
CONSOLIDATION_GROUP_LIMITper processare più chiavi per batch (default: 40000) - Aumentare
PROGRESS_LOG_INTERVALper ridurre logging (default: 10000) - Disabilitare indici durante migrazione se possibile (non implementato)
- Aumentare
-
Queries on JSONB
- Usare
->>per testo,->per JSON - GIN indexes accelerano query
?e@> - Castare a NUMERIC/INT quando necessario per operazioni
- Usare
-
Partizionamento
- PostgreSQL usa constraint exclusion per saltare partizioni
- Query su date range sono automaticamente ottimizzate
Supporto
Per bug o suggerimenti, aprire una issue nel repository.
License
MIT