Compare commits
10 Commits: 5c9df3d06f ... main

| SHA1 |
|---|
| 3a53564bb5 |
| 6306006f82 |
| 53cde5f667 |
| d1dbf7f0de |
| 931fec0959 |
| a7d2d501fb |
| 23e9fc9d82 |
| 03e39eb925 |
| bcedae40fc |
| 5f6e3215a5 |
@@ -13,10 +13,10 @@ POSTGRES_PASSWORD=your_postgres_password
 POSTGRES_DATABASE=migrated_db
 
 # Migration Settings
-BATCH_SIZE=10000
 LOG_LEVEL=INFO
 DRY_RUN=false
-CONSOLIDATION_GROUP_LIMIT=50000
+CONSOLIDATION_GROUP_LIMIT=40000
+PROGRESS_LOG_INTERVAL=10000
 
 # Performance Testing
 BENCHMARK_OUTPUT_DIR=benchmark_results
CHANGELOG.md — new file (121 lines)
@@ -0,0 +1,121 @@

# Changelog

## [Current] - 2025-12-30

### Added

- **Consolidation-based incremental migration**: Uses consolidation keys `(UnitName, ToolNameID, EventDate, EventTime)` instead of timestamps
- **MySQL ID optimization**: Uses `MAX(mysql_max_id)` from PostgreSQL to filter MySQL queries, avoiding full table scans
- **State management in PostgreSQL**: Replaced JSON file with `migration_state` table for more reliable tracking
- **Sync utility**: Added `scripts/sync_migration_state.py` to sync state with actual data
- **Performance optimization**: MySQL queries now instant using PRIMARY KEY filter
- **Data quality validation**: Automatically validates and logs invalid consolidation keys to dedicated error files
- **Error logging**: Invalid keys (null dates, empty tool IDs, corrupted Java strings) are logged and skipped during migration
- **Better documentation**: Consolidated and updated all documentation files

### Changed

- **Incremental migration**: Now uses consolidation keys instead of timestamp-based approach
- **Full migration**: Improved to save global `last_key` after completing all partitions
- **State tracking**: Moved from `migration_state.json` to PostgreSQL table `migration_state`
- **Query performance**: Added `min_mysql_id` parameter to `fetch_consolidation_keys_after()` for optimization
- **Configuration**: Renamed `BATCH_SIZE` to `CONSOLIDATION_GROUP_LIMIT` to better reflect what it controls
- **Configuration**: Added `PROGRESS_LOG_INTERVAL` to control logging frequency
- **Configuration**: Added `BENCHMARK_OUTPUT_DIR` to specify benchmark results directory
- **Documentation**: Updated README.md, MIGRATION_WORKFLOW.md, QUICKSTART.md, EXAMPLE_WORKFLOW.md with current implementation
- **Documentation**: Corrected index and partitioning documentation to reflect actual PostgreSQL schema:
  - Uses `event_timestamp` (not separate event_date/event_time)
  - Primary key includes `event_year` for partitioning
  - Consolidation key is UNIQUE (unit_name, tool_name_id, event_timestamp, event_year)

### Removed

- **migration_state.json**: Replaced by PostgreSQL table
- **Timestamp-based migration**: Replaced by consolidation key-based approach
- **ID-based resumable migration**: Consolidated into single consolidation-based approach
- **Temporary debug scripts**: Cleaned up all `/tmp/` debug files

### Fixed

- **Incremental migration performance**: MySQL queries now ~1000x faster with ID filter
- **State synchronization**: Can now sync `migration_state` with actual data using utility script
- **Duplicate handling**: Uses `ON CONFLICT DO NOTHING` to prevent duplicates
- **Last key tracking**: Properly updates global state after full migration
- **Corrupted data handling**: Both full and incremental migrations now validate keys and log errors instead of crashing

### Error Logging

Both full and incremental migrations now handle corrupted consolidation keys gracefully.

**Error files:**

- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)

Each incremental migration creates a new timestamped file to preserve error history across runs.

**File format:**

```
# Migration errors for <table> partition <partition>
# Format: UnitName|ToolNameID|EventDate|EventTime|Reason

ID0350||0000-00-00|0:00:00|EventDate is invalid: 0000-00-00
[Ljava.lang.String;@abc123|TOOL1|2024-01-01|10:00:00|UnitName is corrupted Java string: [Ljava.lang.String;@abc123
UNIT1||2024-01-01|10:00:00|ToolNameID is NULL or empty
```

**Behavior:**

- Invalid keys are automatically skipped to prevent migration failure
- Each skipped key is logged with the reason for rejection
- Total count of skipped keys is reported at the end of migration
- Empty error files (no errors) are automatically deleted
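The pipe-separated layout above is simple enough to post-process. Below is a small illustrative sketch (not part of the tool itself) for reading such an error file back and tallying skip reasons; the file name is the example one from the list above.

```python
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator


@dataclass
class SkippedKey:
    unit_name: str
    tool_name_id: str
    event_date: str
    event_time: str
    reason: str


def read_error_log(path: str) -> Iterator[SkippedKey]:
    """Yield skipped consolidation keys from a migration error file.

    Assumes the documented format UnitName|ToolNameID|EventDate|EventTime|Reason,
    with '#' lines treated as comments.
    """
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first four separators only, so a reason that itself
        # contains '|' stays intact.
        unit, tool, date, time, reason = line.split("|", 4)
        yield SkippedKey(unit, tool, date, time, reason)


if __name__ == "__main__":
    counts = Counter(k.reason for k in read_error_log("migration_errors_rawdatacor_p2024.log"))
    for reason, n in counts.most_common():
        print(f"{n:6d}  {reason}")
```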
### Migration Guide (from old to new)

If you have an existing installation with `migration_state.json`:

1. **Backup your data** (optional but recommended):
   ```bash
   cp migration_state.json migration_state.json.backup
   ```

2. **Run full migration** to populate `migration_state` table:
   ```bash
   python main.py migrate full
   ```

3. **Sync state** (if you have existing data):
   ```bash
   python scripts/sync_migration_state.py
   ```

4. **Remove old state file**:
   ```bash
   rm migration_state.json
   ```

5. **Run incremental migration**:
   ```bash
   python main.py migrate incremental --dry-run
   python main.py migrate incremental
   ```

### Performance Improvements

- **MySQL query time**: From 60+ seconds to <0.1 seconds (600x faster)
- **Consolidation efficiency**: Multiple MySQL rows → single PostgreSQL record
- **State reliability**: PostgreSQL table instead of JSON file

### Breaking Changes

- `--state-file` parameter removed from incremental migration (no longer uses JSON)
- `--use-id` flag removed (consolidation-based approach is now default)
- Incremental migration requires full migration to be run first
- `BATCH_SIZE` environment variable renamed to `CONSOLIDATION_GROUP_LIMIT` (update your .env file)

## [Previous] - Before 2025-12-30

### Features

- Full migration support
- Incremental migration with timestamp tracking
- JSONB transformation
- Partitioning by year
- GIN indexes for JSONB queries
- Benchmark system
- Progress tracking
- Rich logging
@@ -74,8 +74,9 @@ nano .env
 # POSTGRES_PASSWORD=password123
 # POSTGRES_DATABASE=production_migrated
 #
-# BATCH_SIZE=50000                 # Large batches for speed
 # LOG_LEVEL=INFO
+# CONSOLIDATION_GROUP_LIMIT=80000  # Large batches for speed
+# PROGRESS_LOG_INTERVAL=20000
 ```
 
 ### 4. Verify Configuration (5 min)
@@ -335,8 +336,11 @@ python main.py setup --create-schema
 
 ### Very slow migration
 ```bash
-# Temporarily increase the batch size
-# Edit .env: BATCH_SIZE=100000
+# Temporarily increase the consolidation group limit
+# Edit .env: CONSOLIDATION_GROUP_LIMIT=100000
+
+# Reduce logging
+# Edit .env: PROGRESS_LOG_INTERVAL=50000
 
 # Or check:
 # - Network latency MySQL↔PostgreSQL
@@ -2,244 +2,396 @@
 
 ## Overview
 
-This tool supports three migration modes:
+This tool implements **consolidation-based incremental migration** for two tables:
+- **RAWDATACOR**: Raw sensor measurements
+- **ELABDATADISP**: Elaborated/calculated data
 
-1. **Full Migration** (`full_migration.py`) - Initial complete migration
-2. **Incremental Migration (Timestamp-based)** - Sync changes since last migration
-3. **Incremental Migration (ID-based)** - Resumable migration from last checkpoint
+Both tables use **consolidation keys** to group and migrate data efficiently.
 
 ---
 
-## 1. Initial Full Migration
+## Migration Modes
 
-### First Time Setup
+### 1. Full Migration
+
+Initial migration of all historical data, migrating one partition at a time.
 
 ```bash
-# Create the PostgreSQL schema
-python main.py setup --create-schema
+# Migrate all partitions for all tables
+python main.py migrate full
 
-# Run full migration (one-time)
-python main.py migrate --full RAWDATACOR
-python main.py migrate --full ELABDATADISP
+# Migrate specific table
+python main.py migrate full --table RAWDATACOR
+
+# Migrate specific partition (year-based)
+python main.py migrate full --table ELABDATADISP --partition 2024
+
+# Dry-run to see what would be migrated
+python main.py migrate full --dry-run
 ```
 
-**When to use:** First time migrating data or need complete fresh migration.
-
 **Characteristics:**
-- Fetches ALL rows from MySQL
-- No checkpoint tracking
-- Cannot resume if interrupted
-- Good for initial data load
+- Migrates data partition by partition (year-based)
+- Uses consolidation groups for efficiency
+- Tracks progress in `migration_state` table (PostgreSQL)
+- Can resume from last completed partition if interrupted
+- Uses `mysql_max_id` optimization for performance
 
 ---
 
-## 2. Timestamp-based Incremental Migration
+### 2. Incremental Migration
 
-### For Continuous Sync (Recommended for most cases)
+Sync only new data since the last migration.
 
 ```bash
-# After initial full migration, use incremental with timestamps
-python main.py migrate --incremental RAWDATACOR
-python main.py migrate --incremental ELABDATADISP
+# Migrate new data for all tables
+python main.py migrate incremental
+
+# Migrate specific table
+python main.py migrate incremental --table ELABDATADISP
+
+# Dry-run to check what would be migrated
+python main.py migrate incremental --dry-run
 ```
 
-**When to use:** Continuous sync of new/updated records.
-
 **Characteristics:**
-- Tracks `created_at` (RAWDATACOR) or `updated_at` (ELABDATADISP)
-- Uses JSON state file (`migration_state.json`)
-- Only fetches rows modified since last run
-- Perfect for scheduled jobs (cron, airflow, etc.)
-- Syncs changes but NOT deletions
+- Uses **consolidation keys** to identify new records:
+  - `(UnitName, ToolNameID, EventDate, EventTime)`
+- Tracks last migrated key in `migration_state` table
+- Optimized with `min_mysql_id` filter for performance
+- Handles duplicates with `ON CONFLICT DO NOTHING`
+- Perfect for scheduled jobs (cron, systemd timers)
 
 **How it works:**
-1. First run: Returns with message "No previous migration found" - must run full migration first
-2. Subsequent runs: Only fetches rows where `created_at` > last_migration_timestamp
-3. Updates state file with new timestamp for next run
-
-**Example workflow:**
-```bash
-# Day 1: Initial full migration
-python main.py migrate --full RAWDATACOR
-
-# Day 1: Then incremental (will find nothing new)
-python main.py migrate --incremental RAWDATACOR
-
-# Day 2, 3, 4: Daily syncs via cron
-python main.py migrate --incremental RAWDATACOR
-```
+1. Retrieves `last_key` from `migration_state` table
+2. Gets `MAX(mysql_max_id)` from PostgreSQL table for optimization
+3. Queries MySQL: `WHERE id > max_mysql_id AND (key_tuple) > last_key`
+4. Migrates new consolidation groups
+5. Updates `migration_state` with new `last_key`
 
 ---
 
-## 3. ID-based Incremental Migration (Resumable)
+## Consolidation Keys
 
-### For Large Datasets or Unreliable Connections
+Both tables use consolidation to group multiple measurements into a single JSONB record.
 
-```bash
-# First run
-python main.py migrate --incremental RAWDATACOR --use-id
+### Consolidation Key Structure
 
-# Can interrupt and resume multiple times
-python main.py migrate --incremental RAWDATACOR --use-id
+```sql
+(UnitName, ToolNameID, EventDate, EventTime)
 ```
 
-**When to use:**
-- Large datasets that may timeout
-- Need to resume from exact last position
-- Network is unstable
+### Data Quality Validation
 
-**Characteristics:**
-- Tracks `last_id` instead of timestamp
-- Updates state file after EACH BATCH (not just at end)
-- Can interrupt and resume dozens of times
-- Resumes from exact record ID where it stopped
-- Works with `migration_state.json`
+The migration automatically validates and logs invalid consolidation keys:
+- `EventDate IS NULL` or `EventDate = '0000-00-00'`
+- `ToolNameID IS NULL` or `ToolNameID = ''` (empty string)
+- `UnitName IS NULL` or `UnitName = ''` (empty string)
+- `UnitName` starting with `[L` (corrupted Java strings like `[Ljava.lang.String;@...`)
+- `EventTime IS NULL`
 
-**How it works:**
-1. First run: Starts from beginning (ID = 0)
-2. Each batch: Updates state file with max ID from batch
-3. Interrupt: Can stop at any time
-4. Resume: Next run continues from last ID stored
-5. Continues until all rows processed
+Invalid keys are:
+- **Logged to error files** for tracking and analysis
+- **Skipped automatically** to prevent migration failures
+- **Counted and reported** at the end of migration
 
-**Example workflow for large dataset:**
-```bash
-# Start ID-based migration (will migrate in batches)
-python main.py migrate --incremental RAWDATACOR --use-id
+Error log files:
+- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
+- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)
 
-# [If interrupted after 1M rows processed]
+Each incremental migration creates a new timestamped file to preserve history.
 
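The validation rules above can be summarized as a small predicate. The sketch below is an independent illustration of those rules only; the tool's own implementation lives in `src/utils/validation.py` and may differ in detail.

```python
from typing import Optional, Tuple


def check_consolidation_key(
    unit_name: Optional[str],
    tool_name_id: Optional[str],
    event_date: Optional[str],
    event_time: Optional[str],
) -> Tuple[bool, str]:
    """Return (is_valid, reason) for a consolidation key, per the rules above."""
    if event_date is None or str(event_date) == "0000-00-00":
        return False, f"EventDate is invalid: {event_date}"
    if event_time is None:
        return False, "EventTime is NULL"
    if not tool_name_id:
        return False, "ToolNameID is NULL or empty"
    if not unit_name:
        return False, "UnitName is NULL or empty"
    if str(unit_name).startswith("[L"):
        return False, f"UnitName is corrupted Java string: {unit_name}"
    return True, ""


print(check_consolidation_key("ID0350", "", "0000-00-00", "0:00:00"))
# (False, 'EventDate is invalid: 0000-00-00')
```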
-# Resume from ID 1M (automatically detects last position)
-python main.py migrate --incremental RAWDATACOR --use-id
+### Why Consolidation?
 
-# [Continues until complete]
+Instead of migrating individual sensor readings, we:
+1. **Group** all measurements for the same (unit, tool, date, time)
+2. **Transform** 16-25 columns into structured JSONB
+3. **Migrate** as a single consolidated record
+
+**Example:**
+
+MySQL has 10 rows for `(Unit1, Tool1, 2024-01-01, 10:00:00)`:
+```
+id | UnitName | ToolNameID | EventDate  | EventTime | Val0 | Val1 | ...
+1  | Unit1    | Tool1      | 2024-01-01 | 10:00:00  | 23.5 | 45.2 | ...
+2  | Unit1    | Tool1      | 2024-01-01 | 10:00:00  | 23.6 | 45.3 | ...
+...
+```
+
+PostgreSQL gets 1 consolidated record:
+```json
+{
+  "unit_name": "Unit1",
+  "tool_name_id": "Tool1",
+  "event_timestamp": "2024-01-01 10:00:00",
+  "measurements": {
+    "0": {"value": 23.5, "unit": "°C"},
+    "1": {"value": 45.2, "unit": "bar"},
+    ...
+  },
+  "mysql_max_id": 10
+}
 ```
 
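To make the grouping step concrete, here is a minimal, self-contained sketch of the idea. The real transformation is done by `consolidate_rows` in `src/migrator/consolidator.py`; the row shape, the channel mapping and the omission of units here are simplifications, not the project's actual schema.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List


def consolidate(rows: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Group rows sharing (UnitName, ToolNameID, EventDate, EventTime) into one record."""
    groups: Dict[tuple, List[Dict[str, Any]]] = defaultdict(list)
    for row in rows:
        key = (row["UnitName"], row["ToolNameID"], row["EventDate"], row["EventTime"])
        groups[key].append(row)

    records = []
    for (unit, tool, date, time), members in groups.items():
        records.append({
            "unit_name": unit,
            "tool_name_id": tool,
            "event_timestamp": f"{date} {time}",
            # Real code maps Val0..ValN (and their units) per channel; here each
            # source row just contributes one entry.
            "measurements": {str(i): {"value": m["Val0"]} for i, m in enumerate(members)},
            "mysql_max_id": max(m["id"] for m in members),
        })
    return records


rows = [
    {"id": 1, "UnitName": "Unit1", "ToolNameID": "Tool1",
     "EventDate": "2024-01-01", "EventTime": "10:00:00", "Val0": 23.5},
    {"id": 2, "UnitName": "Unit1", "ToolNameID": "Tool1",
     "EventDate": "2024-01-01", "EventTime": "10:00:00", "Val0": 23.6},
]
print(consolidate(rows)[0]["mysql_max_id"])  # 2
```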
 ---
 
 ## State Management
 
-### State File Location
+### Migration State Table
 
-```
-migration_state.json          # In project root
+The `migration_state` table in PostgreSQL tracks migration progress:
 
+```sql
+CREATE TABLE migration_state (
+    table_name VARCHAR(50),
+    partition_name VARCHAR(50),
+    last_key JSONB,           -- Last migrated consolidation key
+    started_at TIMESTAMP,
+    completed_at TIMESTAMP,
+    total_rows INTEGER,
+    status VARCHAR(20)
+);
 ```
 
-### State File Content (Timestamp-based)
-```json
-{
-  "rawdatacor": {
-    "last_timestamp": "2024-12-11T19:30:45.123456",
-    "last_updated": "2024-12-11T19:30:45.123456",
-    "total_migrated": 50000
-  }
-}
+### State Records
+
+- **Per-partition state**: Tracks each partition's progress
+  - Example: `('rawdatacor', '2024', {...}, '2024-01-15 10:30:00', 'completed', 1000000)`
+- **Global state**: Tracks overall incremental migration position
+  - Example: `('rawdatacor', '_global', {...}, NULL, NULL, 0, 'in_progress')`
+
+### Checking State
 
+```sql
+-- View all migration state
+SELECT * FROM migration_state ORDER BY table_name, partition_name;
+
+-- View global state (for incremental migration)
+SELECT * FROM migration_state WHERE partition_name = '_global';
 ```
 
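As a sketch of how the `_global` row can be read outside the tool, assuming a plain psycopg2 connection (the project wraps its own `PostgreSQLConnector`, so the driver and DSN here are assumptions; the table and column names come from the definition above):

```python
import json
from typing import Any, Dict, Optional

import psycopg2  # assumed driver for this standalone example


def get_global_last_key(dsn: str, table_name: str) -> Optional[Dict[str, Any]]:
    """Read the incremental-migration position recorded for a table."""
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(
            """
            SELECT last_key
            FROM migration_state
            WHERE table_name = %s AND partition_name = '_global'
            """,
            (table_name,),
        )
        row = cur.fetchone()
        if row is None or row[0] is None:
            return None
        # psycopg2 normally decodes JSONB to a dict already; handle both cases.
        return row[0] if isinstance(row[0], dict) else json.loads(row[0])


# Example (connection string is a placeholder):
# print(get_global_last_key("dbname=migrated_db user=postgres", "rawdatacor"))
```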
-### State File Content (ID-based)
-```json
-{
-  "rawdatacor": {
-    "last_id": 1000000,
-    "total_migrated": 1000000,
-    "last_updated": "2024-12-11T19:45:30.123456"
-  }
-}
+---
+
+## Performance Optimization
+
+### MySQL ID Filter
+
+The incremental migration uses `MAX(mysql_max_id)` from PostgreSQL to filter MySQL queries:
+
+```sql
+SELECT UnitName, ToolNameID, EventDate, EventTime
+FROM RAWDATACOR
+WHERE id > 267399536  -- max_mysql_id from PostgreSQL
+  AND (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?)
+GROUP BY UnitName, ToolNameID, EventDate, EventTime
+ORDER BY UnitName, ToolNameID, EventDate, EventTime
+LIMIT 10000
 ```
 
-### Reset Migration State
-```python
-from src.migrator.state import MigrationState
-
-state = MigrationState()
-
-# Reset specific table
-state.reset("rawdatacor")
-
-# Reset all tables
-state.reset()
-```
+**Why this is fast:**
+- Uses PRIMARY KEY index on `id` to skip millions of already-migrated rows
+- Tuple comparison only applied to filtered subset
+- Avoids full table scans
+
+### Consolidation Group Batching
+
+Instead of fetching individual rows, we:
+1. Fetch 10,000 consolidation keys at a time
+2. For each key, fetch all matching rows from MySQL
+3. Transform and insert into PostgreSQL
+4. Update state every batch
 
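Putting the two optimizations together, the control flow looks roughly like the sketch below. The three helper functions are hypothetical stand-ins for the project's MySQL and PostgreSQL connectors; only the shape of the loop is the point.

```python
from typing import Any, Dict, List, Optional


def fetch_keys_after(last_key: Optional[Dict[str, Any]],
                     min_mysql_id: int, limit: int) -> List[Dict[str, Any]]:
    return []   # real code: tuple-comparison query filtered by id > min_mysql_id


def migrate_one_group(key: Dict[str, Any]) -> int:
    return 0    # real code: fetch rows for the key, consolidate, INSERT ... ON CONFLICT DO NOTHING


def save_last_key(key: Dict[str, Any]) -> None:
    pass        # real code: UPDATE migration_state ... WHERE partition_name = '_global'


def incremental_sync(last_key: Optional[Dict[str, Any]],
                     max_mysql_id: int,
                     group_limit: int = 10_000) -> Optional[Dict[str, Any]]:
    """Sketch of the consolidation-group batching described above."""
    while True:
        keys = fetch_keys_after(last_key, min_mysql_id=max_mysql_id, limit=group_limit)
        if not keys:
            return last_key              # nothing new: position unchanged
        for key in keys:
            migrate_one_group(key)       # duplicates are ignored by ON CONFLICT
        last_key = keys[-1]
        save_last_key(last_key)          # state advances once per batch of keys
```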
 ---
 
 ## Recommended Workflow
 
-### For Daily Continuous Sync
-```bash
-# Week 1: Initial setup
-python main.py setup --create-schema
-python main.py migrate --full RAWDATACOR
-python main.py migrate --full ELABDATADISP
+### Initial Setup (One-time)
 
-# Week 2+: Daily incremental syncs (via cron job)
-# Schedule: `0 2 * * * cd /path/to/project && python main.py migrate --incremental RAWDATACOR`
-python main.py migrate --incremental RAWDATACOR
-python main.py migrate --incremental ELABDATADISP
+```bash
+# 1. Configure .env file
+cp .env.example .env
+nano .env
+
+# 2. Create PostgreSQL schema
+python main.py setup --create-schema
+
+# 3. Run full migration
+python main.py migrate full
 ```
 
-### For Large Initial Migration
-```bash
-# If dataset > 10 million rows
-python main.py setup --create-schema
-python main.py migrate --incremental RAWDATACOR --use-id  # Can interrupt/resume
+### Daily Incremental Sync
 
-# For subsequent syncs, use timestamp
-python main.py migrate --incremental RAWDATACOR  # Timestamp-based
+```bash
+# Run incremental migration (via cron or manual)
+python main.py migrate incremental
+```
+
+**Cron example** (daily at 2 AM):
+```cron
+0 2 * * * cd /path/to/mysql2postgres && python main.py migrate incremental >> /var/log/migration.log 2>&1
 ```
 
 ---
 
-## Key Differences at a Glance
+## Resuming Interrupted Migrations
 
-| Feature | Full | Timestamp | ID-based |
-|---------|------|-----------|----------|
-| Initial setup | ✅ Required first | ✅ After full | ✅ After full |
-| Sync new/updated | ❌ No | ✅ Yes | ✅ Yes |
-| Resumable | ❌ No | ⚠️ Partial* | ✅ Full |
-| Batched state tracking | ❌ No | ❌ No | ✅ Yes |
-| Large datasets | ⚠️ Risky | ✅ Good | ✅ Best |
-| Scheduled jobs | ❌ No | ✅ Perfect | ⚠️ Unnecessary |
+### Full Migration
 
-*Timestamp mode can resume, but must wait for full batch to complete before continuing
+If interrupted, full migration resumes from the last completed partition:
 
+```bash
+# First run: migrates partitions 2014, 2015, 2016... (interrupted after 2020)
+python main.py migrate full --table RAWDATACOR
+
+# Resume: continues from partition 2021
+python main.py migrate full --table RAWDATACOR
+```
+
+### Incremental Migration
+
+Incremental migration uses the `last_key` from `migration_state`:
+
+```bash
+# Always safe to re-run - uses ON CONFLICT DO NOTHING
+python main.py migrate incremental
+```
 
 ---
 
-## Default Partitions
+## Syncing Migration State
 
-Both tables are partitioned by year (2014-2031) plus a DEFAULT partition:
-- **rawdatacor_2014** through **rawdatacor_2031** (yearly partitions)
-- **rawdatacor_default** (catches data outside 2014-2031)
+If `migration_state` becomes out of sync with actual data, use the sync utility:
 
-Same for ELABDATADISP. This ensures data with edge-case timestamps doesn't break migration.
+```bash
+# Sync migration_state with actual PostgreSQL data
+python scripts/sync_migration_state.py
+```
+
+This finds the most recent row (by `created_at`) and updates `migration_state._global`.
 
 ---
 
 ## Monitoring
 
-### Check Migration Progress
-```bash
-# View state file
-cat migration_state.json
+### Check Progress
 
-# Check PostgreSQL row counts
-psql -U postgres -h localhost -d your_db -c "SELECT COUNT(*) FROM rawdatacor;"
+```bash
+# View migration state
+psql -h localhost -U postgres -d migrated_db -c \
+  "SELECT table_name, partition_name, status, total_rows, completed_at
+   FROM migration_state
+   ORDER BY table_name, partition_name"
 ```
 
-### Common Issues
+### Verify Row Counts
 
-**"No previous migration found"** (Timestamp mode)
-- Solution: Run full migration first with `--full` flag
+```sql
+-- PostgreSQL
+SELECT COUNT(*) FROM rawdatacor;
+SELECT COUNT(*) FROM elabdatadisp;
 
-**"Duplicate key value violates unique constraint"**
-- Cause: Running full migration twice
-- Solution: Use timestamp-based incremental sync instead
+-- Compare with MySQL
+-- mysql> SELECT COUNT(DISTINCT UnitName, ToolNameID, EventDate, EventTime) FROM RAWDATACOR;
+```
 
-**"Timeout during migration"** (Large datasets)
-- Solution: Switch to ID-based resumable migration with `--use-id`
+---
+
+## Common Issues
+
+### "No previous migration found"
+
+**Cause**: Running incremental migration before full migration
+
+**Solution**: Run full migration first
+```bash
+python main.py migrate full
+```
+
+### "Duplicate key value violates unique constraint"
+
+**Cause**: Data already exists (shouldn't happen with ON CONFLICT DO NOTHING)
+
+**Solution**: Migration handles this automatically - check logs for details
+
+### Slow Incremental Migration
+
+**Cause**: `MAX(mysql_max_id)` query is slow (~60 seconds on large tables)
+
+**Solution**: This is expected and only happens once at start. The MySQL queries are instant thanks to the optimization.
+
+**Alternative**: Create an index on `mysql_max_id` in PostgreSQL (uses disk space):
+```sql
+CREATE INDEX idx_rawdatacor_mysql_max_id ON rawdatacor (mysql_max_id DESC);
+CREATE INDEX idx_elabdatadisp_mysql_max_id ON elabdatadisp (mysql_max_id DESC);
+```
+
+---
+
+## Key Technical Details
+
+### Tuple Comparison in MySQL
+
+MySQL supports lexicographic tuple comparison:
+
+```sql
+WHERE (UnitName, ToolNameID, EventDate, EventTime) > ('Unit1', 'Tool1', '2024-01-01', '10:00:00')
+```
+
+This is equivalent to:
+```sql
+WHERE UnitName > 'Unit1'
+   OR (UnitName = 'Unit1' AND ToolNameID > 'Tool1')
+   OR (UnitName = 'Unit1' AND ToolNameID = 'Tool1' AND EventDate > '2024-01-01')
+   OR (UnitName = 'Unit1' AND ToolNameID = 'Tool1' AND EventDate = '2024-01-01' AND EventTime > '10:00:00')
+```
+
+But much more efficient!
 
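In the connector this comparison is issued through pymysql with bound parameters; a trimmed-down sketch (connection details are placeholders, and the real method also scopes the query to a single partition):

```python
import pymysql
import pymysql.cursors


def fetch_next_keys(conn, table: str, last_key: tuple, limit: int):
    """Fetch the next consolidation keys after `last_key` using tuple comparison.

    `last_key` is (UnitName, ToolNameID, EventDate, EventTime). The table name is
    interpolated because placeholders cannot stand in for identifiers.
    """
    sql = f"""
        SELECT UnitName, ToolNameID, EventDate, EventTime
        FROM `{table}`
        WHERE (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)
        GROUP BY UnitName, ToolNameID, EventDate, EventTime
        ORDER BY UnitName, ToolNameID, EventDate, EventTime
        LIMIT %s
    """
    with conn.cursor(pymysql.cursors.DictCursor) as cur:
        cur.execute(sql, (*last_key, limit))
        return cur.fetchall()


# Usage (connection parameters are placeholders):
# conn = pymysql.connect(host="mysql-host", user="user", password="...", database="db")
# keys = fetch_next_keys(conn, "RAWDATACOR", ("Unit1", "Tool1", "2024-01-01", "10:00:00"), 10000)
```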
+### Partitioning in PostgreSQL
+
+Tables are partitioned by year (2014-2031):
+```sql
+CREATE TABLE rawdatacor_2024 PARTITION OF rawdatacor
+    FOR VALUES FROM (2024) TO (2025);
+```
+
+PostgreSQL automatically routes INSERTs to the correct partition based on `event_year`.
+
+### Indexes in PostgreSQL
+
+Both tables have these indexes automatically created:
+
+**Primary Key** (required for partitioned tables):
+```sql
+-- Must include partition key (event_year)
+UNIQUE (id, event_year)
+```
+
+**Consolidation Key** (prevents duplicates):
+```sql
+-- Ensures one record per consolidation group
+UNIQUE (unit_name, tool_name_id, event_timestamp, event_year)
+```
+
+**Query Optimization**:
+```sql
+-- Fast filtering by unit/tool
+(unit_name, tool_name_id)
+
+-- JSONB queries with GIN index
+GIN (measurements)
+```
+
+**Note**: All indexes are automatically created on all partitions when you run `setup --create-schema`.
 
 ---
 
 ## Summary
 
-- **Start with:** Full migration (`--full`) for initial data load
-- **Then use:** Timestamp-based incremental (`--incremental`) for daily syncs
-- **Switch to:** ID-based resumable (`--incremental --use-id`) if full migration is too large
+1. **Full migration**: One-time initial load, partition by partition
+2. **Incremental migration**: Daily sync of new data using consolidation keys
+3. **State tracking**: PostgreSQL `migration_state` table
+4. **Performance**: `mysql_max_id` filter + consolidation batching
+5. **Resumable**: Both modes can resume from interruptions
+6. **Safe**: ON CONFLICT DO NOTHING prevents duplicates
@@ -42,8 +42,9 @@ POSTGRES_USER=postgres
 POSTGRES_PASSWORD=pgpassword
 POSTGRES_DATABASE=migrated_db
 
-BATCH_SIZE=10000
 LOG_LEVEL=INFO
+CONSOLIDATION_GROUP_LIMIT=40000
+PROGRESS_LOG_INTERVAL=10000
 ```
 
 ### 3. Create PostgreSQL in Incus
@@ -111,8 +112,11 @@ python main.py migrate full --table RAWDATACOR
 # Migrate only the changes since the last sync
 python main.py migrate incremental
 
-# With a custom state file
-python main.py migrate incremental --state-file daily_sync.json
+# Dry-run to see what would be migrated
+python main.py migrate incremental --dry-run
+
+# Only a specific table
+python main.py migrate incremental --table RAWDATACOR
 ```
 
 ### Benchmark
@@ -221,8 +225,11 @@ python main.py setup --create-schema
 
 ### "Migration is slow"
 ```bash
-# Increase the batch size in .env
-BATCH_SIZE=50000
+# Increase the consolidation group limit in .env
+CONSOLIDATION_GROUP_LIMIT=80000
+
+# Or reduce logging
+PROGRESS_LOG_INTERVAL=20000
 
 # Or optimize MySQL
 mysql> FLUSH PRIVILEGES;
config.py (+115 lines)
@@ -187,3 +187,118 @@ TABLE_CONFIGS = {
     "elabdatadisp": _elabdatadisp_config,
     "ELABDATADISP": _elabdatadisp_config,
 }
+
+
+# Partition mapping utilities
+def year_to_partition_name(year: int, table: str) -> str:
+    """Map year to partition name.
+
+    Partition naming scheme (different for each table):
+    - RAWDATACOR: part0=2014, part1=2015, ..., part10=2024 (part{year-2014})
+    - ELABDATADISP: d0=2013, d1=2014, ..., d12=2025, ..., d17=2030 (d{year-2013})
+
+    Args:
+        year: Year (2013-2031, depending on table)
+        table: Table name (RAWDATACOR or ELABDATADISP)
+
+    Returns:
+        Partition name (e.g., "part8" for RAWDATACOR/2022, "d13" for ELABDATADISP/2026)
+
+    Raises:
+        ValueError: If year is out of range or table is unknown
+    """
+    table_upper = table.upper()
+
+    if table_upper == "RAWDATACOR":
+        # RAWDATACOR: 2014-2024 (part0-part10)
+        # RAWDATACOR: 2025-2030 (d12-d17)
+        if year < 2014:
+            year = 2014
+        elif year > 2030:
+            year = 2030
+
+        if year < 2025:
+            suffix = "part"
+            d_year = 2014
+        else:
+            suffix = "d"
+            d_year = 2013  # Continue naming as d12, d13, ...
+
+        partition_index = year - d_year  # 2014→0, 2015→1, ..., 2024→10 - 2025→12, ..., 2030→17
+        return f"{suffix}{partition_index}"
+
+    elif table_upper == "ELABDATADISP":
+        # ELABDATADISP: 2013-2031 (d0-d18)
+        if year < 2013:
+            year = 2013
+        elif year > 2031:
+            year = 2031
+
+        partition_index = year - 2013  # 2013→0, 2014→1, ..., 2025→12, ..., 2031→18
+        return f"d{partition_index}"
+
+    else:
+        raise ValueError(f"Unknown table: {table}")
+
+
+def get_partitions_from_year(year: int, table: str) -> list[str]:
+    """Get list of partition names from a specific year onwards.
+
+    Args:
+        year: Starting year
+        table: Table name (RAWDATACOR or ELABDATADISP)
+
+    Returns:
+        List of partition names from that year to the latest available year
+
+    Example:
+        get_partitions_from_year(2022, "RAWDATACOR")
+        → ["part8", "part9", "part10", "d12", "d13", "d14", "d15", "d16", "d17"]  # 2022→part8, ..., 2024→part10, 2025→d12, ..., 2030→d17
+
+        get_partitions_from_year(2025, "ELABDATADISP")
+        → ["d12", "d13", "d14", "d15", "d16", "d17"]  # 2025-2030
+    """
+    table_upper = table.upper()
+    partitions = []
+
+    if table_upper == "RAWDATACOR":
+        end_year = 2030  # RAWDATACOR: part0-part10 (2014-2024) + d12-d17 (2025-2030)
+    elif table_upper == "ELABDATADISP":
+        end_year = 2030  # ELABDATADISP: d0-d17 (2013-2030)
+    else:
+        raise ValueError(f"Unknown table: {table}")
+
+    # Generate partitions for each year from start_year to end_year
+    for y in range(year, end_year + 1):
+        partition_name = year_to_partition_name(y, table)
+        # Avoid duplicates (can happen if mapping multiple years to same partition)
+        if not partitions or partitions[-1] != partition_name:
+            partitions.append(partition_name)
+
+    return partitions
+
+
+def date_string_to_partition_name(date_str: str, table: str) -> str:
+    """Extract year from date string and map to partition name.
+
+    Args:
+        date_str: Date string in format 'YYYY-MM-DD' (e.g., '2022-05-15')
+        table: Table name (RAWDATACOR or ELABDATADISP)
+
+    Returns:
+        Partition name (e.g., "part8" or "d8")
+
+    Example:
+        date_string_to_partition_name("2022-05-15", "RAWDATACOR") → "part8"
+    """
+    if not date_str or len(date_str) < 4:
+        # Default to 2014 if invalid date
+        return year_to_partition_name(2014, table)
+
+    try:
+        year = int(date_str[:4])
+        return year_to_partition_name(year, table)
+    except (ValueError, TypeError):
+        # Default to 2014 if can't parse
+        return year_to_partition_name(2014, table)
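A quick way to sanity-check the mapping is to exercise the new helpers directly (run from the project root so `config.py` is importable); the expected values follow the docstrings above.

```python
from config import (
    date_string_to_partition_name,
    get_partitions_from_year,
    year_to_partition_name,
)

assert year_to_partition_name(2022, "RAWDATACOR") == "part8"    # 2022 - 2014 = 8
assert year_to_partition_name(2026, "ELABDATADISP") == "d13"    # 2026 - 2013 = 13
assert date_string_to_partition_name("2022-05-15", "RAWDATACOR") == "part8"

print(get_partitions_from_year(2023, "RAWDATACOR"))
# ['part9', 'part10', 'd12', 'd13', 'd14', 'd15', 'd16', 'd17']
```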
main.py (43)
@@ -31,7 +31,7 @@ def cli(ctx):
 )
 def setup(create_schema):
     """Setup PostgreSQL database."""
-    setup_logger(__name__)
+    setup_logger("")  # Configure root logger to show all module logs
 
     if not create_schema:
         click.echo("Usage: python main.py setup --create-schema")
@@ -89,7 +89,7 @@ def migrate():
 )
 def full(table, dry_run, resume, partition, parallel):
     """Perform full migration of all data."""
-    setup_logger(__name__)
+    setup_logger("")  # Configure root logger to show all module logs
 
     tables = ["RAWDATACOR", "ELABDATADISP"] if table == "all" else [table]
 
@@ -178,7 +178,7 @@ def incremental(table, dry_run):
 )
 def benchmark(iterations, output):
     """Run performance benchmarks comparing MySQL and PostgreSQL."""
-    setup_logger(__name__)
+    setup_logger("")  # Configure root logger to show all module logs
 
     try:
         click.echo("Running performance benchmarks...")
@@ -194,7 +194,7 @@ def benchmark(iterations, output):
 @cli.command()
 def info():
     """Show configuration information."""
-    setup_logger(__name__)
+    setup_logger("")  # Configure root logger to show all module logs
 
     settings = get_settings()
 
@@ -219,5 +219,40 @@ def info():
     click.echo(f"  Iterations: {settings.benchmark.iterations}")
 
 
+@cli.command()
+@click.option(
+    "--port",
+    type=int,
+    default=7860,
+    help="Port to run the web interface on (default: 7860)"
+)
+@click.option(
+    "--share",
+    is_flag=True,
+    help="Create a public share link (useful for remote access)"
+)
+def web(port, share):
+    """Launch web-based GUI for migration monitoring and control."""
+    setup_logger("")  # Configure root logger to show all module logs
+
+    try:
+        from web_ui import launch_ui
+
+        click.echo(f"\n🚀 Starting Migration Dashboard on http://localhost:{port}")
+        if share:
+            click.echo("📡 Creating public share link...")
+
+        launch_ui(share=share, server_port=port)
+
+    except ImportError as e:
+        click.echo(f"✗ Failed to import web_ui module: {e}", err=True)
+        click.echo("Make sure gradio is installed: uv sync", err=True)
+        sys.exit(1)
+    except Exception as e:
+        logger.error(f"Web interface failed: {e}")
+        click.echo(f"✗ Web interface failed: {e}", err=True)
+        sys.exit(1)
+
+
 if __name__ == "__main__":
     cli(obj={})
@@ -13,4 +13,7 @@ dependencies = [
     "pydantic>=2.5.0",
     "pydantic-settings>=2.1.0",
     "cryptography>=46.0.3",
+    "gradio>=4.0.0",
+    "pandas>=2.0.0",
+    "plotly>=5.0.0",
 ]
scripts/README.md — new file (78 lines)
@@ -0,0 +1,78 @@

# Migration Scripts

Utility scripts for managing the migration.

## sync_migration_state.py

Synchronizes the `migration_state` table with the data actually present in PostgreSQL.

### When to use

Use this script when `migration_state` is out of sync with the real data, for example:
- After manual inserts into PostgreSQL
- After state corruption
- Before running an incremental migration on already existing data

### How it works

For each table (rawdatacor, elabdatadisp):
1. Finds the row with MAX(created_at) - the most recently inserted row
2. Extracts the consolidation key from that row
3. Updates `migration_state._global` with that key

### Usage

```bash
# Run from the project root
python scripts/sync_migration_state.py
```

### Output

```
Syncing migration_state with actual PostgreSQL data...
================================================================================

ELABDATADISP:
  Most recently inserted row (by created_at):
    created_at:      2025-12-30 11:58:24
    event_timestamp: 2025-12-30 14:58:24
  Consolidation key: (ID0290, DT0007, 2025-12-30, 14:58:24)
  ✓ Updated migration_state with this key

RAWDATACOR:
  Most recently inserted row (by created_at):
    created_at:      2025-12-30 11:13:29
    event_timestamp: 2025-12-30 11:11:39
  Consolidation key: (ID0304, DT0024, 2025-12-30, 11:11:39)
  ✓ Updated migration_state with this key

================================================================================
✓ Done! Incremental migration will now start from the correct position.
```

### Effects

After running this script:
- `migration_state._global` will be updated with the last migrated key
- `python main.py migrate incremental` will start from the correct position
- No duplicates will be created (uses ON CONFLICT DO NOTHING)

### Caveats

- Automatically excludes corrupted data (unit_name values like `[Ljava.lang.String;@...`)
- Uses `created_at` to find the most recently inserted row (not `event_timestamp`)
- Overwrites the existing global state

### Verification

After running the script, check the state:

```sql
SELECT table_name, partition_name, last_key
FROM migration_state
WHERE partition_name = '_global'
ORDER BY table_name;
```

It should show the most recent keys for both tables.
scripts/sync_migration_state.py — new executable file (63 lines)
@@ -0,0 +1,63 @@

```python
#!/usr/bin/env python3
"""Sync migration_state with actual data in PostgreSQL tables."""

import sys

sys.path.insert(0, '/home/alex/devel/mysql2postgres')

from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.state_manager import StateManager


def sync_table_state(table_name: str):
    """Sync migration_state for a table with its actual data."""
    with PostgreSQLConnector() as pg_conn:
        cursor = pg_conn.connection.cursor()

        # Find the row with MAX(created_at) - most recently inserted
        # Exclude corrupted data (Java strings)
        cursor.execute(f"""
            SELECT unit_name, tool_name_id,
                   DATE(event_timestamp)::text as event_date,
                   event_timestamp::time::text as event_time,
                   created_at,
                   event_timestamp
            FROM {table_name}
            WHERE unit_name NOT LIKE '[L%'  -- Exclude corrupted Java strings
            ORDER BY created_at DESC
            LIMIT 1
        """)

        result = cursor.fetchone()
        if not result:
            print(f"No data found in {table_name}")
            return

        unit_name, tool_name_id, event_date, event_time, created_at, event_timestamp = result

        print(f"\n{table_name.upper()}:")
        print(f"  Most recently inserted row (by created_at):")
        print(f"    created_at:      {created_at}")
        print(f"    event_timestamp: {event_timestamp}")
        print(f"  Consolidation key: ({unit_name}, {tool_name_id}, {event_date}, {event_time})")

        # Update global migration_state with this key
        state_mgr = StateManager(pg_conn, table_name, partition_name="_global")

        last_key = {
            "unit_name": unit_name,
            "tool_name_id": tool_name_id,
            "event_date": event_date,
            "event_time": event_time
        }

        state_mgr.update_state(last_key=last_key)
        print(f"  ✓ Updated migration_state with this key")


if __name__ == "__main__":
    print("Syncing migration_state with actual PostgreSQL data...")
    print("="*80)

    sync_table_state("elabdatadisp")
    sync_table_state("rawdatacor")

    print("\n" + "="*80)
    print("✓ Done! Incremental migration will now start from the correct position.")
```
@@ -294,6 +294,7 @@ class MySQLConnector:
                     ORDER BY UnitName ASC, ToolNameID ASC, EventDate ASC, EventTime ASC
                     LIMIT %s
                 """
+                logger.info(f"Executing first query on partition {partition} (fetching up to {limit} rows, sorted by consolidation key)...")
                 cursor.execute(rows_query, (limit,))
             else:
                 # Resume AFTER last completely yielded key
@@ -304,9 +305,11 @@ class MySQLConnector:
                     ORDER BY UnitName ASC, ToolNameID ASC, EventDate ASC, EventTime ASC
                     LIMIT %s
                 """
+                logger.debug(f"Executing query on partition {partition} (resuming from key {last_completed_key}, limit {limit})...")
                 cursor.execute(rows_query, (last_completed_key[0], last_completed_key[1], last_completed_key[2], last_completed_key[3], limit))
 
             rows = cursor.fetchall()
+            logger.debug(f"Fetched {len(rows)} rows from partition {partition}")
 
             if not rows:
                 # No more rows - yield any buffered group and finish
@@ -692,6 +695,92 @@ class MySQLConnector:
                     )
                     self._reconnect()
 
+    def fetch_consolidation_keys_from_partition_after(
+        self,
+        table: str,
+        partition: str,
+        after_key: Optional[Dict[str, Any]] = None,
+        limit: Optional[int] = None
+    ) -> List[Dict[str, Any]]:
+        """Fetch distinct consolidation keys from a specific partition after a specific key.
+
+        Optimized version for incremental migration that queries only one partition.
+
+        Query pattern:
+            SELECT UnitName, ToolNameID, EventDate, EventTime
+            FROM table PARTITION (partition_name)
+            WHERE (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?)
+            GROUP BY UnitName, ToolNameID, EventDate, EventTime
+            ORDER BY UnitName, ToolNameID, EventDate, EventTime
+            LIMIT X
+
+        Args:
+            table: Table name (RAWDATACOR or ELABDATADISP)
+            partition: Partition name (e.g., 'part8', 'd9')
+            after_key: Start after this key (dict with unit_name, tool_name_id, event_date, event_time)
+            limit: Number of keys to fetch (uses CONSOLIDATION_GROUP_LIMIT if None)
+
+        Returns:
+            List of dicts with keys: UnitName, ToolNameID, EventDate, EventTime
+        """
+        if table not in ("RAWDATACOR", "ELABDATADISP"):
+            raise ValueError(f"Consolidation not supported for table {table}")
+
+        if limit is None:
+            limit = self.settings.migration.consolidation_group_limit
+
+        retries = 0
+        while retries < self.MAX_RETRIES:
+            try:
+                with self.connection.cursor() as cursor:
+                    if after_key:
+                        # Fetch keys AFTER the last migrated key from this specific partition
+                        query = f"""
+                            SELECT UnitName, ToolNameID, EventDate, EventTime
+                            FROM `{table}` PARTITION (`{partition}`)
+                            WHERE (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)
+                            GROUP BY UnitName, ToolNameID, EventDate, EventTime
+                            ORDER BY UnitName, ToolNameID, EventDate, EventTime
+                            LIMIT %s
+                        """
+                        cursor.execute(
+                            query,
+                            (
+                                after_key.get("unit_name"),
+                                after_key.get("tool_name_id"),
+                                after_key.get("event_date"),
+                                after_key.get("event_time"),
+                                limit
+                            )
+                        )
+                    else:
+                        # No after_key: fetch from beginning of partition
+                        query = f"""
+                            SELECT UnitName, ToolNameID, EventDate, EventTime
+                            FROM `{table}` PARTITION (`{partition}`)
+                            GROUP BY UnitName, ToolNameID, EventDate, EventTime
+                            ORDER BY UnitName, ToolNameID, EventDate, EventTime
+                            LIMIT %s
+                        """
+                        cursor.execute(query, (limit,))
+
+                    keys = cursor.fetchall()
+                    return keys
+
+            except pymysql.Error as e:
+                retries += 1
+                if retries >= self.MAX_RETRIES:
+                    logger.error(
+                        f"Failed to fetch consolidation keys from {table} PARTITION ({partition}) "
+                        f"(after_key={after_key}) after {self.MAX_RETRIES} retries: {e}"
+                    )
+                    raise
+                else:
+                    logger.warning(
+                        f"Fetch consolidation keys from partition failed (retry {retries}/{self.MAX_RETRIES}): {e}"
+                    )
+                    self._reconnect()
+
     def fetch_records_for_key_all_partitions(
         self,
         table: str,
@@ -11,6 +11,7 @@ from src.migrator.consolidator import consolidate_rows
|
|||||||
from src.migrator.state_manager import StateManager
|
from src.migrator.state_manager import StateManager
|
||||||
from src.utils.logger import get_logger
|
from src.utils.logger import get_logger
|
||||||
from src.utils.progress import ProgressTracker
|
from src.utils.progress import ProgressTracker
|
||||||
|
from src.utils.validation import validate_consolidation_key, ErrorLogger
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
@@ -60,6 +61,9 @@ class IncrementalMigrator:
|
|||||||
# Initialize state manager
|
# Initialize state manager
|
||||||
state_mgr = StateManager(pg_conn, pg_table)
|
state_mgr = StateManager(pg_conn, pg_table)
|
||||||
|
|
||||||
|
# Initialize error logger
|
||||||
|
error_logger = ErrorLogger(pg_table, "incremental")
|
||||||
|
|
||||||
# Get last migrated key from migration_state
|
# Get last migrated key from migration_state
|
||||||
# This was saved during the last full/incremental migration
|
# This was saved during the last full/incremental migration
|
||||||
last_key = state_mgr.get_last_key()
|
last_key = state_mgr.get_last_key()
|
||||||
@@ -77,52 +81,63 @@ class IncrementalMigrator:
|
|||||||
f"{last_key.get('event_date')}, {last_key.get('event_time')})"
|
f"{last_key.get('event_date')}, {last_key.get('event_time')})"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Get max MySQL ID already migrated to optimize query performance
|
# Determine which partitions to process based on last_key's event_date
|
||||||
cursor = pg_conn.connection.cursor()
|
# This is a MAJOR optimization: instead of querying the entire table,
|
||||||
cursor.execute(f"SELECT MAX(mysql_max_id) FROM {pg_table}")
|
-        result = cursor.fetchone()
-        max_mysql_id = result[0] if result and result[0] else 0
-
-        logger.info(f"Max MySQL ID already migrated: {max_mysql_id}")
+        # we only process partitions from the last migrated year onwards
+        from config import get_partitions_from_year, date_string_to_partition_name
+
+        last_event_date = last_key.get('event_date')
+        if not last_event_date:
+            logger.warning("Last key has no event_date, starting from 2014")
+            partitions_to_process = mysql_conn.get_table_partitions(mysql_table)
+        else:
+            # Extract year from last_event_date and get partitions from that year onwards
+            year = int(str(last_event_date)[:4]) if len(str(last_event_date)) >= 4 else 2014
+            partitions_to_process = get_partitions_from_year(year, mysql_table)
+
+            logger.info(
+                f"Optimized incremental sync: Processing only {len(partitions_to_process)} "
+                f"partitions from year {year} onwards: {partitions_to_process}"
+            )
+            logger.info(
+                f"Skipping partitions before year {year} (no new data possible there)"
+            )

         if dry_run:
             # In dry-run, check how many new keys exist in MySQL
-            logger.info("[DRY RUN] Checking for new keys in MySQL...")
+            logger.info("[DRY RUN] Checking for new keys across relevant partitions...")

-            # Sample first 100 keys to check if there are new records
-            sample_keys = mysql_conn.fetch_consolidation_keys_after(
-                mysql_table,
-                after_key=last_key,
-                min_mysql_id=max_mysql_id,
-                limit=100,
-                offset=0
-            )
-
-            if sample_keys:
-                # If we found 100 keys in the sample, there might be many more
-                # Try to get a rough count by checking larger offsets
-                if len(sample_keys) == 100:
-                    # There are at least 100 keys, check if there are more
-                    logger.info(
-                        f"[DRY RUN] Found at least 100 new keys, checking total count..."
-                    )
-                    # Sample at different offsets to estimate total
-                    test_batch = mysql_conn.fetch_consolidation_keys_after(
-                        mysql_table,
-                        after_key=last_key,
-                        min_mysql_id=max_mysql_id,
-                        limit=1,
-                        offset=1000
-                    )
-                    if test_batch:
-                        logger.info(f"[DRY RUN] Estimated: More than 1000 new keys to migrate")
-                    else:
-                        logger.info(f"[DRY RUN] Estimated: Between 100-1000 new keys to migrate")
-                else:
-                    logger.info(f"[DRY RUN] Found {len(sample_keys)} new keys to migrate")
+            total_new_keys = 0
+            first_keys_found = []
+
+            for partition in partitions_to_process:
+                # For first partition (same year as last_key), use after_key
+                # For subsequent partitions, start from beginning
+                if partition == partitions_to_process[0]:
+                    sample_keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
+                        mysql_table,
+                        partition=partition,
+                        after_key=last_key,
+                        limit=100
+                    )
+                    logger.info(f"[DRY RUN] Partition {partition}: {len(sample_keys)} new keys (after last_key)")
+                else:
+                    sample_keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
+                        mysql_table,
+                        partition=partition,
+                        after_key=None,  # All keys from this partition are new
+                        limit=100
+                    )
+                    logger.info(f"[DRY RUN] Partition {partition}: {len(sample_keys)} new keys (all new)")
+
+                total_new_keys += len(sample_keys)
+                if sample_keys and len(first_keys_found) < 3:
+                    first_keys_found.extend(sample_keys[:3 - len(first_keys_found)])
+
+            if total_new_keys > 0:
+                logger.info(f"[DRY RUN] Found at least {total_new_keys} new keys across {len(partitions_to_process)} partitions")
                 logger.info("[DRY RUN] First 3 keys:")
-                for i, key in enumerate(sample_keys[:3]):
+                for i, key in enumerate(first_keys_found[:3]):
                     logger.info(
                         f" {i+1}. ({key.get('UnitName')}, {key.get('ToolNameID')}, "
                         f"{key.get('EventDate')}, {key.get('EventTime')})"
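The new path above narrows the incremental sync to MySQL partitions that can actually hold new data: it derives a year from the last migrated key and asks `get_partitions_from_year(year, mysql_table)` (defined in config.py, not shown in this diff) for the partitions from that year onwards. A minimal sketch of what such a helper could look like, assuming partition names that embed a four-digit year; the real naming scheme (elsewhere the docs mention names like 'part7' or 'd9') and the actual partition discovery may differ:

    # Hypothetical sketch only; the real helper lives in config.py.
    import re

    def get_partitions_from_year(year, mysql_table, all_partitions=None):
        # Stand-in for querying INFORMATION_SCHEMA.PARTITIONS for mysql_table.
        all_partitions = all_partitions or ["p2013", "p2014", "p2023", "p2024"]
        selected = []
        for name in all_partitions:
            match = re.search(r"(\d{4})", name)
            if match and int(match.group(1)) >= year:
                selected.append(name)
        return selected

    print(get_partitions_from_year(2023, "RAWDATACOR"))  # ['p2023', 'p2024']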
@@ -130,17 +145,16 @@ class IncrementalMigrator:
                 logger.info(
                     f"[DRY RUN] Run without --dry-run to perform actual migration"
                 )
-                # Return a positive number to indicate there's data to migrate
-                return len(sample_keys)
+                return total_new_keys
             else:
                 logger.info("[DRY RUN] No new keys found - database is up to date")
                 return 0

         # Migrate new keys
         migrated_rows = 0
-        offset = 0
         insert_buffer = []
         buffer_size = self.settings.migration.consolidation_group_limit // 10
+        last_processed_key = None  # Track last key for final state update

         with ProgressTracker(
             total=None,  # Unknown total
@@ -150,22 +164,31 @@ class IncrementalMigrator:
             # Get column order for PostgreSQL insert
             pg_columns = self._get_pg_columns()

+            # Process each partition
+            for partition_idx, partition in enumerate(partitions_to_process, 1):
+                logger.info(
+                    f"[{partition_idx}/{len(partitions_to_process)}] "
+                    f"Processing partition {partition}..."
+                )
+
+                # For first partition (same year as last_key), fetch keys AFTER last_key
+                # For subsequent partitions, fetch ALL keys (they're all new)
+                use_after_key = last_key if partition == partitions_to_process[0] else None
+
                 while True:
-                    # Fetch batch of consolidation keys AFTER last_key
-                    logger.debug(f"Fetching keys after last_key with offset={offset}")
-                    keys = mysql_conn.fetch_consolidation_keys_after(
+                    # Fetch batch of consolidation keys from this partition
+                    keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
                         mysql_table,
-                        after_key=last_key,
-                        min_mysql_id=max_mysql_id,
-                        limit=self.settings.migration.consolidation_group_limit,
-                        offset=offset
+                        partition=partition,
+                        after_key=use_after_key,
+                        limit=self.settings.migration.consolidation_group_limit
                     )

                     if not keys:
-                        logger.info("No more new keys to migrate")
+                        logger.info(f" No more keys in partition {partition}")
                         break

-                    logger.info(f"Processing {len(keys)} new keys (offset={offset})")
+                    logger.info(f" Processing {len(keys)} keys from {partition}...")

                     # Process each consolidation key
                     keys_processed = 0
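The loop above leans on `fetch_consolidation_keys_from_partition_after()`, whose body is outside this diff. A hedged sketch of how such a connector method could be implemented with keyset pagination restricted to a single MySQL partition; the column names, the `PARTITION (...)` clause, and the cursor attribute are assumptions, not the project's actual code:

    # Sketch only: the connector's real implementation is not part of this diff.
    def fetch_consolidation_keys_from_partition_after(self, table, partition,
                                                      after_key=None, limit=10000):
        sql = (
            f"SELECT DISTINCT UnitName, ToolNameID, EventDate, EventTime "
            f"FROM {table} PARTITION ({partition}) "
        )
        params = []
        if after_key:
            # Keyset pagination: strictly greater than the last processed key,
            # compared as a tuple in consolidation-key order.
            sql += "WHERE (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s) "
            params += [after_key["unit_name"], after_key["tool_name_id"],
                       after_key["event_date"], after_key["event_time"]]
        sql += "ORDER BY UnitName, ToolNameID, EventDate, EventTime LIMIT %s"
        params.append(limit)

        with self.connection.cursor() as cursor:
            cursor.execute(sql, params)
            cols = ["UnitName", "ToolNameID", "EventDate", "EventTime"]
            return [dict(zip(cols, row)) for row in cursor.fetchall()]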
@@ -179,7 +202,20 @@ class IncrementalMigrator:
                         event_date = key.get("EventDate")
                         event_time = key.get("EventTime")

+                        # Validate consolidation key before fetching
+                        is_valid, error_reason = validate_consolidation_key(
+                            unit_name, tool_name_id, event_date, event_time
+                        )
+
+                        if not is_valid:
+                            # Log invalid key and skip
+                            error_logger.log_invalid_key(
+                                unit_name, tool_name_id, event_date, event_time, error_reason
+                            )
+                            continue
+
                         # Fetch all MySQL rows for this key (all nodes, all partitions)
+                        try:
                             mysql_rows = mysql_conn.fetch_records_for_key_all_partitions(
                                 mysql_table,
                                 unit_name,
@@ -187,6 +223,13 @@ class IncrementalMigrator:
                                 event_date,
                                 event_time
                             )
+                        except Exception as e:
+                            # Log corrupted key that caused fetch error
+                            error_logger.log_invalid_key(
+                                unit_name, tool_name_id, event_date, event_time,
+                                f"Fetch failed: {e}"
+                            )
+                            continue

                         if not mysql_rows:
                             logger.warning(
@@ -208,6 +251,14 @@ class IncrementalMigrator:
                         # Add to insert buffer
                         insert_buffer.append(pg_row)

+                        # Track last processed key
+                        last_processed_key = {
+                            "unit_name": unit_name,
+                            "tool_name_id": tool_name_id,
+                            "event_date": str(event_date) if event_date else None,
+                            "event_time": str(event_time) if event_time else None,
+                        }
+
                         # Flush buffer when full
                         if len(insert_buffer) >= buffer_size:
                             # Use COPY with ON CONFLICT to handle duplicates
@@ -220,13 +271,7 @@ class IncrementalMigrator:
                             migrated_rows += inserted
                             progress.update(inserted)

-                            # Update state with last key
-                            last_processed_key = {
-                                "unit_name": unit_name,
-                                "tool_name_id": tool_name_id,
-                                "event_date": str(event_date) if event_date else None,
-                                "event_time": str(event_time) if event_time else None,
-                            }
+                            # Update state with last key (from tracked variable)
                             state_mgr.update_state(
                                 last_key=last_processed_key,
                                 total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
@@ -237,12 +282,15 @@ class IncrementalMigrator:
                             )
                             insert_buffer = []

-                    # Move to next batch of keys
-                    offset += len(keys)
-                    # If we got fewer keys than requested, we're done
-                    if len(keys) < self.settings.migration.consolidation_group_limit:
-                        break
+                    # After processing all keys in batch, update use_after_key for next iteration
+                    if keys:
+                        last_key_in_batch = keys[-1]
+                        use_after_key = {
+                            "unit_name": last_key_in_batch.get("UnitName"),
+                            "tool_name_id": last_key_in_batch.get("ToolNameID"),
+                            "event_date": str(last_key_in_batch.get("EventDate")) if last_key_in_batch.get("EventDate") else None,
+                            "event_time": str(last_key_in_batch.get("EventTime")) if last_key_in_batch.get("EventTime") else None,
+                        }

             # Flush remaining buffer
             if insert_buffer:
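The replacement above is the central performance change in this loop: the old code paged with a growing OFFSET, which makes MySQL scan and discard every previously returned key on each batch, while the new code advances `use_after_key` to the last key of the batch (keyset pagination), so every query resumes exactly where the previous one stopped. A self-contained toy comparison of the two patterns (the `fetch_*` helpers below are placeholders, not the project's connector methods):

    DATA = list(range(1, 26))   # pretend these are already-sorted consolidation keys
    BATCH = 10

    def fetch_offset(limit, offset):
        return DATA[offset:offset + limit]          # the server still walks past `offset` rows

    def fetch_keyset(limit, after_key=None):
        start = 0 if after_key is None else next(
            (i for i, k in enumerate(DATA) if k > after_key), len(DATA)
        )
        return DATA[start:start + limit]            # the server seeks straight to the key

    # Old approach: growing OFFSET.
    offset, seen_offset = 0, []
    while True:
        batch = fetch_offset(BATCH, offset)
        if not batch:
            break
        seen_offset.extend(batch)
        offset += len(batch)

    # New approach: advance the cursor to the last key of each batch.
    after_key, seen_keyset = None, []
    while True:
        batch = fetch_keyset(BATCH, after_key)
        if not batch:
            break
        seen_keyset.extend(batch)
        after_key = batch[-1]

    assert seen_offset == seen_keyset == DATA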
@@ -257,13 +305,24 @@ class IncrementalMigrator:
                 progress.update(inserted)
                 logger.debug(f"Final flush: {inserted} rows")

+        # Update state with last key after final flush
+        if last_processed_key:
+            state_mgr.update_state(
+                last_key=last_processed_key,
+                total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
+            )
+
         # Get final row count
         final_count = pg_conn.get_row_count(pg_table)
         logger.info(f"Total PostgreSQL rows: {final_count}")

+        # Close error logger and get count
+        error_logger.close()
+
         logger.info(
             f"✓ Incremental migration complete: "
-            f"{migrated_rows} new rows migrated to {pg_table}"
+            f"{migrated_rows} new rows migrated to {pg_table}, "
+            f"{error_logger.get_error_count()} invalid keys skipped"
         )

         return migrated_rows
@@ -120,6 +120,9 @@ class PartitionMigrator:
             description=f"Streaming {mysql_table} partition {partition_name}"
         ) as progress:

+            # Log before starting to fetch (this query can take several minutes for large partitions)
+            logger.info(f"Fetching consolidation groups from MySQL partition {partition_name}...")
+
             # Use fetch_consolidation_groups_from_partition with start_key for efficient resume
             # MySQL will skip all keys <= start_key using WHERE clause (no unnecessary data transfer)
             for group in mysql_conn.fetch_consolidation_groups_from_partition(
@@ -7,7 +7,7 @@ Tracks migration progress with:
 - status: pending, in_progress, completed
 """
 from typing import Optional, Dict, Any
-from datetime import datetime
+from datetime import datetime, timezone
 import json

 from src.connectors.postgres_connector import PostgreSQLConnector
@@ -199,7 +199,7 @@ class StateManager:

         if mark_completed:
             updates.append("migration_completed_at = %s")
-            params.append(datetime.utcnow())
+            params.append(datetime.now(timezone.utc))
             if status is None:
                 updates.append("status = 'completed'")

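The one-line change above pairs with the TIMESTAMPTZ columns introduced in the next hunk: `datetime.utcnow()` returns a naive datetime (and is deprecated as of Python 3.12), while `datetime.now(timezone.utc)` returns an aware UTC timestamp that maps cleanly onto a timezone-aware column. For example:

    from datetime import datetime, timezone

    naive = datetime.utcnow()            # tzinfo is None; deprecated since Python 3.12
    aware = datetime.now(timezone.utc)   # tzinfo is UTC; matches a TIMESTAMPTZ column

    print(naive.tzinfo, aware.tzinfo)    # None UTC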
@@ -183,8 +183,8 @@ CREATE TABLE IF NOT EXISTS migration_state (
     table_name VARCHAR(255) NOT NULL,
     partition_name VARCHAR(255) NOT NULL,
     last_key JSONB,
-    migration_started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-    migration_completed_at TIMESTAMP,
+    migration_started_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
+    migration_completed_at TIMESTAMPTZ,
     total_rows_migrated BIGINT DEFAULT 0,
     status VARCHAR(32) DEFAULT 'pending',
     PRIMARY KEY (table_name, partition_name),
@@ -10,21 +10,30 @@ logger = get_logger(__name__)
 class ErrorLogger:
     """Log invalid migration keys to a file."""

-    def __init__(self, table: str, partition: str):
+    def __init__(self, table: str, partition: str, use_timestamp: bool = False):
         """Initialize error logger.

         Args:
             table: Table name
-            partition: Partition name
+            partition: Partition name (e.g., 'p2024' or 'incremental')
+            use_timestamp: If True, add timestamp to filename (for incremental migrations)
         """
         self.table = table
         self.partition = partition
-        self.error_file = f"migration_errors_{table}_{partition}.log"
+
+        # Add timestamp to filename for incremental migrations to avoid overwriting
+        if use_timestamp or partition == "incremental":
+            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+            self.error_file = f"migration_errors_{table}_{partition}_{timestamp}.log"
+        else:
+            self.error_file = f"migration_errors_{table}_{partition}.log"
+
         self.error_count = 0

         # Create error file with header
         with open(self.error_file, "w") as f:
             f.write(f"# Migration errors for {table} partition {partition}\n")
+            f.write(f"# Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
             f.write("# Format: UnitName|ToolNameID|EventDate|EventTime|Reason\n\n")

         logger.info(f"Error log file created: {self.error_file}")
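The incremental migrator calls `error_logger.log_invalid_key(...)` and `error_logger.get_error_count()`, but those methods sit outside this diff. Given the header written above ("# Format: UnitName|ToolNameID|EventDate|EventTime|Reason"), a minimal sketch of what they might do, offered as an assumption rather than the project's actual implementation:

    # Sketch of ErrorLogger.log_invalid_key()/get_error_count()/close(); the real
    # methods are not shown in this diff.
    class _ErrorLoggerSketch:
        def __init__(self, path: str):
            self.error_file = path
            self.error_count = 0

        def log_invalid_key(self, unit_name, tool_name_id, event_date, event_time, reason):
            # One pipe-delimited line per rejected key, matching the file header format.
            with open(self.error_file, "a") as f:
                f.write(f"{unit_name}|{tool_name_id}|{event_date}|{event_time}|{reason}\n")
            self.error_count += 1

        def get_error_count(self) -> int:
            return self.error_count

        def close(self):
            pass  # nothing buffered in this sketch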
@@ -99,6 +108,10 @@ def validate_consolidation_key(
     if unit_name is None or unit_name == "":
         return False, "UnitName is NULL or empty"

+    # Check for corrupted Java strings (like '[Ljava.lang.String;@...')
+    if isinstance(unit_name, str) and unit_name.startswith("[L"):
+        return False, f"UnitName is corrupted Java string: {unit_name}"
+
     if tool_name_id is None or tool_name_id == "":
         return False, "ToolNameID is NULL or empty"

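For a concrete feel of what the new guard rejects: a UnitName that is a Java array's default toString() (e.g. '[Ljava.lang.String;@6d06d69c') now fails validation and lands in the error log instead of PostgreSQL. Usage example; the import path and the success-case return value are assumptions:

    # The module path is assumed from the repository layout shown in this diff.
    from src.utils.error_logger import validate_consolidation_key

    ok, reason = validate_consolidation_key("UNIT01", "TOOL7", "2024-05-01", "12:30:00")
    print(ok, reason)   # True and an empty/None reason, depending on the implementation

    ok, reason = validate_consolidation_key("[Ljava.lang.String;@6d06d69c", "TOOL7",
                                            "2024-05-01", "12:30:00")
    print(ok, reason)   # False, "UnitName is corrupted Java string: [Ljava.lang.String;@6d06d69c"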
593 web_ui.py Normal file
@@ -0,0 +1,593 @@
"""Web UI for MySQL to PostgreSQL migration monitoring and control.

Provides a Gradio-based interface for:
- Viewing migration status and progress
- Starting/monitoring migrations
- Viewing logs and errors
- Performance metrics and graphs
"""
import gradio as gr
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from pathlib import Path
import threading
import time
from datetime import datetime
from typing import Optional, Dict, List, Tuple
import logging

from config import get_settings
from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.full_migrator import run_full_migration
from src.migrator.parallel_migrator import run_parallel_migration
from src.utils.logger import get_logger

logger = get_logger(__name__)

# Global state for tracking running migrations
running_migrations = {}
migration_logs = {}


def get_migration_state_df() -> pd.DataFrame:
    """Fetch current migration state from PostgreSQL.

    Returns:
        DataFrame with columns: table_name, partition_name, status,
        total_rows_migrated, migration_started_at, migration_completed_at
    """
    try:
        with PostgreSQLConnector() as pg_conn:
            with pg_conn.connection.cursor() as cursor:
                cursor.execute("""
                    SELECT
                        table_name,
                        partition_name,
                        status,
                        total_rows_migrated,
                        migration_started_at,
                        migration_completed_at,
                        last_key
                    FROM migration_state
                    WHERE partition_name != '_global'
                    ORDER BY table_name, partition_name
                """)

                rows = cursor.fetchall()

                if not rows:
                    return pd.DataFrame(columns=[
                        'Table', 'Partition', 'Status', 'Rows Migrated',
                        'Started At', 'Completed At'
                    ])

                df = pd.DataFrame(rows, columns=[
                    'Table', 'Partition', 'Status', 'Rows Migrated',
                    'Started At', 'Completed At', 'Last Key'
                ])

                # Format dates
                df['Started At'] = pd.to_datetime(df['Started At']).dt.strftime('%Y-%m-%d %H:%M:%S')
                df['Completed At'] = pd.to_datetime(df['Completed At']).dt.strftime('%Y-%m-%d %H:%M:%S')
                df['Completed At'] = df['Completed At'].fillna('-')

                # Drop Last Key column for display (too verbose)
                df = df.drop('Last Key', axis=1)

                return df

    except Exception as e:
        logger.error(f"Failed to fetch migration state: {e}")
        return pd.DataFrame(columns=[
            'Table', 'Partition', 'Status', 'Rows Migrated',
            'Started At', 'Completed At'
        ])


def get_migration_summary() -> Dict[str, any]:
    """Get summary statistics for migrations.

    Returns:
        Dict with total/completed/in_progress/pending counts per table
    """
    try:
        with PostgreSQLConnector() as pg_conn:
            with pg_conn.connection.cursor() as cursor:
                cursor.execute("""
                    SELECT
                        table_name,
                        status,
                        COUNT(*) as count,
                        SUM(total_rows_migrated) as total_rows
                    FROM migration_state
                    WHERE partition_name != '_global'
                    GROUP BY table_name, status
                    ORDER BY table_name, status
                """)

                rows = cursor.fetchall()

                summary = {
                    'RAWDATACOR': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0},
                    'ELABDATADISP': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0}
                }

                for table, status, count, total_rows in rows:
                    table_upper = table.upper()
                    if table_upper in summary:
                        summary[table_upper][status] = count
                        summary[table_upper]['total_rows'] += total_rows or 0

                return summary

    except Exception as e:
        logger.error(f"Failed to fetch migration summary: {e}")
        return {
            'RAWDATACOR': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0},
            'ELABDATADISP': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0}
        }


def create_status_chart() -> go.Figure:
    """Create a bar chart showing migration status by table.

    Returns:
        Plotly figure
    """
    summary = get_migration_summary()

    tables = []
    completed = []
    in_progress = []
    pending = []

    for table, stats in summary.items():
        tables.append(table)
        completed.append(stats['completed'])
        in_progress.append(stats['in_progress'])
        pending.append(stats['pending'])

    fig = go.Figure(data=[
        go.Bar(name='Completed', x=tables, y=completed, marker_color='green'),
        go.Bar(name='In Progress', x=tables, y=in_progress, marker_color='orange'),
        go.Bar(name='Pending', x=tables, y=pending, marker_color='gray')
    ])

    fig.update_layout(
        barmode='stack',
        title='Migration Status by Table',
        xaxis_title='Table',
        yaxis_title='Number of Partitions',
        height=400
    )

    return fig


def get_error_logs() -> List[Tuple[str, str]]:
    """Get list of error log files with their paths.

    Returns:
        List of (filename, filepath) tuples
    """
    error_logs = []
    for log_file in Path('.').glob('migration_errors_*.log'):
        error_logs.append((log_file.name, str(log_file)))

    return sorted(error_logs, key=lambda x: x[0], reverse=True)


def read_error_log(log_file: str) -> str:
    """Read contents of an error log file.

    Args:
        log_file: Path to log file

    Returns:
        Contents of log file
    """
    try:
        if not log_file or log_file == "Select a log file...":
            return "No log file selected"

        with open(log_file, 'r') as f:
            content = f.read()

        if not content:
            return f"Log file {log_file} is empty (no errors logged)"

        return content

    except Exception as e:
        return f"Error reading log file: {e}"


def start_migration_task(
    table: str,
    parallel_workers: int,
    resume: bool,
    dry_run: bool,
    partition: Optional[str] = None
) -> str:
    """Start a migration in a background thread.

    Args:
        table: Table name (RAWDATACOR, ELABDATADISP, or all)
        parallel_workers: Number of parallel workers (0 = sequential)
        resume: Whether to resume from last checkpoint
        dry_run: Whether to run in dry-run mode
        partition: Optional partition name for single partition migration

    Returns:
        Status message
    """
    task_id = f"{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    if task_id in running_migrations:
        return f"❌ Migration already running for {table}"

    # Validate options
    if parallel_workers > 0 and partition:
        return "❌ Cannot use parallel workers with single partition mode"

    # Initialize log buffer for this task
    migration_logs[task_id] = []

    def run_migration():
        """Worker function to run migration."""
        try:
            migration_logs[task_id].append(f"Starting migration for {table}...")
            migration_logs[task_id].append(f"Mode: {'Parallel' if parallel_workers > 0 else 'Sequential'}")
            migration_logs[task_id].append(f"Resume: {resume}, Dry-run: {dry_run}")

            if parallel_workers > 0:
                # Parallel migration
                rows = run_parallel_migration(
                    table,
                    num_workers=parallel_workers,
                    dry_run=dry_run,
                    resume=resume
                )
            else:
                # Sequential migration
                rows = run_full_migration(
                    table,
                    dry_run=dry_run,
                    resume=resume,
                    partition=partition
                )

            migration_logs[task_id].append(f"✓ Migration complete: {rows:,} rows migrated")
            running_migrations[task_id] = 'completed'

        except Exception as e:
            migration_logs[task_id].append(f"❌ Migration failed: {e}")
            running_migrations[task_id] = 'failed'
            logger.error(f"Migration failed: {e}")

    # Start migration in background thread
    thread = threading.Thread(target=run_migration, daemon=True)
    thread.start()

    running_migrations[task_id] = 'running'

    return f"✓ Migration started: {task_id}\nCheck the 'Logs' tab for progress"


def start_incremental_migration_task(
    table: str,
    dry_run: bool
) -> str:
    """Start an incremental migration in a background thread.

    Args:
        table: Table name (RAWDATACOR, ELABDATADISP, or all)
        dry_run: Whether to run in dry-run mode

    Returns:
        Status message
    """
    task_id = f"incremental_{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    if task_id in running_migrations:
        return f"❌ Incremental migration already running for {table}"

    # Initialize log buffer for this task
    migration_logs[task_id] = []

    def run_migration():
        """Worker function to run incremental migration."""
        try:
            migration_logs[task_id].append(f"Starting INCREMENTAL migration for {table}...")
            migration_logs[task_id].append(f"Dry-run: {dry_run}")
            migration_logs[task_id].append("This will sync only NEW data added since last migration")

            from src.migrator.incremental_migrator import run_incremental_migration

            rows = run_incremental_migration(
                table,
                dry_run=dry_run
            )

            migration_logs[task_id].append(f"✓ Incremental migration complete: {rows:,} rows migrated")
            running_migrations[task_id] = 'completed'

        except Exception as e:
            migration_logs[task_id].append(f"❌ Incremental migration failed: {e}")
            running_migrations[task_id] = 'failed'
            logger.error(f"Incremental migration failed: {e}")

    # Start migration in background thread
    thread = threading.Thread(target=run_migration, daemon=True)
    thread.start()

    running_migrations[task_id] = 'running'

    return f"✓ Incremental migration started: {task_id}\nCheck the 'Logs' tab for progress"


def get_migration_logs(task_id: str) -> str:
    """Get logs for a specific migration task.

    Args:
        task_id: Task identifier

    Returns:
        Log contents
    """
    if not task_id or task_id == "No migrations running":
        return "Select a migration to view logs"

    logs = migration_logs.get(task_id, [])
    if not logs:
        return f"No logs available for {task_id}"

    return "\n".join(logs)


def refresh_migration_state():
    """Refresh migration state display."""
    return get_migration_state_df()


def refresh_status_chart():
    """Refresh status chart."""
    return create_status_chart()


def refresh_running_migrations() -> gr.Dropdown:
    """Refresh list of running migrations."""
    if not running_migrations:
        return gr.Dropdown(choices=["No migrations running"], value="No migrations running")

    choices = list(running_migrations.keys())
    return gr.Dropdown(choices=choices, value=choices[0] if choices else None)


# Build Gradio interface
with gr.Blocks(title="MySQL to PostgreSQL Migration", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🔄 MySQL to PostgreSQL Migration Dashboard")
    gr.Markdown("Monitor and control data migration from MySQL to PostgreSQL")

    with gr.Tabs():
        # Tab 1: Overview
        with gr.Tab("📊 Overview"):
            gr.Markdown("## Migration Status Overview")

            with gr.Row():
                refresh_btn = gr.Button("🔄 Refresh", scale=1)

            with gr.Row():
                status_chart = gr.Plot(label="Partition Status by Table")

            with gr.Row():
                state_table = gr.Dataframe(
                    value=get_migration_state_df(),
                    label="Migration State (All Partitions)",
                    interactive=False,
                    wrap=True
                )

            # Auto-refresh every 10 seconds
            refresh_btn.click(
                fn=lambda: (refresh_migration_state(), refresh_status_chart()),
                outputs=[state_table, status_chart]
            )

            # Initial load
            demo.load(
                fn=lambda: (refresh_migration_state(), refresh_status_chart()),
                outputs=[state_table, status_chart]
            )

        # Tab 2: Start Migration
        with gr.Tab("▶️ Start Migration"):
            gr.Markdown("## Start New Migration")

            with gr.Row():
                table_select = gr.Dropdown(
                    choices=["RAWDATACOR", "ELABDATADISP", "all"],
                    value="RAWDATACOR",
                    label="Table to Migrate"
                )

                partition_input = gr.Textbox(
                    label="Partition (optional)",
                    placeholder="Leave empty for all partitions, or enter e.g. 'part7', 'd9'",
                    value=""
                )

            with gr.Row():
                parallel_slider = gr.Slider(
                    minimum=0,
                    maximum=10,
                    value=0,
                    step=1,
                    label="Parallel Workers (0 = sequential)"
                )

            with gr.Row():
                resume_check = gr.Checkbox(label="Resume from last checkpoint", value=True)
                dry_run_check = gr.Checkbox(label="Dry run (simulate without writing)", value=False)

            start_btn = gr.Button("▶️ Start Migration", variant="primary", size="lg")

            migration_output = gr.Textbox(
                label="Migration Status",
                lines=3,
                interactive=False
            )

            start_btn.click(
                fn=start_migration_task,
                inputs=[table_select, parallel_slider, resume_check, dry_run_check, partition_input],
                outputs=migration_output
            )

        # Tab 3: Incremental Migration
        with gr.Tab("🔄 Incremental Sync"):
            gr.Markdown("## Incremental Migration (Sync New Data)")
            gr.Markdown("""
            This mode syncs **only the new data** added after the last full migration.

            **How it works:**
            - Finds new consolidation keys in MySQL that do not yet exist in PostgreSQL
            - Migrates only those keys (already-migrated data is not reprocessed)
            - Uses `migration_state` to track the last processed key

            **When to use it:**
            - After completing a full migration of all partitions
            - To periodically sync new data without redoing everything
            - For daily/weekly updates
            """)

            with gr.Row():
                inc_table_select = gr.Dropdown(
                    choices=["RAWDATACOR", "ELABDATADISP", "all"],
                    value="RAWDATACOR",
                    label="Table to Sync"
                )

            with gr.Row():
                inc_dry_run_check = gr.Checkbox(
                    label="Dry run (simulate without writing)",
                    value=False
                )

            inc_start_btn = gr.Button("🔄 Start Incremental Sync", variant="primary", size="lg")

            inc_migration_output = gr.Textbox(
                label="Sync Status",
                lines=3,
                interactive=False
            )

            inc_start_btn.click(
                fn=start_incremental_migration_task,
                inputs=[inc_table_select, inc_dry_run_check],
                outputs=inc_migration_output
            )

        # Tab 4: Logs
        with gr.Tab("📝 Logs"):
            gr.Markdown("## Migration Logs")

            with gr.Row():
                running_migrations_dropdown = gr.Dropdown(
                    choices=["No migrations running"],
                    value="No migrations running",
                    label="Select Migration",
                    interactive=True
                )
                refresh_migrations_btn = gr.Button("🔄 Refresh List")

            log_output = gr.Textbox(
                label="Log Output",
                lines=20,
                interactive=False,
                max_lines=50
            )

            refresh_migrations_btn.click(
                fn=refresh_running_migrations,
                outputs=running_migrations_dropdown
            )

            running_migrations_dropdown.change(
                fn=get_migration_logs,
                inputs=running_migrations_dropdown,
                outputs=log_output
            )

        # Tab 5: Error Logs
        with gr.Tab("⚠️ Error Logs"):
            gr.Markdown("## Error Log Viewer")
            gr.Markdown("View logged validation errors (invalid consolidation keys)")

            with gr.Row():
                error_log_dropdown = gr.Dropdown(
                    choices=[x[0] for x in get_error_logs()] if get_error_logs() else ["No error logs found"],
                    label="Select Error Log File",
                    value="Select a log file..."
                )
                refresh_error_logs_btn = gr.Button("🔄 Refresh")

            error_log_content = gr.Textbox(
                label="Error Log Contents",
                lines=20,
                interactive=False,
                max_lines=50
            )

            def refresh_error_log_list():
                logs = get_error_logs()
                choices = [x[0] for x in logs] if logs else ["No error logs found"]
                return gr.Dropdown(choices=choices)

            def show_error_log(filename):
                if not filename or filename == "Select a log file..." or filename == "No error logs found":
                    return "Select a log file to view its contents"
                return read_error_log(filename)

            refresh_error_logs_btn.click(
                fn=refresh_error_log_list,
                outputs=error_log_dropdown
            )

            error_log_dropdown.change(
                fn=show_error_log,
                inputs=error_log_dropdown,
                outputs=error_log_content
            )


def launch_ui(share=False, server_port=7860):
    """Launch the Gradio UI.

    Args:
        share: Whether to create a public share link
        server_port: Port to run the server on
    """
    demo.launch(
        share=share,
        server_port=server_port,
        server_name="0.0.0.0"  # Listen on all interfaces
    )


if __name__ == "__main__":
    import sys

    # Parse command line args
    share = "--share" in sys.argv
    port = 7860

    for arg in sys.argv:
        if arg.startswith("--port="):
            port = int(arg.split("=")[1])

    print(f"Starting Migration Dashboard on port {port}...")
    print(f"Share mode: {share}")

    launch_ui(share=share, server_port=port)
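The `__main__` block already parses `--share` and `--port=`, so the dashboard can be started from the project root with `python web_ui.py --port=7861 --share`; programmatically, the same entry point is just:

    # Assumes web_ui.py is importable from the project root.
    from web_ui import launch_ui

    launch_ui(share=False, server_port=7861)

Since launch_ui() binds to 0.0.0.0, the UI is reachable from other hosts on the network, which is worth keeping in mind before running it outside a trusted environment.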