# MySQL to PostgreSQL Migration Workflow

## Overview

This tool implements **consolidation-based incremental migration** for two tables:

- **RAWDATACOR**: Raw sensor measurements
- **ELABDATADISP**: Elaborated/calculated data

Both tables use **consolidation keys** to group and migrate data efficiently.

---

## Migration Modes

### 1. Full Migration

Initial migration of all historical data, one partition at a time.

```bash
# Migrate all partitions for all tables
python main.py migrate full

# Migrate specific table
python main.py migrate full --table RAWDATACOR

# Migrate specific partition (year-based)
python main.py migrate full --table ELABDATADISP --partition 2024

# Dry-run to see what would be migrated
python main.py migrate full --dry-run
```

**Characteristics:**

- Migrates data partition by partition (year-based)
- Uses consolidation groups for efficiency
- Tracks progress in the `migration_state` table (PostgreSQL)
- Can resume from the last completed partition if interrupted
- Uses the `mysql_max_id` optimization for performance

---

### 2. Incremental Migration

Syncs only the data added since the last migration.

```bash
# Migrate new data for all tables
python main.py migrate incremental

# Migrate specific table
python main.py migrate incremental --table ELABDATADISP

# Dry-run to check what would be migrated
python main.py migrate incremental --dry-run
```

**Characteristics:**

- Uses **consolidation keys** to identify new records: `(UnitName, ToolNameID, EventDate, EventTime)`
- Tracks the last migrated key in the `migration_state` table
- Optimized with the `mysql_max_id` filter for performance
- Handles duplicates with `ON CONFLICT DO NOTHING`
- Perfect for scheduled jobs (cron, systemd timers)

**How it works:**

1. Retrieves `last_key` from the `migration_state` table
2. Gets `MAX(mysql_max_id)` from the PostgreSQL table for the ID filter
3. Queries MySQL: `WHERE id > max_mysql_id AND (key_tuple) > last_key`
4. Migrates the new consolidation groups
5. Updates `migration_state` with the new `last_key`

---

## Consolidation Keys

Both tables use consolidation to group multiple measurements into a single JSONB record.

### Consolidation Key Structure

```sql
(UnitName, ToolNameID, EventDate, EventTime)
```

### Data Quality Validation

The migration automatically validates consolidation keys and logs invalid ones (the checks are sketched in code below):

- `EventDate IS NULL` or `EventDate = '0000-00-00'`
- `ToolNameID IS NULL` or `ToolNameID = ''` (empty string)
- `UnitName IS NULL` or `UnitName = ''` (empty string)
- `UnitName` starting with `[L` (corrupted Java strings like `[Ljava.lang.String;@...`)
- `EventTime IS NULL`

Invalid keys are:

- **Logged to error files** for tracking and analysis
- **Skipped automatically** to prevent migration failures
- **Counted and reported** at the end of the migration

Error log files:

- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)

Each incremental migration creates a new timestamped file to preserve history.
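To make the checks above concrete, here is a minimal Python sketch of the validation predicate. The function name and signature are illustrative assumptions, not the tool's actual API; the rules mirror the list in this section.

```python
# Illustrative sketch of the key-validation rules above.
# The function name and signature are hypothetical, not the tool's actual API.
def is_valid_consolidation_key(unit_name, tool_name_id, event_date, event_time) -> bool:
    """Return True if (UnitName, ToolNameID, EventDate, EventTime) can be migrated."""
    if event_date is None or str(event_date) == "0000-00-00":
        return False                        # missing or zero date
    if not tool_name_id:
        return False                        # NULL or empty ToolNameID
    if not unit_name:
        return False                        # NULL or empty UnitName
    if unit_name.startswith("[L"):
        return False                        # corrupted Java string, e.g. [Ljava.lang.String;@...
    if event_time is None:
        return False                        # missing time
    return True
```

Keys that fail these checks are written to the error log and skipped, so a single corrupted row cannot abort the whole run.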
### Why Consolidation?

Instead of migrating individual sensor readings, we:

1. **Group** all measurements for the same (unit, tool, date, time)
2. **Transform** 16-25 columns into structured JSONB
3. **Migrate** them as a single consolidated record

**Example:** MySQL has 10 rows for `(Unit1, Tool1, 2024-01-01, 10:00:00)`:

```
id | UnitName | ToolNameID | EventDate  | EventTime | Val0 | Val1 | ...
1  | Unit1    | Tool1      | 2024-01-01 | 10:00:00  | 23.5 | 45.2 | ...
2  | Unit1    | Tool1      | 2024-01-01 | 10:00:00  | 23.6 | 45.3 | ...
...
```

PostgreSQL gets one consolidated record:

```json
{
  "unit_name": "Unit1",
  "tool_name_id": "Tool1",
  "event_timestamp": "2024-01-01 10:00:00",
  "measurements": {
    "0": {"value": 23.5, "unit": "°C"},
    "1": {"value": 45.2, "unit": "bar"},
    ...
  },
  "mysql_max_id": 10
}
```

---

## State Management

### Migration State Table

The `migration_state` table in PostgreSQL tracks migration progress:

```sql
CREATE TABLE migration_state (
    table_name     VARCHAR(50),
    partition_name VARCHAR(50),
    last_key       JSONB,       -- Last migrated consolidation key
    started_at     TIMESTAMP,
    completed_at   TIMESTAMP,
    total_rows     INTEGER,
    status         VARCHAR(20)
);
```

### State Records

- **Per-partition state**: tracks each partition's progress
  - Example: `('rawdatacor', '2024', {...}, '2024-01-15 10:30:00', 1000000, 'completed')`
- **Global state**: tracks the overall incremental migration position
  - Example: `('rawdatacor', '_global', {...}, NULL, NULL, 0, 'in_progress')`

### Checking State

```sql
-- View all migration state
SELECT * FROM migration_state ORDER BY table_name, partition_name;

-- View global state (for incremental migration)
SELECT * FROM migration_state WHERE partition_name = '_global';
```

---

## Performance Optimization

### MySQL ID Filter

The incremental migration uses `MAX(mysql_max_id)` from PostgreSQL to filter the MySQL queries:

```sql
SELECT UnitName, ToolNameID, EventDate, EventTime
FROM RAWDATACOR
WHERE id > 267399536  -- max_mysql_id from PostgreSQL
  AND (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?)
GROUP BY UnitName, ToolNameID, EventDate, EventTime
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT 10000
```

**Why this is fast:**

- Uses the PRIMARY KEY index on `id` to skip millions of already-migrated rows
- The tuple comparison is only applied to the filtered subset
- Avoids full table scans

### Consolidation Group Batching

Instead of fetching individual rows, we (see the sketch below):

1. Fetch 10,000 consolidation keys at a time
2. For each key, fetch all matching rows from MySQL
3. Transform and insert them into PostgreSQL
4. Update the state after every batch
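Putting the ID filter and the batching steps together, one batch of the incremental sync looks roughly like the Python sketch below (step numbers refer to the "How it works" list above). It is illustrative only: the driver calls assume `mysql-connector-python` and `psycopg2`, `consolidate()` is a hypothetical transform helper, and the real tool also handles key validation, error logging, and counters.

```python
# Sketch of one incremental batch for RAWDATACOR (illustrative, not the tool's actual code).
# Assumes mysql-connector-python and psycopg2 connections are passed in;
# consolidate() is a hypothetical helper that builds the measurements JSONB.
from psycopg2.extras import Json

BATCH_KEYS = 10_000  # consolidation keys fetched per batch

def run_incremental_batch(mysql_conn, pg_conn, last_key):
    """Migrate groups newer than last_key = (UnitName, ToolNameID, EventDate, EventTime)."""
    with pg_conn.cursor() as pg:
        # Step 2: upper bound of what is already migrated (the one slow query).
        pg.execute("SELECT COALESCE(MAX(mysql_max_id), 0) FROM rawdatacor")
        max_mysql_id = pg.fetchone()[0]

    my = mysql_conn.cursor()
    # Step 3: keyset query - PRIMARY KEY filter first, tuple comparison on the rest.
    my.execute(
        """
        SELECT UnitName, ToolNameID, EventDate, EventTime
        FROM RAWDATACOR
        WHERE id > %s
          AND (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)
        GROUP BY UnitName, ToolNameID, EventDate, EventTime
        ORDER BY UnitName, ToolNameID, EventDate, EventTime
        LIMIT %s
        """,
        (max_mysql_id, *last_key, BATCH_KEYS),
    )
    keys = my.fetchall()

    with pg_conn.cursor() as pg:
        for unit, tool, date, time in keys:
            # Step 4a: fetch every MySQL row in this consolidation group.
            my.execute(
                "SELECT * FROM RAWDATACOR WHERE UnitName = %s AND ToolNameID = %s"
                " AND EventDate = %s AND EventTime = %s",
                (unit, tool, date, time),
            )
            measurements, group_max_id = consolidate(my.fetchall())  # hypothetical transform

            # Step 4b: one consolidated record per group; re-runs are harmless.
            pg.execute(
                """
                INSERT INTO rawdatacor
                    (unit_name, tool_name_id, event_timestamp, event_year,
                     measurements, mysql_max_id)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (unit_name, tool_name_id, event_timestamp, event_year) DO NOTHING
                """,
                (unit, tool, f"{date} {time}", date.year,
                 Json(measurements), group_max_id),
            )

        # Step 5: persist the new position so the next run continues from here.
        if keys:
            new_key = dict(zip(("UnitName", "ToolNameID", "EventDate", "EventTime"),
                               map(str, keys[-1])))
            pg.execute(
                "UPDATE migration_state SET last_key = %s"
                " WHERE table_name = 'rawdatacor' AND partition_name = '_global'",
                (Json(new_key),),
            )
    pg_conn.commit()
```

The actual implementation also filters invalid keys (see Data Quality Validation above) and records error and row counters for each batch.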
---

## Recommended Workflow

### Initial Setup (One-time)

```bash
# 1. Configure the .env file
cp .env.example .env
nano .env

# 2. Create the PostgreSQL schema
python main.py setup --create-schema

# 3. Run the full migration
python main.py migrate full
```

### Daily Incremental Sync

```bash
# Run the incremental migration (via cron or manually)
python main.py migrate incremental
```

**Cron example** (daily at 2 AM):

```cron
0 2 * * * cd /path/to/mysql2postgres && python main.py migrate incremental >> /var/log/migration.log 2>&1
```

---

## Resuming Interrupted Migrations

### Full Migration

If interrupted, the full migration resumes from the last completed partition:

```bash
# First run: migrates partitions 2014, 2015, 2016... (interrupted after 2020)
python main.py migrate full --table RAWDATACOR

# Resume: continues from partition 2021
python main.py migrate full --table RAWDATACOR
```

### Incremental Migration

The incremental migration resumes from the `last_key` stored in `migration_state`:

```bash
# Always safe to re-run - uses ON CONFLICT DO NOTHING
python main.py migrate incremental
```

---

## Syncing Migration State

If `migration_state` becomes out of sync with the actual data, use the sync utility:

```bash
# Sync migration_state with the actual PostgreSQL data
python scripts/sync_migration_state.py
```

This finds the most recent row (by `created_at`) and updates the `_global` record in `migration_state`.

---

## Monitoring

### Check Progress

```bash
# View migration state
psql -h localhost -U postgres -d migrated_db -c \
  "SELECT table_name, partition_name, status, total_rows, completed_at
   FROM migration_state ORDER BY table_name, partition_name"
```

### Verify Row Counts

```sql
-- PostgreSQL
SELECT COUNT(*) FROM rawdatacor;
SELECT COUNT(*) FROM elabdatadisp;

-- Compare with MySQL
-- mysql> SELECT COUNT(DISTINCT UnitName, ToolNameID, EventDate, EventTime) FROM RAWDATACOR;
```

---

## Common Issues

### "No previous migration found"

**Cause**: Running the incremental migration before a full migration.

**Solution**: Run the full migration first:

```bash
python main.py migrate full
```

### "Duplicate key value violates unique constraint"

**Cause**: The data already exists (this should not happen with `ON CONFLICT DO NOTHING`).

**Solution**: The migration handles this automatically - check the logs for details.

### Slow Incremental Migration

**Cause**: The `MAX(mysql_max_id)` query is slow (~60 seconds on large tables).

**Solution**: This is expected and happens only once, at startup. The subsequent MySQL queries are fast thanks to the ID filter.

**Alternative**: Create an index on `mysql_max_id` in PostgreSQL (at the cost of disk space):

```sql
CREATE INDEX idx_rawdatacor_mysql_max_id ON rawdatacor (mysql_max_id DESC);
CREATE INDEX idx_elabdatadisp_mysql_max_id ON elabdatadisp (mysql_max_id DESC);
```

---

## Key Technical Details

### Tuple Comparison in MySQL

MySQL supports lexicographic tuple comparison:

```sql
WHERE (UnitName, ToolNameID, EventDate, EventTime) > ('Unit1', 'Tool1', '2024-01-01', '10:00:00')
```

This is equivalent to:

```sql
WHERE UnitName > 'Unit1'
   OR (UnitName = 'Unit1' AND ToolNameID > 'Tool1')
   OR (UnitName = 'Unit1' AND ToolNameID = 'Tool1' AND EventDate > '2024-01-01')
   OR (UnitName = 'Unit1' AND ToolNameID = 'Tool1' AND EventDate = '2024-01-01' AND EventTime > '10:00:00')
```

but far more concise.

### Partitioning in PostgreSQL

Tables are partitioned by year (2014-2031):

```sql
CREATE TABLE rawdatacor_2024 PARTITION OF rawdatacor
    FOR VALUES FROM (2024) TO (2025);
```

PostgreSQL automatically routes INSERTs to the correct partition based on `event_year`.

### Indexes in PostgreSQL

Both tables have these indexes created automatically:

**Primary key** (required for partitioned tables):

```sql
-- Must include the partition key (event_year)
UNIQUE (id, event_year)
```

**Consolidation key** (prevents duplicates):

```sql
-- Ensures one record per consolidation group
UNIQUE (unit_name, tool_name_id, event_timestamp, event_year)
```

**Query optimization**:

```sql
-- Fast filtering by unit/tool
(unit_name, tool_name_id)

-- JSONB queries with a GIN index
GIN (measurements)
```

**Note**: All indexes are created automatically on all partitions when you run `setup --create-schema`.
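The GIN index makes containment queries on the consolidated `measurements` column cheap. A minimal sketch of such a query from Python follows; the `psycopg2` driver, the channel key `"0"`, and the unit label are assumptions for illustration, not part of the schema definition.

```python
# Illustrative JSONB containment query using the GIN index on "measurements".
# psycopg2 is an assumed driver; the channel key and unit label are example values.
import psycopg2
from psycopg2.extras import Json

conn = psycopg2.connect("dbname=migrated_db user=postgres host=localhost")
with conn.cursor() as cur:
    cur.execute(
        """
        SELECT unit_name, tool_name_id, event_timestamp
        FROM rawdatacor
        WHERE measurements @> %s          -- containment: served by the GIN index
          AND event_year = 2024           -- prune partitions via the partition key
        LIMIT 10
        """,
        (Json({"0": {"unit": "°C"}}),),
    )
    for row in cur.fetchall():
        print(row)
conn.close()
```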
---

## Summary

1. **Full migration**: one-time initial load, partition by partition
2. **Incremental migration**: daily sync of new data using consolidation keys
3. **State tracking**: the PostgreSQL `migration_state` table
4. **Performance**: the `mysql_max_id` filter plus consolidation batching
5. **Resumable**: both modes can resume after interruptions
6. **Safe**: `ON CONFLICT DO NOTHING` prevents duplicates