Compare commits
10 commits: 5c9df3d06f...main

| SHA1 |
|---|
| 3a53564bb5 |
| 6306006f82 |
| 53cde5f667 |
| d1dbf7f0de |
| 931fec0959 |
| a7d2d501fb |
| 23e9fc9d82 |
| 03e39eb925 |
| bcedae40fc |
| 5f6e3215a5 |
@@ -13,10 +13,10 @@ POSTGRES_PASSWORD=your_postgres_password
POSTGRES_DATABASE=migrated_db

# Migration Settings
BATCH_SIZE=10000
LOG_LEVEL=INFO
DRY_RUN=false
CONSOLIDATION_GROUP_LIMIT=50000
CONSOLIDATION_GROUP_LIMIT=40000
PROGRESS_LOG_INTERVAL=10000

# Performance Testing
BENCHMARK_OUTPUT_DIR=benchmark_results
CHANGELOG.md (new file, 121 lines)
@@ -0,0 +1,121 @@
# Changelog

## [Current] - 2025-12-30

### Added
- **Consolidation-based incremental migration**: Uses consolidation keys `(UnitName, ToolNameID, EventDate, EventTime)` instead of timestamps
- **MySQL ID optimization**: Uses `MAX(mysql_max_id)` from PostgreSQL to filter MySQL queries, avoiding full table scans
- **State management in PostgreSQL**: Replaced JSON file with `migration_state` table for more reliable tracking
- **Sync utility**: Added `scripts/sync_migration_state.py` to sync state with actual data
- **Performance optimization**: MySQL queries now instant using PRIMARY KEY filter
- **Data quality validation**: Automatically validates and logs invalid consolidation keys to dedicated error files
- **Error logging**: Invalid keys (null dates, empty tool IDs, corrupted Java strings) are logged and skipped during migration
- **Better documentation**: Consolidated and updated all documentation files

### Changed
- **Incremental migration**: Now uses consolidation keys instead of timestamp-based approach
- **Full migration**: Improved to save global `last_key` after completing all partitions
- **State tracking**: Moved from `migration_state.json` to PostgreSQL table `migration_state`
- **Query performance**: Added `min_mysql_id` parameter to `fetch_consolidation_keys_after()` for optimization
- **Configuration**: Renamed `BATCH_SIZE` to `CONSOLIDATION_GROUP_LIMIT` to better reflect what it controls
- **Configuration**: Added `PROGRESS_LOG_INTERVAL` to control logging frequency
- **Configuration**: Added `BENCHMARK_OUTPUT_DIR` to specify benchmark results directory
- **Documentation**: Updated README.md, MIGRATION_WORKFLOW.md, QUICKSTART.md, EXAMPLE_WORKFLOW.md with current implementation
- **Documentation**: Corrected index and partitioning documentation to reflect actual PostgreSQL schema:
  - Uses `event_timestamp` (not separate event_date/event_time)
  - Primary key includes `event_year` for partitioning
  - Consolidation key is UNIQUE (unit_name, tool_name_id, event_timestamp, event_year)

### Removed
- **migration_state.json**: Replaced by PostgreSQL table
- **Timestamp-based migration**: Replaced by consolidation key-based approach
- **ID-based resumable migration**: Consolidated into single consolidation-based approach
- **Temporary debug scripts**: Cleaned up all `/tmp/` debug files

### Fixed
- **Incremental migration performance**: MySQL queries now ~1000x faster with ID filter
- **State synchronization**: Can now sync `migration_state` with actual data using utility script
- **Duplicate handling**: Uses `ON CONFLICT DO NOTHING` to prevent duplicates
- **Last key tracking**: Properly updates global state after full migration
- **Corrupted data handling**: Both full and incremental migrations now validate keys and log errors instead of crashing

### Error Logging

Both full and incremental migrations now handle corrupted consolidation keys gracefully:

**Error files:**
- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)

Each incremental migration creates a new timestamped file to preserve error history across runs.

**File format:**
```
# Migration errors for <table> partition <partition>
# Format: UnitName|ToolNameID|EventDate|EventTime|Reason

ID0350||0000-00-00|0:00:00|EventDate is invalid: 0000-00-00
[Ljava.lang.String;@abc123|TOOL1|2024-01-01|10:00:00|UnitName is corrupted Java string: [Ljava.lang.String;@abc123
UNIT1||2024-01-01|10:00:00|ToolNameID is NULL or empty
```

**Behavior:**
- Invalid keys are automatically skipped to prevent migration failure
- Each skipped key is logged with the reason for rejection
- Total count of skipped keys is reported at the end of migration
- Empty error files (no errors) are automatically deleted
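
The error-file layout above maps naturally onto a small logger helper. The following is only a minimal sketch, assuming the `ErrorLogger` interface referenced later in this changeset (`log_invalid_key`, `get_error_count`, `close`); the actual implementation in `src/utils/validation.py` may differ in detail.

```python
from datetime import datetime
from pathlib import Path
from typing import Optional


class ErrorLogger:
    """Minimal sketch: one pipe-delimited line per rejected consolidation key."""

    def __init__(self, table: str, mode: str, partition: Optional[str] = None):
        # Full migration writes one file per partition; incremental writes a
        # timestamped file per run so error history is preserved across runs.
        if mode == "incremental":
            stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            name = f"migration_errors_{table}_incremental_{stamp}.log"
        else:
            name = f"migration_errors_{table}_{partition}.log"
        self.path = Path(name)
        self._count = 0
        self._fh = self.path.open("w")
        self._fh.write(f"# Migration errors for {table} partition {partition or mode}\n")
        self._fh.write("# Format: UnitName|ToolNameID|EventDate|EventTime|Reason\n\n")

    def log_invalid_key(self, unit, tool, date, time, reason) -> None:
        self._fh.write(f"{unit or ''}|{tool or ''}|{date or ''}|{time or ''}|{reason}\n")
        self._count += 1

    def get_error_count(self) -> int:
        return self._count

    def close(self) -> None:
        self._fh.close()
        if self._count == 0:
            self.path.unlink()  # files with no logged errors are deleted, as described above
```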

### Migration Guide (from old to new)

If you have an existing installation with `migration_state.json`:

1. **Backup your data** (optional but recommended):
   ```bash
   cp migration_state.json migration_state.json.backup
   ```

2. **Run full migration** to populate `migration_state` table:
   ```bash
   python main.py migrate full
   ```

3. **Sync state** (if you have existing data):
   ```bash
   python scripts/sync_migration_state.py
   ```

4. **Remove old state file**:
   ```bash
   rm migration_state.json
   ```

5. **Run incremental migration**:
   ```bash
   python main.py migrate incremental --dry-run
   python main.py migrate incremental
   ```

### Performance Improvements

- **MySQL query time**: From 60+ seconds to <0.1 seconds (600x faster)
- **Consolidation efficiency**: Multiple MySQL rows → single PostgreSQL record
- **State reliability**: PostgreSQL table instead of JSON file

### Breaking Changes

- `--state-file` parameter removed from incremental migration (no longer uses JSON)
- `--use-id` flag removed (consolidation-based approach is now default)
- Incremental migration requires full migration to be run first
- `BATCH_SIZE` environment variable renamed to `CONSOLIDATION_GROUP_LIMIT` (update your .env file)

## [Previous] - Before 2025-12-30

### Features
- Full migration support
- Incremental migration with timestamp tracking
- JSONB transformation
- Partitioning by year
- GIN indexes for JSONB queries
- Benchmark system
- Progress tracking
- Rich logging
@@ -74,8 +74,9 @@ nano .env
# POSTGRES_PASSWORD=password123
# POSTGRES_DATABASE=production_migrated
#
# BATCH_SIZE=50000                  # Large batches for speed
# LOG_LEVEL=INFO
# CONSOLIDATION_GROUP_LIMIT=80000   # Large batches for speed
# PROGRESS_LOG_INTERVAL=20000
```

### 4. Verify Configuration (5 min)

@@ -335,8 +336,11 @@ python main.py setup --create-schema

### Very slow migration
```bash
# Temporarily increase the batch size
# Edit .env: BATCH_SIZE=100000
# Temporarily increase the consolidation group limit
# Edit .env: CONSOLIDATION_GROUP_LIMIT=100000

# Reduce logging
# Edit .env: PROGRESS_LOG_INTERVAL=50000

# Or check:
# - Network latency between MySQL and PostgreSQL
@@ -2,244 +2,396 @@
|
||||
|
||||
## Overview
|
||||
|
||||
This tool supports three migration modes:
|
||||
This tool implements **consolidation-based incremental migration** for two tables:
|
||||
- **RAWDATACOR**: Raw sensor measurements
|
||||
- **ELABDATADISP**: Elaborated/calculated data
|
||||
|
||||
1. **Full Migration** (`full_migration.py`) - Initial complete migration
|
||||
2. **Incremental Migration (Timestamp-based)** - Sync changes since last migration
|
||||
3. **Incremental Migration (ID-based)** - Resumable migration from last checkpoint
|
||||
Both tables use **consolidation keys** to group and migrate data efficiently.
|
||||
|
||||
---
|
||||
|
||||
## 1. Initial Full Migration
|
||||
## Migration Modes
|
||||
|
||||
### First Time Setup
|
||||
### 1. Full Migration
|
||||
|
||||
Initial migration of all historical data, migrating one partition at a time.
|
||||
|
||||
```bash
|
||||
# Create the PostgreSQL schema
|
||||
python main.py setup --create-schema
|
||||
# Migrate all partitions for all tables
|
||||
python main.py migrate full
|
||||
|
||||
# Run full migration (one-time)
|
||||
python main.py migrate --full RAWDATACOR
|
||||
python main.py migrate --full ELABDATADISP
|
||||
# Migrate specific table
|
||||
python main.py migrate full --table RAWDATACOR
|
||||
|
||||
# Migrate specific partition (year-based)
|
||||
python main.py migrate full --table ELABDATADISP --partition 2024
|
||||
|
||||
# Dry-run to see what would be migrated
|
||||
python main.py migrate full --dry-run
|
||||
```
|
||||
|
||||
**When to use:** First time migrating data or need complete fresh migration.
|
||||
|
||||
**Characteristics:**
|
||||
- Fetches ALL rows from MySQL
|
||||
- No checkpoint tracking
|
||||
- Cannot resume if interrupted
|
||||
- Good for initial data load
|
||||
- Migrates data partition by partition (year-based)
|
||||
- Uses consolidation groups for efficiency
|
||||
- Tracks progress in `migration_state` table (PostgreSQL)
|
||||
- Can resume from last completed partition if interrupted
|
||||
- Uses `mysql_max_id` optimization for performance
|
||||
|
||||
---
|
||||
|
||||
## 2. Timestamp-based Incremental Migration
|
||||
### 2. Incremental Migration
|
||||
|
||||
### For Continuous Sync (Recommended for most cases)
|
||||
Sync only new data since the last migration.
|
||||
|
||||
```bash
|
||||
# After initial full migration, use incremental with timestamps
|
||||
python main.py migrate --incremental RAWDATACOR
|
||||
python main.py migrate --incremental ELABDATADISP
|
||||
# Migrate new data for all tables
|
||||
python main.py migrate incremental
|
||||
|
||||
# Migrate specific table
|
||||
python main.py migrate incremental --table ELABDATADISP
|
||||
|
||||
# Dry-run to check what would be migrated
|
||||
python main.py migrate incremental --dry-run
|
||||
```
|
||||
|
||||
**When to use:** Continuous sync of new/updated records.
|
||||
|
||||
**Characteristics:**
|
||||
- Tracks `created_at` (RAWDATACOR) or `updated_at` (ELABDATADISP)
|
||||
- Uses JSON state file (`migration_state.json`)
|
||||
- Only fetches rows modified since last run
|
||||
- Perfect for scheduled jobs (cron, airflow, etc.)
|
||||
- Syncs changes but NOT deletions
|
||||
- Uses **consolidation keys** to identify new records:
|
||||
- `(UnitName, ToolNameID, EventDate, EventTime)`
|
||||
- Tracks last migrated key in `migration_state` table
|
||||
- Optimized with `min_mysql_id` filter for performance
|
||||
- Handles duplicates with `ON CONFLICT DO NOTHING`
|
||||
- Perfect for scheduled jobs (cron, systemd timers)
|
||||
|
||||
**How it works:**
|
||||
1. First run: Returns with message "No previous migration found" - must run full migration first
|
||||
2. Subsequent runs: Only fetches rows where `created_at` > last_migration_timestamp
|
||||
3. Updates state file with new timestamp for next run
|
||||
|
||||
**Example workflow:**
|
||||
```bash
|
||||
# Day 1: Initial full migration
|
||||
python main.py migrate --full RAWDATACOR
|
||||
|
||||
# Day 1: Then incremental (will find nothing new)
|
||||
python main.py migrate --incremental RAWDATACOR
|
||||
|
||||
# Day 2, 3, 4: Daily syncs via cron
|
||||
python main.py migrate --incremental RAWDATACOR
|
||||
```
|
||||
1. Retrieves `last_key` from `migration_state` table
|
||||
2. Gets `MAX(mysql_max_id)` from PostgreSQL table for optimization
|
||||
3. Queries MySQL: `WHERE id > max_mysql_id AND (key_tuple) > last_key`
|
||||
4. Migrates new consolidation groups
|
||||
5. Updates `migration_state` with new `last_key`
|
||||
|
||||
---
|
||||
|
||||
## 3. ID-based Incremental Migration (Resumable)
|
||||
## Consolidation Keys
|
||||
|
||||
### For Large Datasets or Unreliable Connections
|
||||
Both tables use consolidation to group multiple measurements into a single JSONB record.
|
||||
|
||||
```bash
|
||||
# First run
|
||||
python main.py migrate --incremental RAWDATACOR --use-id
|
||||
### Consolidation Key Structure
|
||||
|
||||
# Can interrupt and resume multiple times
|
||||
python main.py migrate --incremental RAWDATACOR --use-id
|
||||
```sql
|
||||
(UnitName, ToolNameID, EventDate, EventTime)
|
||||
```
|
||||
|
||||
**When to use:**
|
||||
- Large datasets that may timeout
|
||||
- Need to resume from exact last position
|
||||
- Network is unstable
|
||||
### Data Quality Validation
|
||||
|
||||
**Characteristics:**
|
||||
- Tracks `last_id` instead of timestamp
|
||||
- Updates state file after EACH BATCH (not just at end)
|
||||
- Can interrupt and resume dozens of times
|
||||
- Resumes from exact record ID where it stopped
|
||||
- Works with `migration_state.json`
|
||||
The migration automatically validates and logs invalid consolidation keys:
|
||||
- `EventDate IS NULL` or `EventDate = '0000-00-00'`
|
||||
- `ToolNameID IS NULL` or `ToolNameID = ''` (empty string)
|
||||
- `UnitName IS NULL` or `UnitName = ''` (empty string)
|
||||
- `UnitName` starting with `[L` (corrupted Java strings like `[Ljava.lang.String;@...`)
|
||||
- `EventTime IS NULL`
|
||||
|
||||
**How it works:**
|
||||
1. First run: Starts from beginning (ID = 0)
|
||||
2. Each batch: Updates state file with max ID from batch
|
||||
3. Interrupt: Can stop at any time
|
||||
4. Resume: Next run continues from last ID stored
|
||||
5. Continues until all rows processed
|
||||
Invalid keys are:
|
||||
- **Logged to error files** for tracking and analysis
|
||||
- **Skipped automatically** to prevent migration failures
|
||||
- **Counted and reported** at the end of migration
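
As a rough sketch, the checks listed above can be expressed as a single standalone function. This assumes the `validate_consolidation_key(unit_name, tool_name_id, event_date, event_time) -> (is_valid, reason)` signature used by the incremental migrator later in this diff; the real helper in `src/utils/validation.py` may apply additional rules.

```python
def validate_consolidation_key(unit_name, tool_name_id, event_date, event_time):
    """Return (is_valid, reason); reason is None when the key is usable."""
    if event_date is None or str(event_date) == "0000-00-00":
        return False, f"EventDate is invalid: {event_date}"
    if event_time is None:
        return False, "EventTime is NULL"
    if not tool_name_id:
        return False, "ToolNameID is NULL or empty"
    if not unit_name:
        return False, "UnitName is NULL or empty"
    if str(unit_name).startswith("[L"):
        return False, f"UnitName is corrupted Java string: {unit_name}"
    return True, None
```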
**Example workflow for large dataset:**
|
||||
```bash
|
||||
# Start ID-based migration (will migrate in batches)
|
||||
python main.py migrate --incremental RAWDATACOR --use-id
|
||||
Error log files:
|
||||
- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
|
||||
- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)
|
||||
|
||||
# [If interrupted after 1M rows processed]
|
||||
Each incremental migration creates a new timestamped file to preserve history.
|
||||
|
||||
# Resume from ID 1M (automatically detects last position)
|
||||
python main.py migrate --incremental RAWDATACOR --use-id
|
||||
### Why Consolidation?
|
||||
|
||||
# [Continues until complete]
|
||||
Instead of migrating individual sensor readings, we:
|
||||
1. **Group** all measurements for the same (unit, tool, date, time)
|
||||
2. **Transform** 16-25 columns into structured JSONB
|
||||
3. **Migrate** as a single consolidated record
|
||||
|
||||
**Example:**
|
||||
|
||||
MySQL has 10 rows for `(Unit1, Tool1, 2024-01-01, 10:00:00)`:
|
||||
```
|
||||
id | UnitName | ToolNameID | EventDate | EventTime | Val0 | Val1 | ...
|
||||
1 | Unit1 | Tool1 | 2024-01-01 | 10:00:00 | 23.5 | 45.2 | ...
|
||||
2 | Unit1 | Tool1 | 2024-01-01 | 10:00:00 | 23.6 | 45.3 | ...
|
||||
...
|
||||
```
|
||||
|
||||
PostgreSQL gets 1 consolidated record:
|
||||
```json
|
||||
{
|
||||
"unit_name": "Unit1",
|
||||
"tool_name_id": "Tool1",
|
||||
"event_timestamp": "2024-01-01 10:00:00",
|
||||
"measurements": {
|
||||
"0": {"value": 23.5, "unit": "°C"},
|
||||
"1": {"value": 45.2, "unit": "bar"},
|
||||
...
|
||||
},
|
||||
"mysql_max_id": 10
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## State Management
|
||||
|
||||
### State File Location
|
||||
```
|
||||
migration_state.json # In project root
|
||||
### Migration State Table
|
||||
|
||||
The `migration_state` table in PostgreSQL tracks migration progress:
|
||||
|
||||
```sql
|
||||
CREATE TABLE migration_state (
|
||||
table_name VARCHAR(50),
|
||||
partition_name VARCHAR(50),
|
||||
last_key JSONB, -- Last migrated consolidation key
|
||||
started_at TIMESTAMP,
|
||||
completed_at TIMESTAMP,
|
||||
total_rows INTEGER,
|
||||
status VARCHAR(20)
|
||||
);
|
||||
```
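
For illustration, reading and advancing the global `last_key` amounts to two small queries against this table. The sketch below assumes `psycopg2` and a `UNIQUE (table_name, partition_name)` constraint (not shown in the DDL above); the project's `StateManager` wraps this logic and may differ.

```python
from psycopg2.extras import Json


def get_last_key(conn, table_name: str):
    """Read the global incremental-migration position for a table (or None)."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT last_key FROM migration_state "
            "WHERE table_name = %s AND partition_name = '_global'",
            (table_name,),
        )
        row = cur.fetchone()
        return row[0] if row else None


def update_last_key(conn, table_name: str, last_key: dict, total_rows: int) -> None:
    """Upsert the global state row with the newest consolidation key."""
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO migration_state (table_name, partition_name, last_key, total_rows, status)
            VALUES (%s, '_global', %s, %s, 'in_progress')
            ON CONFLICT (table_name, partition_name)  -- assumes a UNIQUE constraint on these columns
            DO UPDATE SET last_key = EXCLUDED.last_key,
                          total_rows = EXCLUDED.total_rows
            """,
            (table_name, Json(last_key), total_rows),
        )
    conn.commit()
```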
### State File Content (Timestamp-based)
|
||||
```json
|
||||
{
|
||||
"rawdatacor": {
|
||||
"last_timestamp": "2024-12-11T19:30:45.123456",
|
||||
"last_updated": "2024-12-11T19:30:45.123456",
|
||||
"total_migrated": 50000
|
||||
}
|
||||
}
|
||||
### State Records
|
||||
|
||||
- **Per-partition state**: Tracks each partition's progress
|
||||
- Example: `('rawdatacor', '2024', {...}, '2024-01-15 10:30:00', 'completed', 1000000)`
|
||||
|
||||
- **Global state**: Tracks overall incremental migration position
|
||||
- Example: `('rawdatacor', '_global', {...}, NULL, NULL, 0, 'in_progress')`
|
||||
|
||||
### Checking State
|
||||
|
||||
```sql
|
||||
-- View all migration state
|
||||
SELECT * FROM migration_state ORDER BY table_name, partition_name;
|
||||
|
||||
-- View global state (for incremental migration)
|
||||
SELECT * FROM migration_state WHERE partition_name = '_global';
|
||||
```
|
||||
|
||||
### State File Content (ID-based)
|
||||
```json
|
||||
{
|
||||
"rawdatacor": {
|
||||
"last_id": 1000000,
|
||||
"total_migrated": 1000000,
|
||||
"last_updated": "2024-12-11T19:45:30.123456"
|
||||
}
|
||||
}
|
||||
---
|
||||
|
||||
## Performance Optimization
|
||||
|
||||
### MySQL ID Filter
|
||||
|
||||
The incremental migration uses `MAX(mysql_max_id)` from PostgreSQL to filter MySQL queries:
|
||||
|
||||
```sql
|
||||
SELECT UnitName, ToolNameID, EventDate, EventTime
|
||||
FROM RAWDATACOR
|
||||
WHERE id > 267399536 -- max_mysql_id from PostgreSQL
|
||||
AND (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?)
|
||||
GROUP BY UnitName, ToolNameID, EventDate, EventTime
|
||||
ORDER BY UnitName, ToolNameID, EventDate, EventTime
|
||||
LIMIT 10000
|
||||
```
|
||||
|
||||
### Reset Migration State
|
||||
```python
|
||||
from src.migrator.state import MigrationState
|
||||
**Why this is fast:**
|
||||
- Uses PRIMARY KEY index on `id` to skip millions of already-migrated rows
|
||||
- Tuple comparison only applied to filtered subset
|
||||
- Avoids full table scans
|
||||
|
||||
state = MigrationState()
|
||||
### Consolidation Group Batching
|
||||
|
||||
# Reset specific table
|
||||
state.reset("rawdatacor")
|
||||
|
||||
# Reset all tables
|
||||
state.reset()
|
||||
```
|
||||
Instead of fetching individual rows, we:
|
||||
1. Fetch 10,000 consolidation keys at a time
|
||||
2. For each key, fetch all matching rows from MySQL
|
||||
3. Transform and insert into PostgreSQL
|
||||
4. Update state every batch
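
To make that control flow concrete, here is a schematic sketch of the loop. The method names (`fetch_consolidation_keys_from_partition_after`, `fetch_records_for_key_all_partitions`, `copy_from_with_conflict`, `consolidate_rows`, `get_last_key`, `update_state`) mirror those used elsewhere in this diff, but the objects passed in and the `pg_columns` argument are placeholders, not the production code.

```python
from src.migrator.consolidator import consolidate_rows  # import path taken from this diff


def migrate_partition_batches(mysql_conn, pg_conn, state_mgr, table, partition,
                              pg_columns, limit=10_000):
    """Sketch: pull keys in batches, consolidate each group, bulk-insert, advance state."""
    after_key = state_mgr.get_last_key()  # None on the very first pass
    total = 0
    while True:
        keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
            table, partition=partition, after_key=after_key, limit=limit
        )
        if not keys:
            break  # partition exhausted

        buffer = []
        for key in keys:
            rows = mysql_conn.fetch_records_for_key_all_partitions(
                table, key["UnitName"], key["ToolNameID"],
                key["EventDate"], key["EventTime"],
            )
            if rows:
                buffer.append(consolidate_rows(table, rows))  # many MySQL rows -> one record

        total += pg_conn.copy_from_with_conflict(
            table.lower(), buffer, pg_columns,
            conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"],
        )

        # Resume strictly after the last key of this batch, and persist the position.
        after_key = {
            "unit_name": keys[-1]["UnitName"],
            "tool_name_id": keys[-1]["ToolNameID"],
            "event_date": str(keys[-1]["EventDate"]),
            "event_time": str(keys[-1]["EventTime"]),
        }
        state_mgr.update_state(last_key=after_key)
    return total
```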
---
|
||||
|
||||
## Recommended Workflow
|
||||
|
||||
### For Daily Continuous Sync
|
||||
```bash
|
||||
# Week 1: Initial setup
|
||||
python main.py setup --create-schema
|
||||
python main.py migrate --full RAWDATACOR
|
||||
python main.py migrate --full ELABDATADISP
|
||||
### Initial Setup (One-time)
|
||||
|
||||
# Week 2+: Daily incremental syncs (via cron job)
|
||||
# Schedule: `0 2 * * * cd /path/to/project && python main.py migrate --incremental RAWDATACOR`
|
||||
python main.py migrate --incremental RAWDATACOR
|
||||
python main.py migrate --incremental ELABDATADISP
|
||||
```bash
|
||||
# 1. Configure .env file
|
||||
cp .env.example .env
|
||||
nano .env
|
||||
|
||||
# 2. Create PostgreSQL schema
|
||||
python main.py setup --create-schema
|
||||
|
||||
# 3. Run full migration
|
||||
python main.py migrate full
|
||||
```
|
||||
|
||||
### For Large Initial Migration
|
||||
```bash
|
||||
# If dataset > 10 million rows
|
||||
python main.py setup --create-schema
|
||||
python main.py migrate --incremental RAWDATACOR --use-id # Can interrupt/resume
|
||||
### Daily Incremental Sync
|
||||
|
||||
# For subsequent syncs, use timestamp
|
||||
python main.py migrate --incremental RAWDATACOR # Timestamp-based
|
||||
```bash
|
||||
# Run incremental migration (via cron or manual)
|
||||
python main.py migrate incremental
|
||||
```
|
||||
|
||||
**Cron example** (daily at 2 AM):
|
||||
```cron
|
||||
0 2 * * * cd /path/to/mysql2postgres && python main.py migrate incremental >> /var/log/migration.log 2>&1
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Key Differences at a Glance
|
||||
## Resuming Interrupted Migrations
|
||||
|
||||
| Feature | Full | Timestamp | ID-based |
|
||||
|---------|------|-----------|----------|
|
||||
| Initial setup | ✅ Required first | ✅ After full | ✅ After full |
|
||||
| Sync new/updated | ❌ No | ✅ Yes | ✅ Yes |
|
||||
| Resumable | ❌ No | ⚠️ Partial* | ✅ Full |
|
||||
| Batched state tracking | ❌ No | ❌ No | ✅ Yes |
|
||||
| Large datasets | ⚠️ Risky | ✅ Good | ✅ Best |
|
||||
| Scheduled jobs | ❌ No | ✅ Perfect | ⚠️ Unnecessary |
|
||||
### Full Migration
|
||||
|
||||
*Timestamp mode can resume, but must wait for full batch to complete before continuing
|
||||
If interrupted, full migration resumes from the last completed partition:
|
||||
|
||||
```bash
|
||||
# First run: migrates partitions 2014, 2015, 2016... (interrupted after 2020)
|
||||
python main.py migrate full --table RAWDATACOR
|
||||
|
||||
# Resume: continues from partition 2021
|
||||
python main.py migrate full --table RAWDATACOR
|
||||
```
|
||||
|
||||
### Incremental Migration
|
||||
|
||||
Incremental migration uses the `last_key` from `migration_state`:
|
||||
|
||||
```bash
|
||||
# Always safe to re-run - uses ON CONFLICT DO NOTHING
|
||||
python main.py migrate incremental
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Default Partitions
|
||||
## Syncing Migration State
|
||||
|
||||
Both tables are partitioned by year (2014-2031) plus a DEFAULT partition:
|
||||
- **rawdatacor_2014** through **rawdatacor_2031** (yearly partitions)
|
||||
- **rawdatacor_default** (catches data outside 2014-2031)
|
||||
If `migration_state` becomes out of sync with actual data, use the sync utility:
|
||||
|
||||
Same for ELABDATADISP. This ensures data with edge-case timestamps doesn't break migration.
|
||||
```bash
|
||||
# Sync migration_state with actual PostgreSQL data
|
||||
python scripts/sync_migration_state.py
|
||||
```
|
||||
|
||||
This finds the most recent row (by `created_at`) and updates `migration_state._global`.
|
||||
|
||||
---
|
||||
|
||||
## Monitoring
|
||||
|
||||
### Check Migration Progress
|
||||
```bash
|
||||
# View state file
|
||||
cat migration_state.json
|
||||
### Check Progress
|
||||
|
||||
# Check PostgreSQL row counts
|
||||
psql -U postgres -h localhost -d your_db -c "SELECT COUNT(*) FROM rawdatacor;"
|
||||
```bash
|
||||
# View migration state
|
||||
psql -h localhost -U postgres -d migrated_db -c \
|
||||
"SELECT table_name, partition_name, status, total_rows, completed_at
|
||||
FROM migration_state
|
||||
ORDER BY table_name, partition_name"
|
||||
```
|
||||
|
||||
### Common Issues
|
||||
### Verify Row Counts
|
||||
|
||||
**"No previous migration found"** (Timestamp mode)
|
||||
- Solution: Run full migration first with `--full` flag
|
||||
```sql
|
||||
-- PostgreSQL
|
||||
SELECT COUNT(*) FROM rawdatacor;
|
||||
SELECT COUNT(*) FROM elabdatadisp;
|
||||
|
||||
**"Duplicate key value violates unique constraint"**
|
||||
- Cause: Running full migration twice
|
||||
- Solution: Use timestamp-based incremental sync instead
|
||||
-- Compare with MySQL
|
||||
-- mysql> SELECT COUNT(DISTINCT UnitName, ToolNameID, EventDate, EventTime) FROM RAWDATACOR;
|
||||
```
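
To automate that comparison, both counts can be pulled from a short script. This is a sketch under stated assumptions: `pymysql` and `psycopg2` are available and the caller supplies the connection settings; for a fully migrated table the MySQL distinct-key count should roughly match the consolidated PostgreSQL row count, with skipped invalid keys explaining small gaps.

```python
import pymysql
import psycopg2


def compare_counts(mysql_cfg: dict, pg_cfg: dict, table: str = "RAWDATACOR") -> None:
    """Compare MySQL distinct consolidation keys with consolidated PostgreSQL rows."""
    my_conn = pymysql.connect(**mysql_cfg)
    try:
        with my_conn.cursor() as cur:
            cur.execute(
                f"SELECT COUNT(DISTINCT UnitName, ToolNameID, EventDate, EventTime) FROM {table}"
            )
            mysql_keys = cur.fetchone()[0]
    finally:
        my_conn.close()

    pg_conn = psycopg2.connect(**pg_cfg)
    try:
        with pg_conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) FROM {table.lower()}")
            pg_rows = cur.fetchone()[0]
    finally:
        pg_conn.close()

    print(f"MySQL distinct keys: {mysql_keys}")
    print(f"PostgreSQL rows:     {pg_rows}")
    print(f"Difference:          {mysql_keys - pg_rows}")
```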
**"Timeout during migration"** (Large datasets)
|
||||
- Solution: Switch to ID-based resumable migration with `--use-id`
|
||||
---
|
||||
|
||||
## Common Issues
|
||||
|
||||
### "No previous migration found"
|
||||
|
||||
**Cause**: Running incremental migration before full migration
|
||||
|
||||
**Solution**: Run full migration first
|
||||
```bash
|
||||
python main.py migrate full
|
||||
```
|
||||
|
||||
### "Duplicate key value violates unique constraint"
|
||||
|
||||
**Cause**: Data already exists (shouldn't happen with ON CONFLICT DO NOTHING)
|
||||
|
||||
**Solution**: Migration handles this automatically - check logs for details
|
||||
|
||||
### Slow Incremental Migration
|
||||
|
||||
**Cause**: `MAX(mysql_max_id)` query is slow (~60 seconds on large tables)
|
||||
|
||||
**Solution**: This is expected and only happens once at start. The MySQL queries are instant thanks to the optimization.
|
||||
|
||||
**Alternative**: Create an index on `mysql_max_id` in PostgreSQL (uses disk space):
|
||||
```sql
|
||||
CREATE INDEX idx_rawdatacor_mysql_max_id ON rawdatacor (mysql_max_id DESC);
|
||||
CREATE INDEX idx_elabdatadisp_mysql_max_id ON elabdatadisp (mysql_max_id DESC);
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Key Technical Details
|
||||
|
||||
### Tuple Comparison in MySQL
|
||||
|
||||
MySQL supports lexicographic tuple comparison:
|
||||
|
||||
```sql
|
||||
WHERE (UnitName, ToolNameID, EventDate, EventTime) > ('Unit1', 'Tool1', '2024-01-01', '10:00:00')
|
||||
```
|
||||
|
||||
This is equivalent to:
|
||||
```sql
|
||||
WHERE UnitName > 'Unit1'
|
||||
OR (UnitName = 'Unit1' AND ToolNameID > 'Tool1')
|
||||
OR (UnitName = 'Unit1' AND ToolNameID = 'Tool1' AND EventDate > '2024-01-01')
|
||||
OR (UnitName = 'Unit1' AND ToolNameID = 'Tool1' AND EventDate = '2024-01-01' AND EventTime > '10:00:00')
|
||||
```
|
||||
|
||||
But much more efficient!
|
||||
|
||||
### Partitioning in PostgreSQL
|
||||
|
||||
Tables are partitioned by year (2014-2031):
|
||||
```sql
|
||||
CREATE TABLE rawdatacor_2024 PARTITION OF rawdatacor
|
||||
FOR VALUES FROM (2024) TO (2025);
|
||||
```
|
||||
|
||||
PostgreSQL automatically routes INSERTs to the correct partition based on `event_year`.
|
||||
|
||||
### Indexes in PostgreSQL
|
||||
|
||||
Both tables have these indexes automatically created:
|
||||
|
||||
**Primary Key** (required for partitioned tables):
|
||||
```sql
|
||||
-- Must include partition key (event_year)
|
||||
UNIQUE (id, event_year)
|
||||
```
|
||||
|
||||
**Consolidation Key** (prevents duplicates):
|
||||
```sql
|
||||
-- Ensures one record per consolidation group
|
||||
UNIQUE (unit_name, tool_name_id, event_timestamp, event_year)
|
||||
```
|
||||
|
||||
**Query Optimization**:
|
||||
```sql
|
||||
-- Fast filtering by unit/tool
|
||||
(unit_name, tool_name_id)
|
||||
|
||||
-- JSONB queries with GIN index
|
||||
GIN (measurements)
|
||||
```
|
||||
|
||||
**Note**: All indexes are automatically created on all partitions when you run `setup --create-schema`.
|
||||
|
||||
---
|
||||
|
||||
## Summary
|
||||
|
||||
- **Start with:** Full migration (`--full`) for initial data load
|
||||
- **Then use:** Timestamp-based incremental (`--incremental`) for daily syncs
|
||||
- **Switch to:** ID-based resumable (`--incremental --use-id`) if full migration is too large
|
||||
1. **Full migration**: One-time initial load, partition by partition
|
||||
2. **Incremental migration**: Daily sync of new data using consolidation keys
|
||||
3. **State tracking**: PostgreSQL `migration_state` table
|
||||
4. **Performance**: `mysql_max_id` filter + consolidation batching
|
||||
5. **Resumable**: Both modes can resume from interruptions
|
||||
6. **Safe**: ON CONFLICT DO NOTHING prevents duplicates
|
||||
|
||||
@@ -42,8 +42,9 @@ POSTGRES_USER=postgres
POSTGRES_PASSWORD=pgpassword
POSTGRES_DATABASE=migrated_db

BATCH_SIZE=10000
LOG_LEVEL=INFO
CONSOLIDATION_GROUP_LIMIT=40000
PROGRESS_LOG_INTERVAL=10000
```

### 3. Create PostgreSQL in Incus

@@ -111,8 +112,11 @@ python main.py migrate full --table RAWDATACOR
# Migrate only the changes since the last sync
python main.py migrate incremental

# With a custom state file
python main.py migrate incremental --state-file daily_sync.json
# Dry-run to see what would be migrated
python main.py migrate incremental --dry-run

# Only a specific table
python main.py migrate incremental --table RAWDATACOR
```

### Benchmark

@@ -221,8 +225,11 @@ python main.py setup --create-schema

### "Migration is slow"
```bash
# Increase the batch size in .env
BATCH_SIZE=50000
# Increase the consolidation group limit in .env
CONSOLIDATION_GROUP_LIMIT=80000

# Or reduce logging
PROGRESS_LOG_INTERVAL=20000

# Or optimize MySQL
mysql> FLUSH PRIVILEGES;
config.py (115 lines changed)
@@ -187,3 +187,118 @@ TABLE_CONFIGS = {
|
||||
"elabdatadisp": _elabdatadisp_config,
|
||||
"ELABDATADISP": _elabdatadisp_config,
|
||||
}
|
||||
|
||||
|
||||
# Partition mapping utilities
|
||||
def year_to_partition_name(year: int, table: str) -> str:
|
||||
"""Map year to partition name.
|
||||
|
||||
Partition naming scheme (different for each table):
|
||||
- RAWDATACOR: part0=2014, part1=2015, ..., part10=2024 (part{year-2014})
|
||||
- ELABDATADISP: d0=2013, d1=2014, ..., d12=2025, ..., d17=2030 (d{year-2013})
|
||||
|
||||
Args:
|
||||
year: Year (2013-2031, depending on table)
|
||||
table: Table name (RAWDATACOR or ELABDATADISP)
|
||||
|
||||
Returns:
|
||||
Partition name (e.g., "part8" for RAWDATACOR/2022, "d14" for ELABDATADISP/2026)
|
||||
|
||||
Raises:
|
||||
ValueError: If year is out of range or table is unknown
|
||||
"""
|
||||
table_upper = table.upper()
|
||||
|
||||
if table_upper == "RAWDATACOR":
|
||||
# RAWDATACOR: 2014-2024 (part0-part10)
|
||||
# RAWDATACOR: 2025-2030 (d12-d17)
|
||||
|
||||
if year < 2014:
|
||||
year = 2014
|
||||
elif year > 2030:
|
||||
year = 2030
|
||||
|
||||
if year < 2025:
|
||||
suffix = "part"
|
||||
d_year = 2014
|
||||
else:
|
||||
suffix = "d"
|
||||
d_year = 2013 # Continue naming as d12, d13, ...
|
||||
|
||||
partition_index = year - d_year  # part: 2014→0, 2015→1, ..., 2024→10; d: 2025→12, ..., 2030→17
|
||||
return f"{suffix}{partition_index}"
|
||||
|
||||
elif table_upper == "ELABDATADISP":
|
||||
# ELABDATADISP: 2013-2031 (d0-d18)
|
||||
if year < 2013:
|
||||
year = 2013
|
||||
elif year > 2031:
|
||||
year = 2031
|
||||
|
||||
partition_index = year - 2013 # 2013→0, 2014→1, ..., 2025→12, ..., 2031→18
|
||||
return f"d{partition_index}"
|
||||
|
||||
else:
|
||||
raise ValueError(f"Unknown table: {table}")
|
||||
|
||||
|
||||
def get_partitions_from_year(year: int, table: str) -> list[str]:
|
||||
"""Get list of partition names from a specific year onwards.
|
||||
|
||||
Args:
|
||||
year: Starting year
|
||||
table: Table name (RAWDATACOR or ELABDATADISP)
|
||||
|
||||
Returns:
|
||||
List of partition names from that year to the latest available year
|
||||
|
||||
Example:
|
||||
get_partitions_from_year(2022, "RAWDATACOR")
|
||||
→ ["part8", "part9", "part10", "d12", "d13", "d14", "d15", "d16", "d17"] # 2022→part8, ..., 2024→part10, 2025→d12, ..., 2030→d17
|
||||
|
||||
get_partitions_from_year(2025, "ELABDATADISP")
|
||||
→ ["d12", "d13", "d14", "d15", "d16", "d17"] # 2025-2030
|
||||
"""
|
||||
table_upper = table.upper()
|
||||
partitions = []
|
||||
|
||||
if table_upper == "RAWDATACOR":
|
||||
end_year = 2030 # RAWDATACOR: part0-part10 (2014-2024) + d12-d17 (2025-2030)
|
||||
elif table_upper == "ELABDATADISP":
|
||||
end_year = 2030 # ELABDATADISP: d0-d17 (2013-2030)
|
||||
else:
|
||||
raise ValueError(f"Unknown table: {table}")
|
||||
|
||||
# Generate partitions for each year from start_year to end_year
|
||||
for y in range(year, end_year + 1):
|
||||
partition_name = year_to_partition_name(y, table)
|
||||
# Avoid duplicates (can happen if mapping multiple years to same partition)
|
||||
if not partitions or partitions[-1] != partition_name:
|
||||
partitions.append(partition_name)
|
||||
|
||||
return partitions
|
||||
|
||||
|
||||
def date_string_to_partition_name(date_str: str, table: str) -> str:
|
||||
"""Extract year from date string and map to partition name.
|
||||
|
||||
Args:
|
||||
date_str: Date string in format 'YYYY-MM-DD' (e.g., '2022-05-15')
|
||||
table: Table name (RAWDATACOR or ELABDATADISP)
|
||||
|
||||
Returns:
|
||||
Partition name (e.g., "part8" or "d8")
|
||||
|
||||
Example:
|
||||
date_string_to_partition_name("2022-05-15", "RAWDATACOR") → "part8"
|
||||
"""
|
||||
if not date_str or len(date_str) < 4:
|
||||
# Default to 2014 if invalid date
|
||||
return year_to_partition_name(2014, table)
|
||||
|
||||
try:
|
||||
year = int(date_str[:4])
|
||||
return year_to_partition_name(year, table)
|
||||
except (ValueError, TypeError):
|
||||
# Default to 2014 if can't parse
|
||||
return year_to_partition_name(2014, table)
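
A quick usage check of these partition-mapping helpers, derived from the mappings documented in the docstrings above; it assumes `config.py` is importable from the project root and is illustrative only, not part of the diff.

```python
from config import (
    year_to_partition_name,
    get_partitions_from_year,
    date_string_to_partition_name,
)

# RAWDATACOR years 2014-2024 map to part0-part10, 2025+ to d12-d17
assert year_to_partition_name(2022, "RAWDATACOR") == "part8"
assert year_to_partition_name(2026, "RAWDATACOR") == "d13"

# ELABDATADISP uses d{year-2013} throughout
assert year_to_partition_name(2026, "ELABDATADISP") == "d13"

# Ranges skip the gap between part10 (2024) and d12 (2025)
assert get_partitions_from_year(2023, "RAWDATACOR")[:3] == ["part9", "part10", "d12"]

# Dates are reduced to their year before mapping
assert date_string_to_partition_name("2022-05-15", "RAWDATACOR") == "part8"
```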
main.py (43 lines changed)
@@ -31,7 +31,7 @@ def cli(ctx):
|
||||
)
|
||||
def setup(create_schema):
|
||||
"""Setup PostgreSQL database."""
|
||||
setup_logger(__name__)
|
||||
setup_logger("") # Configure root logger to show all module logs
|
||||
|
||||
if not create_schema:
|
||||
click.echo("Usage: python main.py setup --create-schema")
|
||||
@@ -89,7 +89,7 @@ def migrate():
|
||||
)
|
||||
def full(table, dry_run, resume, partition, parallel):
|
||||
"""Perform full migration of all data."""
|
||||
setup_logger(__name__)
|
||||
setup_logger("") # Configure root logger to show all module logs
|
||||
|
||||
tables = ["RAWDATACOR", "ELABDATADISP"] if table == "all" else [table]
|
||||
|
||||
@@ -178,7 +178,7 @@ def incremental(table, dry_run):
|
||||
)
|
||||
def benchmark(iterations, output):
|
||||
"""Run performance benchmarks comparing MySQL and PostgreSQL."""
|
||||
setup_logger(__name__)
|
||||
setup_logger("") # Configure root logger to show all module logs
|
||||
|
||||
try:
|
||||
click.echo("Running performance benchmarks...")
|
||||
@@ -194,7 +194,7 @@ def benchmark(iterations, output):
|
||||
@cli.command()
|
||||
def info():
|
||||
"""Show configuration information."""
|
||||
setup_logger(__name__)
|
||||
setup_logger("") # Configure root logger to show all module logs
|
||||
|
||||
settings = get_settings()
|
||||
|
||||
@@ -219,5 +219,40 @@ def info():
|
||||
click.echo(f" Iterations: {settings.benchmark.iterations}")
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option(
|
||||
"--port",
|
||||
type=int,
|
||||
default=7860,
|
||||
help="Port to run the web interface on (default: 7860)"
|
||||
)
|
||||
@click.option(
|
||||
"--share",
|
||||
is_flag=True,
|
||||
help="Create a public share link (useful for remote access)"
|
||||
)
|
||||
def web(port, share):
|
||||
"""Launch web-based GUI for migration monitoring and control."""
|
||||
setup_logger("") # Configure root logger to show all module logs
|
||||
|
||||
try:
|
||||
from web_ui import launch_ui
|
||||
|
||||
click.echo(f"\n🚀 Starting Migration Dashboard on http://localhost:{port}")
|
||||
if share:
|
||||
click.echo("📡 Creating public share link...")
|
||||
|
||||
launch_ui(share=share, server_port=port)
|
||||
|
||||
except ImportError as e:
|
||||
click.echo(f"✗ Failed to import web_ui module: {e}", err=True)
|
||||
click.echo("Make sure gradio is installed: uv sync", err=True)
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
logger.error(f"Web interface failed: {e}")
|
||||
click.echo(f"✗ Web interface failed: {e}", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli(obj={})
|
||||
|
||||
@@ -13,4 +13,7 @@ dependencies = [
|
||||
"pydantic>=2.5.0",
|
||||
"pydantic-settings>=2.1.0",
|
||||
"cryptography>=46.0.3",
|
||||
"gradio>=4.0.0",
|
||||
"pandas>=2.0.0",
|
||||
"plotly>=5.0.0",
|
||||
]
|
||||
|
||||
scripts/README.md (new file, 78 lines)
@@ -0,0 +1,78 @@
# Migration Scripts

Utility scripts for managing the migration.

## sync_migration_state.py

Synchronizes the `migration_state` table with the data actually present in PostgreSQL.

### When to use

Use this script when `migration_state` is out of sync with the real data, for example:
- After manual inserts into PostgreSQL
- After state corruption
- Before running an incremental migration on already-existing data

### How it works

For each table (rawdatacor, elabdatadisp):
1. Finds the row with MAX(created_at) - the most recently inserted row
2. Extracts the consolidation key from that row
3. Updates `migration_state._global` with that key

### Usage

```bash
# Run from the project root
python scripts/sync_migration_state.py
```

### Output

```
Syncing migration_state with actual PostgreSQL data...
================================================================================

ELABDATADISP:
  Most recently inserted row (by created_at):
    created_at:      2025-12-30 11:58:24
    event_timestamp: 2025-12-30 14:58:24
  Consolidation key: (ID0290, DT0007, 2025-12-30, 14:58:24)
  ✓ Updated migration_state with this key

RAWDATACOR:
  Most recently inserted row (by created_at):
    created_at:      2025-12-30 11:13:29
    event_timestamp: 2025-12-30 11:11:39
  Consolidation key: (ID0304, DT0024, 2025-12-30, 11:11:39)
  ✓ Updated migration_state with this key

================================================================================
✓ Done! Incremental migration will now start from the correct position.
```

### Effects

After running this script:
- `migration_state._global` will be updated with the last migrated key
- `python main.py migrate incremental` will start from the correct position
- No duplicates will be created (it uses ON CONFLICT DO NOTHING)

### Caveats

- Corrupted data is excluded automatically (unit_name values like `[Ljava.lang.String;@...`)
- Uses `created_at` to find the most recently inserted row (not `event_timestamp`)
- Overwrites the existing global state

### Verification

After running the script, check the state:

```sql
SELECT table_name, partition_name, last_key
FROM migration_state
WHERE partition_name = '_global'
ORDER BY table_name;
```

It should show the most recent keys for both tables.
scripts/sync_migration_state.py (new executable file, 63 lines)
@@ -0,0 +1,63 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Sync migration_state with actual data in PostgreSQL tables."""
|
||||
|
||||
import sys
from pathlib import Path

# Make the project root importable regardless of where the script is invoked from
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
|
||||
|
||||
from src.connectors.postgres_connector import PostgreSQLConnector
|
||||
from src.migrator.state_manager import StateManager
|
||||
|
||||
def sync_table_state(table_name: str):
|
||||
"""Sync migration_state for a table with its actual data."""
|
||||
with PostgreSQLConnector() as pg_conn:
|
||||
cursor = pg_conn.connection.cursor()
|
||||
|
||||
# Find the row with MAX(created_at) - most recently inserted
|
||||
# Exclude corrupted data (Java strings)
|
||||
cursor.execute(f"""
|
||||
SELECT unit_name, tool_name_id,
|
||||
DATE(event_timestamp)::text as event_date,
|
||||
event_timestamp::time::text as event_time,
|
||||
created_at,
|
||||
event_timestamp
|
||||
FROM {table_name}
|
||||
WHERE unit_name NOT LIKE '[L%' -- Exclude corrupted Java strings
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
""")
|
||||
|
||||
result = cursor.fetchone()
|
||||
if not result:
|
||||
print(f"No data found in {table_name}")
|
||||
return
|
||||
|
||||
unit_name, tool_name_id, event_date, event_time, created_at, event_timestamp = result
|
||||
|
||||
print(f"\n{table_name.upper()}:")
|
||||
print(f" Most recently inserted row (by created_at):")
|
||||
print(f" created_at: {created_at}")
|
||||
print(f" event_timestamp: {event_timestamp}")
|
||||
print(f" Consolidation key: ({unit_name}, {tool_name_id}, {event_date}, {event_time})")
|
||||
|
||||
# Update global migration_state with this key
|
||||
state_mgr = StateManager(pg_conn, table_name, partition_name="_global")
|
||||
|
||||
last_key = {
|
||||
"unit_name": unit_name,
|
||||
"tool_name_id": tool_name_id,
|
||||
"event_date": event_date,
|
||||
"event_time": event_time
|
||||
}
|
||||
|
||||
state_mgr.update_state(last_key=last_key)
|
||||
print(f" ✓ Updated migration_state with this key")
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Syncing migration_state with actual PostgreSQL data...")
|
||||
print("="*80)
|
||||
|
||||
sync_table_state("elabdatadisp")
|
||||
sync_table_state("rawdatacor")
|
||||
|
||||
print("\n" + "="*80)
|
||||
print("✓ Done! Incremental migration will now start from the correct position.")
|
||||
@@ -294,6 +294,7 @@ class MySQLConnector:
|
||||
ORDER BY UnitName ASC, ToolNameID ASC, EventDate ASC, EventTime ASC
|
||||
LIMIT %s
|
||||
"""
|
||||
logger.info(f"Executing first query on partition {partition} (fetching up to {limit} rows, sorted by consolidation key)...")
|
||||
cursor.execute(rows_query, (limit,))
|
||||
else:
|
||||
# Resume AFTER last completely yielded key
|
||||
@@ -304,9 +305,11 @@ class MySQLConnector:
|
||||
ORDER BY UnitName ASC, ToolNameID ASC, EventDate ASC, EventTime ASC
|
||||
LIMIT %s
|
||||
"""
|
||||
logger.debug(f"Executing query on partition {partition} (resuming from key {last_completed_key}, limit {limit})...")
|
||||
cursor.execute(rows_query, (last_completed_key[0], last_completed_key[1], last_completed_key[2], last_completed_key[3], limit))
|
||||
|
||||
rows = cursor.fetchall()
|
||||
logger.debug(f"Fetched {len(rows)} rows from partition {partition}")
|
||||
|
||||
if not rows:
|
||||
# No more rows - yield any buffered group and finish
|
||||
@@ -692,6 +695,92 @@ class MySQLConnector:
|
||||
)
|
||||
self._reconnect()
|
||||
|
||||
def fetch_consolidation_keys_from_partition_after(
|
||||
self,
|
||||
table: str,
|
||||
partition: str,
|
||||
after_key: Optional[Dict[str, Any]] = None,
|
||||
limit: Optional[int] = None
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Fetch distinct consolidation keys from a specific partition after a specific key.
|
||||
|
||||
Optimized version for incremental migration that queries only one partition.
|
||||
|
||||
Query pattern:
|
||||
SELECT UnitName, ToolNameID, EventDate, EventTime
|
||||
FROM table PARTITION (partition_name)
|
||||
WHERE (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?)
|
||||
GROUP BY UnitName, ToolNameID, EventDate, EventTime
|
||||
ORDER BY UnitName, ToolNameID, EventDate, EventTime
|
||||
LIMIT X
|
||||
|
||||
Args:
|
||||
table: Table name (RAWDATACOR or ELABDATADISP)
|
||||
partition: Partition name (e.g., 'part8', 'd9')
|
||||
after_key: Start after this key (dict with unit_name, tool_name_id, event_date, event_time)
|
||||
limit: Number of keys to fetch (uses CONSOLIDATION_GROUP_LIMIT if None)
|
||||
|
||||
Returns:
|
||||
List of dicts with keys: UnitName, ToolNameID, EventDate, EventTime
|
||||
"""
|
||||
if table not in ("RAWDATACOR", "ELABDATADISP"):
|
||||
raise ValueError(f"Consolidation not supported for table {table}")
|
||||
|
||||
if limit is None:
|
||||
limit = self.settings.migration.consolidation_group_limit
|
||||
|
||||
retries = 0
|
||||
while retries < self.MAX_RETRIES:
|
||||
try:
|
||||
with self.connection.cursor() as cursor:
|
||||
if after_key:
|
||||
# Fetch keys AFTER the last migrated key from this specific partition
|
||||
query = f"""
|
||||
SELECT UnitName, ToolNameID, EventDate, EventTime
|
||||
FROM `{table}` PARTITION (`{partition}`)
|
||||
WHERE (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)
|
||||
GROUP BY UnitName, ToolNameID, EventDate, EventTime
|
||||
ORDER BY UnitName, ToolNameID, EventDate, EventTime
|
||||
LIMIT %s
|
||||
"""
|
||||
cursor.execute(
|
||||
query,
|
||||
(
|
||||
after_key.get("unit_name"),
|
||||
after_key.get("tool_name_id"),
|
||||
after_key.get("event_date"),
|
||||
after_key.get("event_time"),
|
||||
limit
|
||||
)
|
||||
)
|
||||
else:
|
||||
# No after_key: fetch from beginning of partition
|
||||
query = f"""
|
||||
SELECT UnitName, ToolNameID, EventDate, EventTime
|
||||
FROM `{table}` PARTITION (`{partition}`)
|
||||
GROUP BY UnitName, ToolNameID, EventDate, EventTime
|
||||
ORDER BY UnitName, ToolNameID, EventDate, EventTime
|
||||
LIMIT %s
|
||||
"""
|
||||
cursor.execute(query, (limit,))
|
||||
|
||||
keys = cursor.fetchall()
|
||||
return keys
|
||||
|
||||
except pymysql.Error as e:
|
||||
retries += 1
|
||||
if retries >= self.MAX_RETRIES:
|
||||
logger.error(
|
||||
f"Failed to fetch consolidation keys from {table} PARTITION ({partition}) "
|
||||
f"(after_key={after_key}) after {self.MAX_RETRIES} retries: {e}"
|
||||
)
|
||||
raise
|
||||
else:
|
||||
logger.warning(
|
||||
f"Fetch consolidation keys from partition failed (retry {retries}/{self.MAX_RETRIES}): {e}"
|
||||
)
|
||||
self._reconnect()
|
||||
|
||||
def fetch_records_for_key_all_partitions(
|
||||
self,
|
||||
table: str,
|
||||
|
||||
@@ -11,6 +11,7 @@ from src.migrator.consolidator import consolidate_rows
|
||||
from src.migrator.state_manager import StateManager
|
||||
from src.utils.logger import get_logger
|
||||
from src.utils.progress import ProgressTracker
|
||||
from src.utils.validation import validate_consolidation_key, ErrorLogger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -60,6 +61,9 @@ class IncrementalMigrator:
|
||||
# Initialize state manager
|
||||
state_mgr = StateManager(pg_conn, pg_table)
|
||||
|
||||
# Initialize error logger
|
||||
error_logger = ErrorLogger(pg_table, "incremental")
|
||||
|
||||
# Get last migrated key from migration_state
|
||||
# This was saved during the last full/incremental migration
|
||||
last_key = state_mgr.get_last_key()
|
||||
@@ -77,52 +81,63 @@ class IncrementalMigrator:
|
||||
f"{last_key.get('event_date')}, {last_key.get('event_time')})"
|
||||
)
|
||||
|
||||
# Get max MySQL ID already migrated to optimize query performance
|
||||
cursor = pg_conn.connection.cursor()
|
||||
cursor.execute(f"SELECT MAX(mysql_max_id) FROM {pg_table}")
|
||||
result = cursor.fetchone()
|
||||
max_mysql_id = result[0] if result and result[0] else 0
|
||||
# Determine which partitions to process based on last_key's event_date
|
||||
# This is a MAJOR optimization: instead of querying the entire table,
|
||||
# we only process partitions from the last migrated year onwards
|
||||
from config import get_partitions_from_year, date_string_to_partition_name
|
||||
|
||||
logger.info(f"Max MySQL ID already migrated: {max_mysql_id}")
|
||||
last_event_date = last_key.get('event_date')
|
||||
if not last_event_date:
|
||||
logger.warning("Last key has no event_date, starting from 2014")
|
||||
partitions_to_process = mysql_conn.get_table_partitions(mysql_table)
|
||||
else:
|
||||
# Extract year from last_event_date and get partitions from that year onwards
|
||||
year = int(str(last_event_date)[:4]) if len(str(last_event_date)) >= 4 else 2014
|
||||
partitions_to_process = get_partitions_from_year(year, mysql_table)
|
||||
|
||||
logger.info(
|
||||
f"Optimized incremental sync: Processing only {len(partitions_to_process)} "
|
||||
f"partitions from year {year} onwards: {partitions_to_process}"
|
||||
)
|
||||
logger.info(
|
||||
f"Skipping partitions before year {year} (no new data possible there)"
|
||||
)
|
||||
|
||||
if dry_run:
|
||||
# In dry-run, check how many new keys exist in MySQL
|
||||
logger.info("[DRY RUN] Checking for new keys in MySQL...")
|
||||
logger.info("[DRY RUN] Checking for new keys across relevant partitions...")
|
||||
|
||||
# Sample first 100 keys to check if there are new records
|
||||
sample_keys = mysql_conn.fetch_consolidation_keys_after(
|
||||
mysql_table,
|
||||
after_key=last_key,
|
||||
min_mysql_id=max_mysql_id,
|
||||
limit=100,
|
||||
offset=0
|
||||
)
|
||||
total_new_keys = 0
|
||||
first_keys_found = []
|
||||
|
||||
if sample_keys:
|
||||
# If we found 100 keys in the sample, there might be many more
|
||||
# Try to get a rough count by checking larger offsets
|
||||
if len(sample_keys) == 100:
|
||||
# There are at least 100 keys, check if there are more
|
||||
logger.info(
|
||||
f"[DRY RUN] Found at least 100 new keys, checking total count..."
|
||||
)
|
||||
# Sample at different offsets to estimate total
|
||||
test_batch = mysql_conn.fetch_consolidation_keys_after(
|
||||
for partition in partitions_to_process:
|
||||
# For first partition (same year as last_key), use after_key
|
||||
# For subsequent partitions, start from beginning
|
||||
if partition == partitions_to_process[0]:
|
||||
sample_keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
|
||||
mysql_table,
|
||||
partition=partition,
|
||||
after_key=last_key,
|
||||
min_mysql_id=max_mysql_id,
|
||||
limit=1,
|
||||
offset=1000
|
||||
limit=100
|
||||
)
|
||||
if test_batch:
|
||||
logger.info(f"[DRY RUN] Estimated: More than 1000 new keys to migrate")
|
||||
else:
|
||||
logger.info(f"[DRY RUN] Estimated: Between 100-1000 new keys to migrate")
|
||||
logger.info(f"[DRY RUN] Partition {partition}: {len(sample_keys)} new keys (after last_key)")
|
||||
else:
|
||||
logger.info(f"[DRY RUN] Found {len(sample_keys)} new keys to migrate")
|
||||
sample_keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
|
||||
mysql_table,
|
||||
partition=partition,
|
||||
after_key=None, # All keys from this partition are new
|
||||
limit=100
|
||||
)
|
||||
logger.info(f"[DRY RUN] Partition {partition}: {len(sample_keys)} new keys (all new)")
|
||||
|
||||
total_new_keys += len(sample_keys)
|
||||
if sample_keys and len(first_keys_found) < 3:
|
||||
first_keys_found.extend(sample_keys[:3 - len(first_keys_found)])
|
||||
|
||||
if total_new_keys > 0:
|
||||
logger.info(f"[DRY RUN] Found at least {total_new_keys} new keys across {len(partitions_to_process)} partitions")
|
||||
logger.info("[DRY RUN] First 3 keys:")
|
||||
for i, key in enumerate(sample_keys[:3]):
|
||||
for i, key in enumerate(first_keys_found[:3]):
|
||||
logger.info(
|
||||
f" {i+1}. ({key.get('UnitName')}, {key.get('ToolNameID')}, "
|
||||
f"{key.get('EventDate')}, {key.get('EventTime')})"
|
||||
@@ -130,17 +145,16 @@ class IncrementalMigrator:
|
||||
logger.info(
|
||||
f"[DRY RUN] Run without --dry-run to perform actual migration"
|
||||
)
|
||||
# Return a positive number to indicate there's data to migrate
|
||||
return len(sample_keys)
|
||||
return total_new_keys
|
||||
else:
|
||||
logger.info("[DRY RUN] No new keys found - database is up to date")
|
||||
return 0
|
||||
|
||||
# Migrate new keys
|
||||
migrated_rows = 0
|
||||
offset = 0
|
||||
insert_buffer = []
|
||||
buffer_size = self.settings.migration.consolidation_group_limit // 10
|
||||
last_processed_key = None # Track last key for final state update
|
||||
|
||||
with ProgressTracker(
|
||||
total=None, # Unknown total
|
||||
@@ -150,99 +164,133 @@ class IncrementalMigrator:
|
||||
# Get column order for PostgreSQL insert
|
||||
pg_columns = self._get_pg_columns()
|
||||
|
||||
while True:
|
||||
# Fetch batch of consolidation keys AFTER last_key
|
||||
logger.debug(f"Fetching keys after last_key with offset={offset}")
|
||||
keys = mysql_conn.fetch_consolidation_keys_after(
|
||||
mysql_table,
|
||||
after_key=last_key,
|
||||
min_mysql_id=max_mysql_id,
|
||||
limit=self.settings.migration.consolidation_group_limit,
|
||||
offset=offset
|
||||
# Process each partition
|
||||
for partition_idx, partition in enumerate(partitions_to_process, 1):
|
||||
logger.info(
|
||||
f"[{partition_idx}/{len(partitions_to_process)}] "
|
||||
f"Processing partition {partition}..."
|
||||
)
|
||||
|
||||
if not keys:
|
||||
logger.info("No more new keys to migrate")
|
||||
break
|
||||
# For first partition (same year as last_key), fetch keys AFTER last_key
|
||||
# For subsequent partitions, fetch ALL keys (they're all new)
|
||||
use_after_key = last_key if partition == partitions_to_process[0] else None
|
||||
|
||||
logger.info(f"Processing {len(keys)} new keys (offset={offset})")
|
||||
|
||||
# Process each consolidation key
|
||||
keys_processed = 0
|
||||
for key in keys:
|
||||
keys_processed += 1
|
||||
# Log progress every 1000 keys
|
||||
if keys_processed % 1000 == 0:
|
||||
logger.info(f" Processed {keys_processed}/{len(keys)} keys in this batch...")
|
||||
unit_name = key.get("UnitName")
|
||||
tool_name_id = key.get("ToolNameID")
|
||||
event_date = key.get("EventDate")
|
||||
event_time = key.get("EventTime")
|
||||
|
||||
-            # Fetch all MySQL rows for this key (all nodes, all partitions)
-            mysql_rows = mysql_conn.fetch_records_for_key_all_partitions(
-                mysql_table,
-                unit_name,
-                tool_name_id,
-                event_date,
-                event_time
-            )
-
-            if not mysql_rows:
-                logger.warning(
-                    f"No records found for key: "
-                    f"({unit_name}, {tool_name_id}, {event_date}, {event_time})"
-                )
-                continue
-
-            # Consolidate into single PostgreSQL row
-            try:
-                pg_row = consolidate_rows(self.table, mysql_rows)
-            except Exception as e:
-                logger.error(
-                    f"Failed to consolidate key "
-                    f"({unit_name}, {tool_name_id}, {event_date}, {event_time}): {e}"
-                )
-                continue
-
-            # Add to insert buffer
-            insert_buffer.append(pg_row)
-
-            # Flush buffer when full
-            if len(insert_buffer) >= buffer_size:
-                # Use COPY with ON CONFLICT to handle duplicates
-                inserted = pg_conn.copy_from_with_conflict(
-                    pg_table,
-                    insert_buffer,
-                    pg_columns,
-                    conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"]
-                )
-                migrated_rows += inserted
-                progress.update(inserted)
-
-                # Update state with last key
-                state_mgr.update_state(
-                    last_key=last_processed_key,
-                    total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
-                )
-
-                logger.debug(
-                    f"Flushed {inserted} rows, total new: {migrated_rows}"
-                )
-                insert_buffer = []
-
-        # Move to next batch of keys
-        offset += len(keys)
-
-        # If we got fewer keys than requested, we're done
-        if len(keys) < self.settings.migration.consolidation_group_limit:
-            break
+        while True:
+            # Fetch batch of consolidation keys from this partition
+            keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
+                mysql_table,
+                partition=partition,
+                after_key=use_after_key,
+                limit=self.settings.migration.consolidation_group_limit
+            )
+
+            if not keys:
+                logger.info(f" No more keys in partition {partition}")
+                break
+
+            logger.info(f" Processing {len(keys)} keys from {partition}...")
+
+            # Process each consolidation key
+            keys_processed = 0
+            for key in keys:
+                keys_processed += 1
+                # Log progress every 1000 keys
+                if keys_processed % 1000 == 0:
+                    logger.info(f" Processed {keys_processed}/{len(keys)} keys in this batch...")
+                unit_name = key.get("UnitName")
+                tool_name_id = key.get("ToolNameID")
+                event_date = key.get("EventDate")
+                event_time = key.get("EventTime")
+
+                # Validate consolidation key before fetching
+                is_valid, error_reason = validate_consolidation_key(
+                    unit_name, tool_name_id, event_date, event_time
+                )
+                if not is_valid:
+                    # Log invalid key and skip
+                    error_logger.log_invalid_key(
+                        unit_name, tool_name_id, event_date, event_time, error_reason
+                    )
+                    continue
+
+                # Fetch all MySQL rows for this key (all nodes, all partitions)
+                try:
+                    mysql_rows = mysql_conn.fetch_records_for_key_all_partitions(
+                        mysql_table,
+                        unit_name,
+                        tool_name_id,
+                        event_date,
+                        event_time
+                    )
+                except Exception as e:
+                    # Log corrupted key that caused fetch error
+                    error_logger.log_invalid_key(
+                        unit_name, tool_name_id, event_date, event_time,
+                        f"Fetch failed: {e}"
+                    )
+                    continue
+
+                if not mysql_rows:
+                    logger.warning(
+                        f"No records found for key: "
+                        f"({unit_name}, {tool_name_id}, {event_date}, {event_time})"
+                    )
+                    continue
+
+                # Consolidate into single PostgreSQL row
+                try:
+                    pg_row = consolidate_rows(self.table, mysql_rows)
+                except Exception as e:
+                    logger.error(
+                        f"Failed to consolidate key "
+                        f"({unit_name}, {tool_name_id}, {event_date}, {event_time}): {e}"
+                    )
+                    continue
+
+                # Add to insert buffer
+                insert_buffer.append(pg_row)
+
+                # Track last processed key
+                last_processed_key = {
+                    "unit_name": unit_name,
+                    "tool_name_id": tool_name_id,
+                    "event_date": str(event_date) if event_date else None,
+                    "event_time": str(event_time) if event_time else None,
+                }
+
+                # Flush buffer when full
+                if len(insert_buffer) >= buffer_size:
+                    # Use COPY with ON CONFLICT to handle duplicates
+                    inserted = pg_conn.copy_from_with_conflict(
+                        pg_table,
+                        insert_buffer,
+                        pg_columns,
+                        conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"]
+                    )
+                    migrated_rows += inserted
+                    progress.update(inserted)
+
+                    # Update state with last key (from tracked variable)
+                    state_mgr.update_state(
+                        last_key=last_processed_key,
+                        total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
+                    )
+
+                    logger.debug(
+                        f"Flushed {inserted} rows, total new: {migrated_rows}"
+                    )
+                    insert_buffer = []
+
+            # After processing all keys in batch, update use_after_key for next iteration
+            if keys:
+                last_key_in_batch = keys[-1]
+                use_after_key = {
+                    "unit_name": last_key_in_batch.get("UnitName"),
+                    "tool_name_id": last_key_in_batch.get("ToolNameID"),
+                    "event_date": str(last_key_in_batch.get("EventDate")) if last_key_in_batch.get("EventDate") else None,
+                    "event_time": str(last_key_in_batch.get("EventTime")) if last_key_in_batch.get("EventTime") else None,
+                }
 
         # Flush remaining buffer
         if insert_buffer:
@@ -257,13 +305,24 @@ class IncrementalMigrator:
             progress.update(inserted)
             logger.debug(f"Final flush: {inserted} rows")
 
+        # Update state with last key after final flush
+        if last_processed_key:
+            state_mgr.update_state(
+                last_key=last_processed_key,
+                total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
+            )
+
         # Get final row count
         final_count = pg_conn.get_row_count(pg_table)
         logger.info(f"Total PostgreSQL rows: {final_count}")
 
+        # Close error logger and get count
+        error_logger.close()
+
         logger.info(
             f"✓ Incremental migration complete: "
-            f"{migrated_rows} new rows migrated to {pg_table}"
+            f"{migrated_rows} new rows migrated to {pg_table}, "
+            f"{error_logger.get_error_count()} invalid keys skipped"
        )
 
         return migrated_rows
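A note on the flush path above: `copy_from_with_conflict` itself is not part of this diff. A common way to implement the "COPY with ON CONFLICT" idea it names is to COPY the batch into a temporary staging table and then insert from it with `ON CONFLICT DO NOTHING`. A minimal sketch under that assumption (psycopg2, illustrative names, not the project's actual helper):

```python
# Hypothetical sketch of a COPY + ON CONFLICT DO NOTHING flush; assumes psycopg2
# and values that contain no tabs or newlines. Not the project's real connector code.
import io


def copy_with_conflict_sketch(conn, table, rows, columns, conflict_columns):
    buf = io.StringIO()
    for row in rows:
        # COPY text format: tab-separated fields, \N for NULL
        buf.write("\t".join("\\N" if v is None else str(v) for v in row) + "\n")
    buf.seek(0)
    with conn.cursor() as cur:
        # Stage the batch in a TEMP table shaped like the target
        cur.execute(f"CREATE TEMP TABLE _stage (LIKE {table} INCLUDING DEFAULTS) ON COMMIT DROP")
        cur.copy_expert(f"COPY _stage ({', '.join(columns)}) FROM STDIN", buf)
        # Move staged rows into the target, silently skipping duplicates
        cur.execute(
            f"INSERT INTO {table} ({', '.join(columns)}) "
            f"SELECT {', '.join(columns)} FROM _stage "
            f"ON CONFLICT ({', '.join(conflict_columns)}) DO NOTHING"
        )
        inserted = cur.rowcount
    conn.commit()
    return inserted
```

Returning `cur.rowcount` from the final `INSERT ... SELECT` counts only newly inserted rows, which matches how `migrated_rows` is accumulated above.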
@@ -120,6 +120,9 @@ class PartitionMigrator:
                 description=f"Streaming {mysql_table} partition {partition_name}"
             ) as progress:
 
+                # Log before starting to fetch (this query can take several minutes for large partitions)
+                logger.info(f"Fetching consolidation groups from MySQL partition {partition_name}...")
+
                 # Use fetch_consolidation_groups_from_partition with start_key for efficient resume
                 # MySQL will skip all keys <= start_key using WHERE clause (no unnecessary data transfer)
                 for group in mysql_conn.fetch_consolidation_groups_from_partition(
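The SQL behind the `start_key` resume is not shown in this hunk; one way to express "skip all keys <= start_key" in MySQL is a row-value comparison on the consolidation key, as in the illustrative query below (the real connector may build it differently):

```python
# Illustrative keyset-resume query for one MySQL partition; the query text and
# parameter order are assumptions, not the connector's actual SQL.
RESUME_QUERY = """
    SELECT UnitName, ToolNameID, EventDate, EventTime
    FROM {table} PARTITION ({partition})
    WHERE (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)
    GROUP BY UnitName, ToolNameID, EventDate, EventTime
    ORDER BY UnitName, ToolNameID, EventDate, EventTime
    LIMIT %s
"""


def build_resume_query(table: str, partition: str) -> str:
    # PARTITION (...) pins the scan to a single MySQL partition; the row-value
    # comparison lets MySQL skip everything up to and including start_key.
    return RESUME_QUERY.format(table=table, partition=partition)


print(build_resume_query("RAWDATACOR", "part7"))
```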
@@ -7,7 +7,7 @@ Tracks migration progress with:
 - status: pending, in_progress, completed
 """
 from typing import Optional, Dict, Any
-from datetime import datetime
+from datetime import datetime, timezone
 import json
 
 from src.connectors.postgres_connector import PostgreSQLConnector
@@ -199,7 +199,7 @@ class StateManager:
 
         if mark_completed:
             updates.append("migration_completed_at = %s")
-            params.append(datetime.utcnow())
+            params.append(datetime.now(timezone.utc))
             if status is None:
                 updates.append("status = 'completed'")
 
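Background for the change above: `datetime.utcnow()` returns a naive datetime (no `tzinfo`) and is deprecated as of Python 3.12, while `datetime.now(timezone.utc)` is timezone-aware, which is what the TIMESTAMPTZ columns in the schema change below expect. A quick illustration:

```python
from datetime import datetime, timezone

naive = datetime.utcnow()           # tzinfo is None; deprecated since Python 3.12
aware = datetime.now(timezone.utc)  # tzinfo is timezone.utc

print(naive.tzinfo)  # None
print(aware.tzinfo)  # UTC
# Ordering comparisons between the two raise:
# TypeError: can't compare offset-naive and offset-aware datetimes
```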
@@ -183,8 +183,8 @@ CREATE TABLE IF NOT EXISTS migration_state (
     table_name VARCHAR(255) NOT NULL,
     partition_name VARCHAR(255) NOT NULL,
     last_key JSONB,
-    migration_started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-    migration_completed_at TIMESTAMP,
+    migration_started_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
+    migration_completed_at TIMESTAMPTZ,
     total_rows_migrated BIGINT DEFAULT 0,
     status VARCHAR(32) DEFAULT 'pending',
     PRIMARY KEY (table_name, partition_name),
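With TIMESTAMPTZ, PostgreSQL stores the value normalized to UTC and converts it on output, so the aware timestamps written by `StateManager` compare consistently regardless of the client session's time zone. A small round-trip sketch (psycopg2 assumed; the DSN is a placeholder, not the project's configuration):

```python
# Round-trip sketch: psycopg2 returns timestamptz values as timezone-aware datetimes.
from datetime import datetime, timezone

import psycopg2

with psycopg2.connect("dbname=migrated_db user=postgres") as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT %s::timestamptz", (datetime.now(timezone.utc),))
        value = cur.fetchone()[0]
        print(value.tzinfo is not None)  # True: the value comes back timezone-aware
```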
@@ -10,21 +10,30 @@ logger = get_logger(__name__)
 class ErrorLogger:
     """Log invalid migration keys to a file."""
 
-    def __init__(self, table: str, partition: str):
+    def __init__(self, table: str, partition: str, use_timestamp: bool = False):
         """Initialize error logger.
 
         Args:
             table: Table name
-            partition: Partition name
+            partition: Partition name (e.g., 'p2024' or 'incremental')
+            use_timestamp: If True, add timestamp to filename (for incremental migrations)
         """
         self.table = table
         self.partition = partition
-        self.error_file = f"migration_errors_{table}_{partition}.log"
+
+        # Add timestamp to filename for incremental migrations to avoid overwriting
+        if use_timestamp or partition == "incremental":
+            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+            self.error_file = f"migration_errors_{table}_{partition}_{timestamp}.log"
+        else:
+            self.error_file = f"migration_errors_{table}_{partition}.log"
+
         self.error_count = 0
 
         # Create error file with header
         with open(self.error_file, "w") as f:
             f.write(f"# Migration errors for {table} partition {partition}\n")
             f.write(f"# Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
             f.write("# Format: UnitName|ToolNameID|EventDate|EventTime|Reason\n\n")
 
         logger.info(f"Error log file created: {self.error_file}")
@@ -99,6 +108,10 @@ def validate_consolidation_key(
     if unit_name is None or unit_name == "":
         return False, "UnitName is NULL or empty"
 
+    # Check for corrupted Java strings (like '[Ljava.lang.String;@...')
+    if isinstance(unit_name, str) and unit_name.startswith("[L"):
+        return False, f"UnitName is corrupted Java string: {unit_name}"
+
     if tool_name_id is None or tool_name_id == "":
         return False, "ToolNameID is NULL or empty"
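For reference, the validator and the error logger are wired together the way the incremental migrator above uses them; a small usage sketch (key values invented, import path assumed):

```python
# from src.utils.error_logger import ErrorLogger, validate_consolidation_key  # path assumed

# A corrupted Java-string key like the ones mentioned in the changelog
bad_key = ("[Ljava.lang.String;@1a2b3c", "TOOL-7", "2025-01-15", "10:30:00")

error_logger = ErrorLogger("RAWDATACOR", "incremental", use_timestamp=True)

is_valid, reason = validate_consolidation_key(*bad_key)
if not is_valid:
    # Written to migration_errors_RAWDATACOR_incremental_<timestamp>.log; the key is skipped
    error_logger.log_invalid_key(*bad_key, reason)

error_logger.close()
print(error_logger.get_error_count())  # 1
```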
593
web_ui.py
Normal file
@@ -0,0 +1,593 @@
"""Web UI for MySQL to PostgreSQL migration monitoring and control.

Provides a Gradio-based interface for:
- Viewing migration status and progress
- Starting/monitoring migrations
- Viewing logs and errors
- Performance metrics and graphs
"""
import gradio as gr
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from pathlib import Path
import threading
import time
from datetime import datetime
from typing import Optional, Dict, List, Tuple, Any
import logging

from config import get_settings
from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.full_migrator import run_full_migration
from src.migrator.parallel_migrator import run_parallel_migration
from src.utils.logger import get_logger

logger = get_logger(__name__)

# Global state for tracking running migrations
running_migrations = {}
migration_logs = {}


def get_migration_state_df() -> pd.DataFrame:
    """Fetch current migration state from PostgreSQL.

    Returns:
        DataFrame with columns: table_name, partition_name, status,
        total_rows_migrated, migration_started_at, migration_completed_at
    """
    try:
        with PostgreSQLConnector() as pg_conn:
            with pg_conn.connection.cursor() as cursor:
                cursor.execute("""
                    SELECT
                        table_name,
                        partition_name,
                        status,
                        total_rows_migrated,
                        migration_started_at,
                        migration_completed_at,
                        last_key
                    FROM migration_state
                    WHERE partition_name != '_global'
                    ORDER BY table_name, partition_name
                """)

                rows = cursor.fetchall()

                if not rows:
                    return pd.DataFrame(columns=[
                        'Table', 'Partition', 'Status', 'Rows Migrated',
                        'Started At', 'Completed At'
                    ])

                df = pd.DataFrame(rows, columns=[
                    'Table', 'Partition', 'Status', 'Rows Migrated',
                    'Started At', 'Completed At', 'Last Key'
                ])

                # Format dates
                df['Started At'] = pd.to_datetime(df['Started At']).dt.strftime('%Y-%m-%d %H:%M:%S')
                df['Completed At'] = pd.to_datetime(df['Completed At']).dt.strftime('%Y-%m-%d %H:%M:%S')
                df['Completed At'] = df['Completed At'].fillna('-')

                # Drop Last Key column for display (too verbose)
                df = df.drop('Last Key', axis=1)

                return df

    except Exception as e:
        logger.error(f"Failed to fetch migration state: {e}")
        return pd.DataFrame(columns=[
            'Table', 'Partition', 'Status', 'Rows Migrated',
            'Started At', 'Completed At'
        ])

def get_migration_summary() -> Dict[str, Any]:
    """Get summary statistics for migrations.

    Returns:
        Dict with total/completed/in_progress/pending counts per table
    """
    try:
        with PostgreSQLConnector() as pg_conn:
            with pg_conn.connection.cursor() as cursor:
                cursor.execute("""
                    SELECT
                        table_name,
                        status,
                        COUNT(*) as count,
                        SUM(total_rows_migrated) as total_rows
                    FROM migration_state
                    WHERE partition_name != '_global'
                    GROUP BY table_name, status
                    ORDER BY table_name, status
                """)

                rows = cursor.fetchall()

                summary = {
                    'RAWDATACOR': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0},
                    'ELABDATADISP': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0}
                }

                for table, status, count, total_rows in rows:
                    table_upper = table.upper()
                    if table_upper in summary:
                        summary[table_upper][status] = count
                        summary[table_upper]['total_rows'] += total_rows or 0

                return summary

    except Exception as e:
        logger.error(f"Failed to fetch migration summary: {e}")
        return {
            'RAWDATACOR': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0},
            'ELABDATADISP': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0}
        }


def create_status_chart() -> go.Figure:
    """Create a bar chart showing migration status by table.

    Returns:
        Plotly figure
    """
    summary = get_migration_summary()

    tables = []
    completed = []
    in_progress = []
    pending = []

    for table, stats in summary.items():
        tables.append(table)
        completed.append(stats['completed'])
        in_progress.append(stats['in_progress'])
        pending.append(stats['pending'])

    fig = go.Figure(data=[
        go.Bar(name='Completed', x=tables, y=completed, marker_color='green'),
        go.Bar(name='In Progress', x=tables, y=in_progress, marker_color='orange'),
        go.Bar(name='Pending', x=tables, y=pending, marker_color='gray')
    ])

    fig.update_layout(
        barmode='stack',
        title='Migration Status by Table',
        xaxis_title='Table',
        yaxis_title='Number of Partitions',
        height=400
    )

    return fig

def get_error_logs() -> List[Tuple[str, str]]:
    """Get list of error log files with their paths.

    Returns:
        List of (filename, filepath) tuples
    """
    error_logs = []
    for log_file in Path('.').glob('migration_errors_*.log'):
        error_logs.append((log_file.name, str(log_file)))

    return sorted(error_logs, key=lambda x: x[0], reverse=True)


def read_error_log(log_file: str) -> str:
    """Read contents of an error log file.

    Args:
        log_file: Path to log file

    Returns:
        Contents of log file
    """
    try:
        if not log_file or log_file == "Select a log file...":
            return "No log file selected"

        with open(log_file, 'r') as f:
            content = f.read()

        if not content:
            return f"Log file {log_file} is empty (no errors logged)"

        return content

    except Exception as e:
        return f"Error reading log file: {e}"

def start_migration_task(
    table: str,
    parallel_workers: int,
    resume: bool,
    dry_run: bool,
    partition: Optional[str] = None
) -> str:
    """Start a migration in a background thread.

    Args:
        table: Table name (RAWDATACOR, ELABDATADISP, or all)
        parallel_workers: Number of parallel workers (0 = sequential)
        resume: Whether to resume from last checkpoint
        dry_run: Whether to run in dry-run mode
        partition: Optional partition name for single partition migration

    Returns:
        Status message
    """
    task_id = f"{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    if task_id in running_migrations:
        return f"❌ Migration already running for {table}"

    # Validate options
    if parallel_workers > 0 and partition:
        return "❌ Cannot use parallel workers with single partition mode"

    # Initialize log buffer for this task
    migration_logs[task_id] = []

    def run_migration():
        """Worker function to run migration."""
        try:
            migration_logs[task_id].append(f"Starting migration for {table}...")
            migration_logs[task_id].append(f"Mode: {'Parallel' if parallel_workers > 0 else 'Sequential'}")
            migration_logs[task_id].append(f"Resume: {resume}, Dry-run: {dry_run}")

            if parallel_workers > 0:
                # Parallel migration
                rows = run_parallel_migration(
                    table,
                    num_workers=parallel_workers,
                    dry_run=dry_run,
                    resume=resume
                )
            else:
                # Sequential migration
                rows = run_full_migration(
                    table,
                    dry_run=dry_run,
                    resume=resume,
                    partition=partition
                )

            migration_logs[task_id].append(f"✓ Migration complete: {rows:,} rows migrated")
            running_migrations[task_id] = 'completed'

        except Exception as e:
            migration_logs[task_id].append(f"❌ Migration failed: {e}")
            running_migrations[task_id] = 'failed'
            logger.error(f"Migration failed: {e}")

    # Start migration in background thread
    thread = threading.Thread(target=run_migration, daemon=True)
    thread.start()

    running_migrations[task_id] = 'running'

    return f"✓ Migration started: {task_id}\nCheck the 'Logs' tab for progress"


def start_incremental_migration_task(
    table: str,
    dry_run: bool
) -> str:
    """Start an incremental migration in a background thread.

    Args:
        table: Table name (RAWDATACOR, ELABDATADISP, or all)
        dry_run: Whether to run in dry-run mode

    Returns:
        Status message
    """
    task_id = f"incremental_{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    if task_id in running_migrations:
        return f"❌ Incremental migration already running for {table}"

    # Initialize log buffer for this task
    migration_logs[task_id] = []

    def run_migration():
        """Worker function to run incremental migration."""
        try:
            migration_logs[task_id].append(f"Starting INCREMENTAL migration for {table}...")
            migration_logs[task_id].append(f"Dry-run: {dry_run}")
            migration_logs[task_id].append("This will sync only NEW data added since last migration")

            from src.migrator.incremental_migrator import run_incremental_migration

            rows = run_incremental_migration(
                table,
                dry_run=dry_run
            )

            migration_logs[task_id].append(f"✓ Incremental migration complete: {rows:,} rows migrated")
            running_migrations[task_id] = 'completed'

        except Exception as e:
            migration_logs[task_id].append(f"❌ Incremental migration failed: {e}")
            running_migrations[task_id] = 'failed'
            logger.error(f"Incremental migration failed: {e}")

    # Start migration in background thread
    thread = threading.Thread(target=run_migration, daemon=True)
    thread.start()

    running_migrations[task_id] = 'running'

    return f"✓ Incremental migration started: {task_id}\nCheck the 'Logs' tab for progress"

def get_migration_logs(task_id: str) -> str:
    """Get logs for a specific migration task.

    Args:
        task_id: Task identifier

    Returns:
        Log contents
    """
    if not task_id or task_id == "No migrations running":
        return "Select a migration to view logs"

    logs = migration_logs.get(task_id, [])
    if not logs:
        return f"No logs available for {task_id}"

    return "\n".join(logs)


def refresh_migration_state():
    """Refresh migration state display."""
    return get_migration_state_df()


def refresh_status_chart():
    """Refresh status chart."""
    return create_status_chart()


def refresh_running_migrations() -> gr.Dropdown:
    """Refresh list of running migrations."""
    if not running_migrations:
        return gr.Dropdown(choices=["No migrations running"], value="No migrations running")

    choices = list(running_migrations.keys())
    return gr.Dropdown(choices=choices, value=choices[0] if choices else None)

# Build Gradio interface
with gr.Blocks(title="MySQL to PostgreSQL Migration", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🔄 MySQL to PostgreSQL Migration Dashboard")
    gr.Markdown("Monitor and control data migration from MySQL to PostgreSQL")

    with gr.Tabs():
        # Tab 1: Overview
        with gr.Tab("📊 Overview"):
            gr.Markdown("## Migration Status Overview")

            with gr.Row():
                refresh_btn = gr.Button("🔄 Refresh", scale=1)

            with gr.Row():
                status_chart = gr.Plot(label="Partition Status by Table")

            with gr.Row():
                state_table = gr.Dataframe(
                    value=get_migration_state_df(),
                    label="Migration State (All Partitions)",
                    interactive=False,
                    wrap=True
                )

            # Auto-refresh every 10 seconds
            refresh_btn.click(
                fn=lambda: (refresh_migration_state(), refresh_status_chart()),
                outputs=[state_table, status_chart]
            )

            # Initial load
            demo.load(
                fn=lambda: (refresh_migration_state(), refresh_status_chart()),
                outputs=[state_table, status_chart]
            )

        # Tab 2: Start Migration
        with gr.Tab("▶️ Start Migration"):
            gr.Markdown("## Start New Migration")

            with gr.Row():
                table_select = gr.Dropdown(
                    choices=["RAWDATACOR", "ELABDATADISP", "all"],
                    value="RAWDATACOR",
                    label="Table to Migrate"
                )

                partition_input = gr.Textbox(
                    label="Partition (optional)",
                    placeholder="Leave empty for all partitions, or enter e.g. 'part7', 'd9'",
                    value=""
                )

            with gr.Row():
                parallel_slider = gr.Slider(
                    minimum=0,
                    maximum=10,
                    value=0,
                    step=1,
                    label="Parallel Workers (0 = sequential)"
                )

            with gr.Row():
                resume_check = gr.Checkbox(label="Resume from last checkpoint", value=True)
                dry_run_check = gr.Checkbox(label="Dry run (simulate without writing)", value=False)

            start_btn = gr.Button("▶️ Start Migration", variant="primary", size="lg")

            migration_output = gr.Textbox(
                label="Migration Status",
                lines=3,
                interactive=False
            )

            start_btn.click(
                fn=start_migration_task,
                inputs=[table_select, parallel_slider, resume_check, dry_run_check, partition_input],
                outputs=migration_output
            )

        # Tab 3: Incremental Migration
        with gr.Tab("🔄 Incremental Sync"):
            gr.Markdown("## Incremental Migration (Sync New Data)")
            gr.Markdown("""
            This mode syncs **only the new data** added after the last full migration.

            **How it works:**
            - Finds the new consolidation keys in MySQL that do not yet exist in PostgreSQL
            - Migrates only those keys (already migrated data is not reprocessed)
            - Uses `migration_state` to track the last processed key

            **When to use it:**
            - After a full migration of all partitions has completed
            - To periodically sync new data without redoing everything
            - For daily/weekly updates
            """)

            with gr.Row():
                inc_table_select = gr.Dropdown(
                    choices=["RAWDATACOR", "ELABDATADISP", "all"],
                    value="RAWDATACOR",
                    label="Table to Sync"
                )

            with gr.Row():
                inc_dry_run_check = gr.Checkbox(
                    label="Dry run (simulate without writing)",
                    value=False
                )

            inc_start_btn = gr.Button("🔄 Start Incremental Sync", variant="primary", size="lg")

            inc_migration_output = gr.Textbox(
                label="Sync Status",
                lines=3,
                interactive=False
            )

            inc_start_btn.click(
                fn=start_incremental_migration_task,
                inputs=[inc_table_select, inc_dry_run_check],
                outputs=inc_migration_output
            )

        # Tab 4: Logs
        with gr.Tab("📝 Logs"):
            gr.Markdown("## Migration Logs")

            with gr.Row():
                running_migrations_dropdown = gr.Dropdown(
                    choices=["No migrations running"],
                    value="No migrations running",
                    label="Select Migration",
                    interactive=True
                )
                refresh_migrations_btn = gr.Button("🔄 Refresh List")

            log_output = gr.Textbox(
                label="Log Output",
                lines=20,
                interactive=False,
                max_lines=50
            )

            refresh_migrations_btn.click(
                fn=refresh_running_migrations,
                outputs=running_migrations_dropdown
            )

            running_migrations_dropdown.change(
                fn=get_migration_logs,
                inputs=running_migrations_dropdown,
                outputs=log_output
            )

        # Tab 5: Error Logs
        with gr.Tab("⚠️ Error Logs"):
            gr.Markdown("## Error Log Viewer")
            gr.Markdown("View logged validation errors (invalid consolidation keys)")

            with gr.Row():
                error_log_dropdown = gr.Dropdown(
                    choices=[x[0] for x in get_error_logs()] if get_error_logs() else ["No error logs found"],
                    label="Select Error Log File",
                    value="Select a log file..."
                )
                refresh_error_logs_btn = gr.Button("🔄 Refresh")

            error_log_content = gr.Textbox(
                label="Error Log Contents",
                lines=20,
                interactive=False,
                max_lines=50
            )

            def refresh_error_log_list():
                logs = get_error_logs()
                choices = [x[0] for x in logs] if logs else ["No error logs found"]
                return gr.Dropdown(choices=choices)

            def show_error_log(filename):
                if not filename or filename == "Select a log file..." or filename == "No error logs found":
                    return "Select a log file to view its contents"
                return read_error_log(filename)

            refresh_error_logs_btn.click(
                fn=refresh_error_log_list,
                outputs=error_log_dropdown
            )

            error_log_dropdown.change(
                fn=show_error_log,
                inputs=error_log_dropdown,
                outputs=error_log_content
            )

def launch_ui(share=False, server_port=7860):
    """Launch the Gradio UI.

    Args:
        share: Whether to create a public share link
        server_port: Port to run the server on
    """
    demo.launch(
        share=share,
        server_port=server_port,
        server_name="0.0.0.0"  # Listen on all interfaces
    )


if __name__ == "__main__":
    import sys

    # Parse command line args
    share = "--share" in sys.argv
    port = 7860

    for arg in sys.argv:
        if arg.startswith("--port="):
            port = int(arg.split("=")[1])

    print(f"Starting Migration Dashboard on port {port}...")
    print(f"Share mode: {share}")

    launch_ui(share=share, server_port=port)
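Based on the `__main__` block, the dashboard can be started with `python web_ui.py --port=8080 --share`, or programmatically; a minimal sketch (port number arbitrary, assumes web_ui.py is importable from the project root):

```python
# Equivalent to `python web_ui.py --port=8080`
from web_ui import launch_ui

launch_ui(share=False, server_port=8080)
```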