clean docs

CHANGELOG.md (new file, 84 lines)
# Changelog

## [Current] - 2025-12-30

### Added

- **Consolidation-based incremental migration**: Uses consolidation keys `(UnitName, ToolNameID, EventDate, EventTime)` instead of timestamps
- **MySQL ID optimization**: Uses `MAX(mysql_max_id)` from PostgreSQL to filter MySQL queries, avoiding full table scans
- **State management in PostgreSQL**: Replaced JSON file with `migration_state` table for more reliable tracking
- **Sync utility**: Added `scripts/sync_migration_state.py` to sync state with actual data
- **Performance optimization**: MySQL queries now instant using PRIMARY KEY filter
- **Better documentation**: Consolidated and updated all documentation files

### Changed

- **Incremental migration**: Now uses consolidation keys instead of the timestamp-based approach
- **Full migration**: Improved to save the global `last_key` after completing all partitions
- **State tracking**: Moved from `migration_state.json` to the PostgreSQL table `migration_state`
- **Query performance**: Added `min_mysql_id` parameter to `fetch_consolidation_keys_after()` for optimization
- **Documentation**: Updated README.md, MIGRATION_WORKFLOW.md, QUICKSTART.md with the current implementation

### Removed

- **migration_state.json**: Replaced by the PostgreSQL table
- **Timestamp-based migration**: Replaced by the consolidation key-based approach
- **ID-based resumable migration**: Consolidated into the single consolidation-based approach
- **Temporary debug scripts**: Cleaned up all `/tmp/` debug files

### Fixed

- **Incremental migration performance**: MySQL queries now ~1000x faster with the ID filter
- **State synchronization**: Can now sync `migration_state` with actual data using the utility script
- **Duplicate handling**: Uses `ON CONFLICT DO NOTHING` to prevent duplicates
- **Last key tracking**: Properly updates the global state after full migration

### Migration Guide (from old to new)

If you have an existing installation with `migration_state.json`:

1. **Back up your data** (optional but recommended):
   ```bash
   cp migration_state.json migration_state.json.backup
   ```

2. **Run full migration** to populate the `migration_state` table:
   ```bash
   python main.py migrate full
   ```

3. **Sync state** (if you have existing data):
   ```bash
   python scripts/sync_migration_state.py
   ```

4. **Remove the old state file**:
   ```bash
   rm migration_state.json
   ```

5. **Run incremental migration**:
   ```bash
   python main.py migrate incremental --dry-run
   python main.py migrate incremental
   ```

### Performance Improvements

- **MySQL query time**: From 60+ seconds to <0.1 seconds (roughly 600x faster)
- **Consolidation efficiency**: Multiple MySQL rows → single PostgreSQL record
- **State reliability**: PostgreSQL table instead of a JSON file

### Breaking Changes

- `--state-file` parameter removed from incremental migration (no longer uses JSON)
- `--use-id` flag removed (the consolidation-based approach is now the default)
- Incremental migration requires full migration to be run first

## [Previous] - Before 2025-12-30

### Features

- Full migration support
- Incremental migration with timestamp tracking
- JSONB transformation
- Partitioning by year
- GIN indexes for JSONB queries
- Benchmark system
- Progress tracking
- Rich logging
Modified file: @@ -2,244 +2,349 @@

## Overview

This tool implements **consolidation-based incremental migration** for two tables:

- **RAWDATACOR**: Raw sensor measurements
- **ELABDATADISP**: Elaborated/calculated data

Both tables use **consolidation keys** to group and migrate data efficiently.

---

## Migration Modes

### 1. Full Migration

Initial migration of all historical data, migrating one partition at a time.

```bash
# Migrate all partitions for all tables
python main.py migrate full

# Migrate specific table
python main.py migrate full --table RAWDATACOR

# Migrate specific partition (year-based)
python main.py migrate full --table ELABDATADISP --partition 2024

# Dry-run to see what would be migrated
python main.py migrate full --dry-run
```

**Characteristics:**
- Migrates data partition by partition (year-based)
- Uses consolidation groups for efficiency
- Tracks progress in the `migration_state` table (PostgreSQL)
- Can resume from the last completed partition if interrupted
- Uses the `mysql_max_id` optimization for performance

---

### 2. Incremental Migration

Syncs only new data since the last migration.

```bash
# Migrate new data for all tables
python main.py migrate incremental

# Migrate specific table
python main.py migrate incremental --table ELABDATADISP

# Dry-run to check what would be migrated
python main.py migrate incremental --dry-run
```

**Characteristics:**
- Uses **consolidation keys** to identify new records:
  - `(UnitName, ToolNameID, EventDate, EventTime)`
- Tracks the last migrated key in the `migration_state` table
- Optimized with the `min_mysql_id` filter for performance
- Handles duplicates with `ON CONFLICT DO NOTHING`
- Perfect for scheduled jobs (cron, systemd timers)

**How it works:**
1. Retrieves `last_key` from the `migration_state` table
2. Gets `MAX(mysql_max_id)` from the PostgreSQL table for optimization
3. Queries MySQL: `WHERE id > max_mysql_id AND (key_tuple) > last_key`
4. Migrates the new consolidation groups
5. Updates `migration_state` with the new `last_key`
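
A minimal sketch of this loop, for orientation only: `PostgreSQLConnector` and `StateManager` appear elsewhere in this repository (see `scripts/sync_migration_state.py`), while `fetch_keys_after`, `migrate_group`, and the `load_state()` accessor are hypothetical stand-ins for the project's real helpers (such as the `fetch_consolidation_keys_after()` mentioned in the CHANGELOG).

```python
# Hedged sketch of the five-step incremental loop. The MySQL-side helpers are
# passed in as callables because their real signatures are not documented here.
from typing import Callable, Optional

from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.state_manager import StateManager


def run_incremental(
    table_name: str,
    fetch_keys_after: Callable[[Optional[dict], int, int], list],  # e.g. fetch_consolidation_keys_after
    migrate_group: Callable[[dict], None],                         # fetch rows, build JSONB, INSERT
    batch_size: int = 10_000,
) -> None:
    with PostgreSQLConnector() as pg_conn:
        state_mgr = StateManager(pg_conn, table_name, partition_name="_global")

        # 1. Last migrated consolidation key (assumed accessor name)
        last_key = state_mgr.load_state().get("last_key")

        # 2. Highest MySQL id already present in PostgreSQL (PRIMARY KEY filter)
        cursor = pg_conn.connection.cursor()
        cursor.execute(f"SELECT COALESCE(MAX(mysql_max_id), 0) FROM {table_name}")
        min_mysql_id = cursor.fetchone()[0]

        while True:
            # 3. New consolidation keys strictly after last_key, with id > min_mysql_id
            keys = fetch_keys_after(last_key, min_mysql_id, batch_size)
            if not keys:
                break

            # 4. Migrate each consolidation group (ON CONFLICT DO NOTHING on insert)
            for key in keys:
                migrate_group(key)

            # 5. Persist the new position so the next run resumes from here
            last_key = keys[-1]
            state_mgr.update_state(last_key=last_key)
```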

---

## Consolidation Keys

Both tables use consolidation to group multiple measurements into a single JSONB record.

### Consolidation Key Structure

```sql
(UnitName, ToolNameID, EventDate, EventTime)
```

### Why Consolidation?

Instead of migrating individual sensor readings, we:
1. **Group** all measurements for the same (unit, tool, date, time)
2. **Transform** 16-25 columns into structured JSONB
3. **Migrate** as a single consolidated record

**Example:**

MySQL has 10 rows for `(Unit1, Tool1, 2024-01-01, 10:00:00)`:
```
id | UnitName | ToolNameID | EventDate  | EventTime | Val0 | Val1 | ...
1  | Unit1    | Tool1      | 2024-01-01 | 10:00:00  | 23.5 | 45.2 | ...
2  | Unit1    | Tool1      | 2024-01-01 | 10:00:00  | 23.6 | 45.3 | ...
...
```

PostgreSQL gets 1 consolidated record:
```json
{
  "unit_name": "Unit1",
  "tool_name_id": "Tool1",
  "event_timestamp": "2024-01-01 10:00:00",
  "measurements": {
    "0": {"value": 23.5, "unit": "°C"},
    "1": {"value": 45.2, "unit": "bar"},
    ...
  },
  "mysql_max_id": 10
}
```
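
As a rough illustration of this grouping, here is a hedged sketch that collapses the rows of one consolidation key into a document of the shape shown above; the `Val0`/`Val1` column naming follows the example, while the unit lookup, the row format, and the first-value-wins rule for repeated channels are assumptions, not the project's actual transformer.

```python
# Hedged sketch: build one consolidated document from the MySQL rows that share
# a consolidation key. Column names (Val0, Val1, ...) follow the example above;
# the unit map and duplicate-channel handling are illustrative assumptions.
from typing import Any, Dict, List


def consolidate_group(rows: List[Dict[str, Any]], units: Dict[str, str]) -> Dict[str, Any]:
    first = rows[0]
    doc: Dict[str, Any] = {
        "unit_name": first["UnitName"],
        "tool_name_id": first["ToolNameID"],
        "event_timestamp": f"{first['EventDate']} {first['EventTime']}",
        "measurements": {},
        "mysql_max_id": max(row["id"] for row in rows),  # later used as the incremental id filter
    }
    for row in rows:
        for column, value in row.items():
            if column.startswith("Val") and value is not None:
                channel = column[len("Val"):]  # "Val0" -> "0"
                # Keep the first non-null value seen for a channel (assumption).
                doc["measurements"].setdefault(channel, {
                    "value": value,
                    "unit": units.get(channel, ""),  # e.g. {"0": "°C", "1": "bar"}
                })
    return doc
```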

---

## State Management

### Migration State Table

The `migration_state` table in PostgreSQL tracks migration progress:

```sql
CREATE TABLE migration_state (
    table_name     VARCHAR(50),
    partition_name VARCHAR(50),
    last_key       JSONB,      -- Last migrated consolidation key
    started_at     TIMESTAMP,
    completed_at   TIMESTAMP,
    total_rows     INTEGER,
    status         VARCHAR(20)
);
```

### State Records

- **Per-partition state**: Tracks each partition's progress
  - Example: `('rawdatacor', '2024', {...}, '2024-01-15 10:30:00', 'completed', 1000000)`
- **Global state**: Tracks overall incremental migration position
  - Example: `('rawdatacor', '_global', {...}, NULL, NULL, 0, 'in_progress')`

### Checking State

```sql
-- View all migration state
SELECT * FROM migration_state ORDER BY table_name, partition_name;

-- View global state (for incremental migration)
SELECT * FROM migration_state WHERE partition_name = '_global';
```
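
For programmatic updates, the `StateManager` class used by `scripts/sync_migration_state.py` can set the global position directly. A brief usage sketch follows; the constructor and `update_state()` call mirror the sync script, and the example key values are placeholders.

```python
# Sketch: set the global incremental position via StateManager, mirroring the
# call pattern in scripts/sync_migration_state.py. Key values are placeholders.
from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.state_manager import StateManager

with PostgreSQLConnector() as pg_conn:
    state_mgr = StateManager(pg_conn, "rawdatacor", partition_name="_global")
    state_mgr.update_state(last_key={
        "unit_name": "Unit1",
        "tool_name_id": "Tool1",
        "event_date": "2024-01-01",
        "event_time": "10:00:00",
    })
```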

---

## Performance Optimization

### MySQL ID Filter

The incremental migration uses `MAX(mysql_max_id)` from PostgreSQL to filter MySQL queries:

```sql
SELECT UnitName, ToolNameID, EventDate, EventTime
FROM RAWDATACOR
WHERE id > 267399536  -- max_mysql_id from PostgreSQL
  AND (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?)
GROUP BY UnitName, ToolNameID, EventDate, EventTime
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT 10000
```

**Why this is fast:**
- Uses the PRIMARY KEY index on `id` to skip millions of already-migrated rows
- The tuple comparison is only applied to the filtered subset
- Avoids full table scans

### Consolidation Group Batching

Instead of fetching individual rows, we:
1. Fetch 10,000 consolidation keys at a time
2. For each key, fetch all matching rows from MySQL
3. Transform and insert into PostgreSQL
4. Update state after every batch

---

## Recommended Workflow

### Initial Setup (One-time)

```bash
# 1. Configure .env file
cp .env.example .env
nano .env

# 2. Create PostgreSQL schema
python main.py setup --create-schema

# 3. Run full migration
python main.py migrate full
```

### Daily Incremental Sync

```bash
# Run incremental migration (via cron or manually)
python main.py migrate incremental
```

**Cron example** (daily at 2 AM):
```cron
0 2 * * * cd /path/to/mysql2postgres && python main.py migrate incremental >> /var/log/migration.log 2>&1
```

---

## Resuming Interrupted Migrations

### Full Migration

If interrupted, full migration resumes from the last completed partition:

```bash
# First run: migrates partitions 2014, 2015, 2016... (interrupted after 2020)
python main.py migrate full --table RAWDATACOR

# Resume: continues from partition 2021
python main.py migrate full --table RAWDATACOR
```

### Incremental Migration

Incremental migration uses the `last_key` from `migration_state`:

```bash
# Always safe to re-run - uses ON CONFLICT DO NOTHING
python main.py migrate incremental
```

---

## Syncing Migration State

If `migration_state` becomes out of sync with the actual data, use the sync utility:

```bash
# Sync migration_state with actual PostgreSQL data
python scripts/sync_migration_state.py
```

This finds the most recent row (by `created_at`) and updates `migration_state._global`.

---

## Monitoring

### Check Progress

```bash
# View migration state
psql -h localhost -U postgres -d migrated_db -c \
  "SELECT table_name, partition_name, status, total_rows, completed_at
   FROM migration_state
   ORDER BY table_name, partition_name"
```

### Verify Row Counts

```sql
-- PostgreSQL
SELECT COUNT(*) FROM rawdatacor;
SELECT COUNT(*) FROM elabdatadisp;

-- Compare with MySQL
-- mysql> SELECT COUNT(DISTINCT UnitName, ToolNameID, EventDate, EventTime) FROM RAWDATACOR;
```

---

## Common Issues

### "No previous migration found"

**Cause**: Running incremental migration before full migration.

**Solution**: Run full migration first:
```bash
python main.py migrate full
```

### "Duplicate key value violates unique constraint"

**Cause**: Data already exists (this shouldn't happen with ON CONFLICT DO NOTHING).

**Solution**: The migration handles this automatically - check the logs for details.
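
For reference, the duplicate protection comes from inserting with `ON CONFLICT DO NOTHING`. Below is a hedged sketch of what such an insert could look like in the migrator's Python code; the column list, placeholder style, and the reliance on the table's unique constraint over the consolidation key are assumptions based on the consolidated record shown earlier, not the exact statement the tool builds.

```python
# Hedged sketch of a duplicate-safe insert. Column names follow the consolidated
# record example in this document; the conflict target is left implicit and is
# assumed to be a unique constraint on the consolidation-key columns.
import json


def insert_consolidated(cursor, table_name: str, doc: dict) -> None:
    cursor.execute(
        f"""
        INSERT INTO {table_name}
            (unit_name, tool_name_id, event_timestamp, measurements, mysql_max_id)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT DO NOTHING
        """,
        (
            doc["unit_name"],
            doc["tool_name_id"],
            doc["event_timestamp"],
            json.dumps(doc["measurements"]),
            doc["mysql_max_id"],
        ),
    )
```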

### Slow Incremental Migration

**Cause**: The `MAX(mysql_max_id)` query is slow (~60 seconds on large tables).

**Solution**: This is expected and only happens once at startup. The MySQL queries themselves are instant thanks to the optimization.

**Alternative**: Create an index on `mysql_max_id` in PostgreSQL (uses disk space):
```sql
CREATE INDEX idx_rawdatacor_mysql_max_id ON rawdatacor (mysql_max_id DESC);
CREATE INDEX idx_elabdatadisp_mysql_max_id ON elabdatadisp (mysql_max_id DESC);
```

---

## Key Technical Details

### Tuple Comparison in MySQL

MySQL supports lexicographic tuple comparison:

```sql
WHERE (UnitName, ToolNameID, EventDate, EventTime) > ('Unit1', 'Tool1', '2024-01-01', '10:00:00')
```

This is equivalent to:
```sql
WHERE UnitName > 'Unit1'
   OR (UnitName = 'Unit1' AND ToolNameID > 'Tool1')
   OR (UnitName = 'Unit1' AND ToolNameID = 'Tool1' AND EventDate > '2024-01-01')
   OR (UnitName = 'Unit1' AND ToolNameID = 'Tool1' AND EventDate = '2024-01-01' AND EventTime > '10:00:00')
```

but far more compact and efficient to write.
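
As a sketch of how this predicate can be parameterized from the stored `last_key`: the function name below echoes the `fetch_consolidation_keys_after()` mentioned in the CHANGELOG, but its exact signature, the DB-API cursor, and the `%s` placeholder style are assumptions.

```python
# Hedged sketch: build the keyset query shown in "MySQL ID Filter" with bound
# parameters. Only the WHERE/GROUP BY/ORDER BY shape comes from this document;
# the signature and placeholder style are assumptions.
def fetch_consolidation_keys_after(cursor, last_key: dict, min_mysql_id: int, limit: int = 10_000) -> list:
    sql = """
        SELECT UnitName, ToolNameID, EventDate, EventTime
        FROM RAWDATACOR
        WHERE id > %s
          AND (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)
        GROUP BY UnitName, ToolNameID, EventDate, EventTime
        ORDER BY UnitName, ToolNameID, EventDate, EventTime
        LIMIT %s
    """
    params = (
        min_mysql_id,
        last_key["unit_name"],
        last_key["tool_name_id"],
        last_key["event_date"],
        last_key["event_time"],
        limit,
    )
    cursor.execute(sql, params)
    return cursor.fetchall()
```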

### Partitioning in PostgreSQL

Tables are partitioned by year (2014-2031):
```sql
CREATE TABLE rawdatacor_2024 PARTITION OF rawdatacor
    FOR VALUES FROM (2024) TO (2025);
```

PostgreSQL automatically routes INSERTs to the correct partition based on `event_year`.

---

## Summary

1. **Full migration**: One-time initial load, partition by partition
2. **Incremental migration**: Daily sync of new data using consolidation keys
3. **State tracking**: PostgreSQL `migration_state` table
4. **Performance**: `mysql_max_id` filter + consolidation batching
5. **Resumable**: Both modes can resume from interruptions
6. **Safe**: `ON CONFLICT DO NOTHING` prevents duplicates

Modified file: @@ -111,8 +111,11 @@ python main.py migrate full --table RAWDATACOR

```bash
# Migrate only the changes since the last sync
python main.py migrate incremental

# Dry-run to see what would be migrated
python main.py migrate incremental --dry-run

# Only a specific table
python main.py migrate incremental --table RAWDATACOR
```

### Benchmark

scripts/README.md (new file, 78 lines)

# Migration Scripts

Utility scripts for managing the migration.

## sync_migration_state.py

Synchronizes the `migration_state` table with the data actually present in PostgreSQL.

### When to use

Use this script when `migration_state` is out of sync with the real data, for example:
- After manual inserts into PostgreSQL
- After state corruption
- Before running an incremental migration over data that already exists

### How it works

For each table (rawdatacor, elabdatadisp):
1. Find the row with MAX(created_at) - the most recently inserted row
2. Extract the consolidation key from that row
3. Update `migration_state._global` with that key

### Usage

```bash
# Run from the project root
python scripts/sync_migration_state.py
```

### Output

```
Syncing migration_state with actual PostgreSQL data...
================================================================================

ELABDATADISP:
  Most recently inserted row (by created_at):
    created_at: 2025-12-30 11:58:24
    event_timestamp: 2025-12-30 14:58:24
  Consolidation key: (ID0290, DT0007, 2025-12-30, 14:58:24)
  ✓ Updated migration_state with this key

RAWDATACOR:
  Most recently inserted row (by created_at):
    created_at: 2025-12-30 11:13:29
    event_timestamp: 2025-12-30 11:11:39
  Consolidation key: (ID0304, DT0024, 2025-12-30, 11:11:39)
  ✓ Updated migration_state with this key

================================================================================
✓ Done! Incremental migration will now start from the correct position.
```

### Effects

After running this script:
- `migration_state._global` will be updated with the last migrated key
- `python main.py migrate incremental` will start from the correct position
- No duplicates will be created (it uses ON CONFLICT DO NOTHING)

### Caveats

- Corrupted data is excluded automatically (unit_name values like `[Ljava.lang.String;@...`)
- Uses `created_at` to find the most recently inserted row (not `event_timestamp`)
- Overwrites the existing global state

### Verification

After running the script, check the state:

```sql
SELECT table_name, partition_name, last_key
FROM migration_state
WHERE partition_name = '_global'
ORDER BY table_name;
```

It should show the most recent keys for both tables.

scripts/sync_migration_state.py (executable file, 63 lines)

```python
#!/usr/bin/env python3
"""Sync migration_state with actual data in PostgreSQL tables."""

import sys
sys.path.insert(0, '/home/alex/devel/mysql2postgres')

from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.state_manager import StateManager


def sync_table_state(table_name: str):
    """Sync migration_state for a table with its actual data."""
    with PostgreSQLConnector() as pg_conn:
        cursor = pg_conn.connection.cursor()

        # Find the row with MAX(created_at) - most recently inserted.
        # Exclude corrupted data (Java strings).
        cursor.execute(f"""
            SELECT unit_name, tool_name_id,
                   DATE(event_timestamp)::text as event_date,
                   event_timestamp::time::text as event_time,
                   created_at,
                   event_timestamp
            FROM {table_name}
            WHERE unit_name NOT LIKE '[L%'  -- Exclude corrupted Java strings
            ORDER BY created_at DESC
            LIMIT 1
        """)

        result = cursor.fetchone()
        if not result:
            print(f"No data found in {table_name}")
            return

        unit_name, tool_name_id, event_date, event_time, created_at, event_timestamp = result

        print(f"\n{table_name.upper()}:")
        print(f"  Most recently inserted row (by created_at):")
        print(f"    created_at: {created_at}")
        print(f"    event_timestamp: {event_timestamp}")
        print(f"  Consolidation key: ({unit_name}, {tool_name_id}, {event_date}, {event_time})")

        # Update global migration_state with this key
        state_mgr = StateManager(pg_conn, table_name, partition_name="_global")

        last_key = {
            "unit_name": unit_name,
            "tool_name_id": tool_name_id,
            "event_date": event_date,
            "event_time": event_time
        }

        state_mgr.update_state(last_key=last_key)
        print(f"  ✓ Updated migration_state with this key")


if __name__ == "__main__":
    print("Syncing migration_state with actual PostgreSQL data...")
    print("=" * 80)

    sync_table_state("elabdatadisp")
    sync_table_state("rawdatacor")

    print("\n" + "=" * 80)
    print("✓ Done! Incremental migration will now start from the correct position.")
```