# MySQL to PostgreSQL Migration Workflow

## Overview

This tool implements **consolidation-based incremental migration** for two tables:

- **RAWDATACOR**: Raw sensor measurements
- **ELABDATADISP**: Elaborated/calculated data

Both tables use **consolidation keys** to group and migrate data efficiently.

---

## Migration Modes

### 1. Full Migration

Initial migration of all historical data, one partition at a time.

```bash
# Migrate all partitions for all tables
python main.py migrate full

# Migrate a specific table
python main.py migrate full --table RAWDATACOR

# Migrate a specific partition (year-based)
python main.py migrate full --table ELABDATADISP --partition 2024

# Dry-run to see what would be migrated
python main.py migrate full --dry-run
```

**Characteristics:**

- Migrates data partition by partition (year-based)
- Uses consolidation groups for efficiency
- Tracks progress in the `migration_state` table (PostgreSQL)
- Can resume from the last completed partition if interrupted
- Uses the `mysql_max_id` optimization for performance

---

### 2. Incremental Migration

Syncs only data added since the last migration.

```bash
# Migrate new data for all tables
python main.py migrate incremental

# Migrate a specific table
python main.py migrate incremental --table ELABDATADISP

# Dry-run to check what would be migrated
python main.py migrate incremental --dry-run
```

**Characteristics:**

- Uses **consolidation keys** to identify new records:
  - `(UnitName, ToolNameID, EventDate, EventTime)`
- Tracks the last migrated key in the `migration_state` table
- Optimized with the `mysql_max_id` filter for performance
- Handles duplicates with `ON CONFLICT DO NOTHING`
- Well suited to scheduled jobs (cron, systemd timers)

**How it works** (see the sketch below):

1. Retrieves `last_key` from the `migration_state` table
2. Gets `MAX(mysql_max_id)` from the PostgreSQL table for optimization
3. Queries MySQL: `WHERE id > max_mysql_id AND (key_tuple) > last_key`
4. Migrates the new consolidation groups
5. Updates `migration_state` with the new `last_key`

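End to end, the loop looks roughly like the following Python sketch. The helper names (`get_last_key`, `get_max_mysql_id`, `fetch_key_batch`, `migrate_group`, `update_state`) are hypothetical stand-ins for the tool's internals, not its actual API:

```python
# Illustrative sketch of the incremental sync loop; all helpers are hypothetical.
def run_incremental(table: str, batch_size: int = 10_000) -> None:
    last_key = get_last_key(table)          # step 1: read from migration_state
    max_mysql_id = get_max_mysql_id(table)  # step 2: MAX(mysql_max_id) in PostgreSQL

    while True:
        # step 3: keyset pagination over consolidation keys newer than last_key
        keys = fetch_key_batch(table, max_mysql_id, last_key, batch_size)
        if not keys:
            break
        for key in keys:
            migrate_group(table, key)       # step 4: consolidate + insert the group
            last_key = key                  # track the last processed key throughout
        update_state(table, last_key)       # step 5: persist progress after each batch

    update_state(table, last_key)           # and once more after the final flush
```
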
---

## Consolidation Keys

Both tables use consolidation to group multiple measurements into a single JSONB record.

### Consolidation Key Structure

```sql
(UnitName, ToolNameID, EventDate, EventTime)
```

### Data Quality Validation

The migration automatically validates consolidation keys and logs invalid ones (sketched below):

- `EventDate IS NULL` or `EventDate = '0000-00-00'`
- `EventTime IS NULL`
- `ToolNameID IS NULL` or `ToolNameID = ''` (empty string)
- `UnitName IS NULL` or `UnitName = ''` (empty string)
- `UnitName` starting with `[L` (corrupted Java strings like `[Ljava.lang.String;@...`)

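Taken together, the checks reduce to a small predicate. A minimal Python sketch; the function name and argument shapes are assumptions for illustration, not the tool's actual code:

```python
# Hypothetical sketch of the consolidation-key validation rules listed above.
def invalid_key_reason(unit_name, tool_name_id, event_date, event_time):
    """Return a human-readable reason if the key is invalid, else None."""
    if event_date is None or str(event_date) == "0000-00-00":
        return "EventDate is NULL or a zero date"
    if event_time is None:
        return "EventTime is NULL"
    if not tool_name_id:
        return "ToolNameID is NULL or empty"
    if not unit_name:
        return "UnitName is NULL or empty"
    if unit_name.startswith("[L"):
        return "UnitName is a corrupted Java string"
    return None
```
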
Invalid keys are:

- **Logged to error files** for tracking and analysis
- **Skipped automatically** to prevent migration failures
- **Counted and reported** at the end of the migration

Error log files:

- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)

Each incremental migration creates a new timestamped file to preserve history; files that end up empty are deleted automatically.

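A rough sketch of how such an error log can be maintained, following the naming scheme above (the class itself is illustrative, not the tool's implementation):

```python
import os
from datetime import datetime

# Illustrative error logger: one timestamped file per run, deleted if empty.
class ErrorLog:
    def __init__(self, table: str) -> None:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.path = f"migration_errors_{table}_incremental_{stamp}.log"
        self.count = 0

    def log(self, key: tuple, reason: str) -> None:
        with open(self.path, "a") as f:
            f.write(f"{key}\t{reason}\n")
        self.count += 1

    def close(self) -> None:
        # Report the total and auto-delete the file if nothing was logged
        print(f"Skipped {self.count} invalid consolidation keys")
        if self.count == 0 and os.path.exists(self.path):
            os.remove(self.path)
```
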
### Why Consolidation?

Instead of migrating individual sensor readings, we:

1. **Group** all measurements for the same (unit, tool, date, time)
2. **Transform** 16-25 columns into structured JSONB
3. **Migrate** them as a single consolidated record

**Example:**

MySQL has 10 rows for `(Unit1, Tool1, 2024-01-01, 10:00:00)`:

```
id | UnitName | ToolNameID | EventDate  | EventTime | Val0 | Val1 | ...
1  | Unit1    | Tool1      | 2024-01-01 | 10:00:00  | 23.5 | 45.2 | ...
2  | Unit1    | Tool1      | 2024-01-01 | 10:00:00  | 23.6 | 45.3 | ...
...
```

PostgreSQL gets 1 consolidated record:

```json
{
  "unit_name": "Unit1",
  "tool_name_id": "Tool1",
  "event_timestamp": "2024-01-01 10:00:00",
  "measurements": {
    "0": {"value": 23.5, "unit": "°C"},
    "1": {"value": 45.2, "unit": "bar"},
    ...
  },
  "mysql_max_id": 10
}
```

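The consolidation step itself reduces to grouping rows and building one payload. A minimal Python sketch under assumed names (`rows` is a list of dicts for one key; only two `ValN` columns are shown):

```python
import json

# Illustrative consolidation of one group of MySQL rows into one JSONB record.
# Assumes all rows share one (UnitName, ToolNameID, EventDate, EventTime) key.
def consolidate_group(rows: list[dict]) -> dict:
    head = rows[0]
    measurements = {}
    for row in rows:
        for i in range(2):  # the real tables have 16-25 ValN columns
            value = row.get(f"Val{i}")
            if value is not None:
                measurements[str(i)] = {"value": value}  # later rows win (assumption)
    return {
        "unit_name": head["UnitName"],
        "tool_name_id": head["ToolNameID"],
        "event_timestamp": f"{head['EventDate']} {head['EventTime']}",
        "measurements": json.dumps(measurements),
        "mysql_max_id": max(row["id"] for row in rows),
    }
```

The resulting record is inserted with `ON CONFLICT DO NOTHING` against the consolidation-key unique index, so re-running a batch is harmless.
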
---

## State Management

### Migration State Table

The `migration_state` table in PostgreSQL tracks migration progress:

```sql
CREATE TABLE migration_state (
    table_name     VARCHAR(50),
    partition_name VARCHAR(50),
    last_key       JSONB,        -- Last migrated consolidation key
    started_at     TIMESTAMP,
    completed_at   TIMESTAMP,
    total_rows     INTEGER,
    status         VARCHAR(20)
);
```

### State Records

- **Per-partition state**: Tracks each partition's progress
  - Example: `('rawdatacor', '2024', {...}, '2024-01-15 10:00:00', '2024-01-15 10:30:00', 1000000, 'completed')`

- **Global state**: Tracks the overall incremental migration position
  - Example: `('rawdatacor', '_global', {...}, NULL, NULL, 0, 'in_progress')`

### Checking State

```sql
-- View all migration state
SELECT * FROM migration_state ORDER BY table_name, partition_name;

-- View global state (for incremental migration)
SELECT * FROM migration_state WHERE partition_name = '_global';
```

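Updating the global incremental position is a single upsert. A sketch using psycopg2, assuming a unique index on `(table_name, partition_name)`; the function is illustrative, not the tool's actual code:

```python
import json

# Illustrative upsert of the incremental position (assumes a UNIQUE index on
# (table_name, partition_name) and an open psycopg2 connection `conn`).
def update_global_state(conn, table: str, last_key: tuple) -> None:
    key_json = json.dumps({
        "unit_name": last_key[0],
        "tool_name_id": last_key[1],
        "event_date": str(last_key[2]),
        "event_time": str(last_key[3]),
    })
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO migration_state (table_name, partition_name, last_key, status)
            VALUES (%s, '_global', %s, 'in_progress')
            ON CONFLICT (table_name, partition_name)
            DO UPDATE SET last_key = EXCLUDED.last_key
            """,
            (table, key_json),
        )
    conn.commit()
```
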
---

## Performance Optimization

### MySQL ID Filter

The incremental migration uses `MAX(mysql_max_id)` from PostgreSQL to filter the MySQL queries:

```sql
SELECT UnitName, ToolNameID, EventDate, EventTime
FROM RAWDATACOR
WHERE id > 267399536  -- max_mysql_id from PostgreSQL
  AND (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?)
GROUP BY UnitName, ToolNameID, EventDate, EventTime
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT 10000
```

**Why this is fast:**

- Uses the PRIMARY KEY index on `id` to skip millions of already-migrated rows
- The tuple comparison is applied only to the filtered subset
- Avoids full table scans

### Consolidation Group Batching

Instead of fetching individual rows, we proceed as follows (sketched below):

1. Fetch 10,000 consolidation keys at a time
2. For each key, fetch all matching rows from MySQL
3. Transform and insert them into PostgreSQL
4. Update state after every batch

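Step 1 is plain keyset pagination: each batch starts strictly after the last key of the previous one. A Python sketch over an assumed DB-API connection; names and placeholders are illustrative:

```python
# Illustrative keyset pagination over consolidation keys (DB-API style cursor).
def iter_key_batches(mysql, max_mysql_id: int, last_key: tuple, batch_size: int = 10_000):
    while True:
        cur = mysql.cursor()
        cur.execute(
            """
            SELECT UnitName, ToolNameID, EventDate, EventTime
            FROM RAWDATACOR
            WHERE id > %s
              AND (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)
            GROUP BY UnitName, ToolNameID, EventDate, EventTime
            ORDER BY UnitName, ToolNameID, EventDate, EventTime
            LIMIT %s
            """,
            (max_mysql_id, *last_key, batch_size),
        )
        keys = cur.fetchall()
        if not keys:
            return
        yield keys
        last_key = keys[-1]  # the next batch starts after the last key seen
```
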
---

## Recommended Workflow

### Initial Setup (One-time)

```bash
# 1. Configure the .env file
cp .env.example .env
nano .env

# 2. Create the PostgreSQL schema
python main.py setup --create-schema

# 3. Run the full migration
python main.py migrate full
```

### Daily Incremental Sync

```bash
# Run the incremental migration (via cron or manually)
python main.py migrate incremental
```

**Cron example** (daily at 2 AM):

```cron
0 2 * * * cd /path/to/mysql2postgres && python main.py migrate incremental >> /var/log/migration.log 2>&1
```

---

## Resuming Interrupted Migrations

### Full Migration

If interrupted, the full migration resumes from the last completed partition:

```bash
# First run: migrates partitions 2014, 2015, 2016, ... (interrupted after 2020)
python main.py migrate full --table RAWDATACOR

# Second run: resumes from partition 2021
python main.py migrate full --table RAWDATACOR
```

### Incremental Migration

The incremental migration resumes from the `last_key` stored in `migration_state`:

```bash
# Always safe to re-run - duplicates are skipped via ON CONFLICT DO NOTHING
python main.py migrate incremental
```

---

## Syncing Migration State

If `migration_state` falls out of sync with the actual data, use the sync utility:

```bash
# Sync migration_state with the actual PostgreSQL data
python scripts/sync_migration_state.py
```

This finds the most recently migrated row (by `created_at`) and updates the `_global` row in `migration_state`.

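Conceptually, the sync reduces to two statements. A hedged sketch of what such a script does, assuming a `created_at` column and an open psycopg2 connection; this is not the script's actual source:

```python
import json

# Illustrative core of a state sync: rebuild last_key from the newest PostgreSQL row.
def sync_state(conn, table: str) -> None:
    with conn.cursor() as cur:
        cur.execute(
            f"SELECT unit_name, tool_name_id, event_timestamp "
            f"FROM {table} ORDER BY created_at DESC LIMIT 1"
        )
        row = cur.fetchone()
        if row is None:
            return  # nothing migrated yet; leave state untouched
        last_key = {
            "unit_name": row[0],
            "tool_name_id": row[1],
            "event_timestamp": str(row[2]),
        }
        cur.execute(
            """
            UPDATE migration_state SET last_key = %s
            WHERE table_name = %s AND partition_name = '_global'
            """,
            (json.dumps(last_key), table),
        )
    conn.commit()
```
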
---

## Monitoring

### Check Progress

```bash
# View migration state
psql -h localhost -U postgres -d migrated_db -c \
  "SELECT table_name, partition_name, status, total_rows, completed_at
   FROM migration_state
   ORDER BY table_name, partition_name"
```

### Verify Row Counts

```sql
-- PostgreSQL
SELECT COUNT(*) FROM rawdatacor;
SELECT COUNT(*) FROM elabdatadisp;

-- Compare with MySQL
-- mysql> SELECT COUNT(DISTINCT UnitName, ToolNameID, EventDate, EventTime) FROM RAWDATACOR;
```

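Since each PostgreSQL row corresponds to one MySQL consolidation group, the two counts should match. A sketch of an automated check, assuming open `mysql` and `pg` DB-API connections:

```python
# Illustrative count check: distinct MySQL keys vs consolidated PostgreSQL rows.
def verify_counts(mysql, pg, mysql_table: str = "RAWDATACOR",
                  pg_table: str = "rawdatacor") -> bool:
    mcur = mysql.cursor()
    mcur.execute(
        f"SELECT COUNT(DISTINCT UnitName, ToolNameID, EventDate, EventTime) "
        f"FROM {mysql_table}"
    )
    mysql_groups = mcur.fetchone()[0]

    pcur = pg.cursor()
    pcur.execute(f"SELECT COUNT(*) FROM {pg_table}")
    pg_rows = pcur.fetchone()[0]

    print(f"MySQL groups: {mysql_groups}, PostgreSQL rows: {pg_rows}")
    return mysql_groups == pg_rows
```

Keys skipped by validation reduce the PostgreSQL count, so a difference equal to the skipped-key total reported in the error logs is expected.
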
---

## Common Issues

### "No previous migration found"

**Cause**: Running the incremental migration before a full migration.

**Solution**: Run the full migration first:

```bash
python main.py migrate full
```

### "Duplicate key value violates unique constraint"

**Cause**: The data already exists. This should not normally surface as an error, since inserts use `ON CONFLICT DO NOTHING`.

**Solution**: The migration handles this automatically - check the logs for details.

### Slow Incremental Migration

**Cause**: The `MAX(mysql_max_id)` query is slow (~60 seconds on large tables).

**Solution**: This is expected and happens only once, at startup. The subsequent MySQL queries are fast thanks to the ID-filter optimization.

**Alternative**: Create an index on `mysql_max_id` in PostgreSQL (at the cost of disk space):

```sql
CREATE INDEX idx_rawdatacor_mysql_max_id ON rawdatacor (mysql_max_id DESC);
CREATE INDEX idx_elabdatadisp_mysql_max_id ON elabdatadisp (mysql_max_id DESC);
```

---

## Key Technical Details

### Tuple Comparison in MySQL

MySQL supports lexicographic tuple (row) comparison:

```sql
WHERE (UnitName, ToolNameID, EventDate, EventTime) > ('Unit1', 'Tool1', '2024-01-01', '10:00:00')
```

This is equivalent to:

```sql
WHERE UnitName > 'Unit1'
   OR (UnitName = 'Unit1' AND ToolNameID > 'Tool1')
   OR (UnitName = 'Unit1' AND ToolNameID = 'Tool1' AND EventDate > '2024-01-01')
   OR (UnitName = 'Unit1' AND ToolNameID = 'Tool1' AND EventDate = '2024-01-01' AND EventTime > '10:00:00')
```

but far more compact and efficient.

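The same lexicographic semantics hold for Python tuples, which is convenient when the migration code compares keys client-side:

```python
# Python tuples compare lexicographically, mirroring the SQL row comparison above.
last_key = ("Unit1", "Tool1", "2024-01-01", "10:00:00")
candidate = ("Unit1", "Tool1", "2024-01-01", "10:05:00")

assert candidate > last_key  # later EventTime, identical leading fields
assert ("Unit2", "A", "2000-01-01", "00:00:00") > last_key  # UnitName dominates
```
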
### Partitioning in PostgreSQL

Tables are partitioned by year (2014-2031):

```sql
CREATE TABLE rawdatacor_2024 PARTITION OF rawdatacor
    FOR VALUES FROM (2024) TO (2025);
```

PostgreSQL automatically routes INSERTs to the correct partition based on `event_year`.

### Indexes in PostgreSQL

Both tables get these indexes automatically:

**Primary Key** (required for partitioned tables):

```sql
-- Must include the partition key (event_year)
UNIQUE (id, event_year)
```

**Consolidation Key** (prevents duplicates):

```sql
-- Ensures one record per consolidation group
UNIQUE (unit_name, tool_name_id, event_timestamp, event_year)
```

**Query Optimization**:

```sql
-- Fast filtering by unit/tool
(unit_name, tool_name_id)

-- JSONB queries with a GIN index
GIN (measurements)
```

**Note**: All indexes are created automatically on every partition when you run `setup --create-schema`.

---

## Summary

1. **Full migration**: one-time initial load, partition by partition
2. **Incremental migration**: daily sync of new data using consolidation keys
3. **State tracking**: the PostgreSQL `migration_state` table
4. **Performance**: the `mysql_max_id` filter plus consolidation batching
5. **Resumable**: both modes recover from interruptions
6. **Safe**: `ON CONFLICT DO NOTHING` prevents duplicates