Compare commits

...

10 Commits

Author SHA1 Message Date
3a53564bb5 con web ui 2026-01-11 21:56:50 +01:00
6306006f82 Fix: Update RAWDATACOR partition end_year and mapping logic
- Fix get_partitions_from_year(): RAWDATACOR end_year now 2030 (was 2024)
  - RAWDATACOR has partitions d12-d17 for years 2025-2030, not just part0-part10
- Update year_to_partition_name() for RAWDATACOR: handle both part and d suffix
  - Years 2014-2024: use "part" suffix with formula (year - 2014)
  - Years 2025-2030: use "d" suffix with formula (year - 2013) for d12-d17
- Clamp year to range [2014, 2030] for RAWDATACOR
- Update docstring examples to reflect new mapping behavior
- Now correctly generates partitions like: part8, part9, part10, d12, d13, ..., d17

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-01-11 15:55:41 +01:00
53cde5f667 Fix: Correct RAWDATACOR partition mapping logic
- Fix year_to_partition_name() RAWDATACOR logic: properly clamp year between 2014-2024
  before calculating partition index with formula (year - 2014)
- Previously: incorrectly tried to return "d" partition type with wrong formula
- Now: correctly returns "part{year-2014}" for RAWDATACOR table
- Update docstring: clarify d17 = 2030 (not 2031) as maximum ELABDATADISP partition
- Ensure partition mapping is consistent between year_to_partition_name() and
  get_partitions_from_year() functions

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-01-11 15:33:08 +01:00
d1dbf7f0de aggiunta log 2026-01-05 15:14:39 +01:00
931fec0959 fix logger x partition 2026-01-05 14:00:29 +01:00
a7d2d501fb fix: Use timezone-aware timestamps for migration state tracking
Fix timezone inconsistency between migration_started_at and migration_completed_at:

Schema Changes:
- Change TIMESTAMP to TIMESTAMPTZ for migration_started_at and migration_completed_at
- PostgreSQL stores timestamps in UTC and converts to local timezone on display
- Ensures consistent timezone handling across all timestamp columns

Code Changes:
- Replace datetime.utcnow() with datetime.now(timezone.utc)
- Use timezone-aware datetime objects for proper TIMESTAMPTZ handling
- Import timezone module for UTC timezone support

Impact:
- Previous issue: migration_completed_at was 1 hour behind migration_started_at
- Root cause: CURRENT_TIMESTAMP (local time) vs datetime.utcnow() (UTC naive)
- Solution: Both columns now use TIMESTAMPTZ with timezone-aware datetimes

Note: Existing migration_state records will have old TIMESTAMP format until table is altered:
  ALTER TABLE migration_state
    ALTER COLUMN migration_started_at TYPE TIMESTAMPTZ,
    ALTER COLUMN migration_completed_at TYPE TIMESTAMPTZ;

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-04 16:04:15 +01:00
23e9fc9d82 feat: Add error logging and fix incremental migration state tracking
Implement comprehensive error handling and fix state management bug in incremental migration:

Error Logging System:
- Add validation for consolidation keys (NULL dates, empty IDs, corrupted Java strings)
- Log invalid keys to dedicated error files with detailed reasons
- Full migration: migration_errors_<table>_<partition>.log
- Incremental migration: migration_errors_<table>_incremental_<timestamp>.log (timestamped to preserve history)
- Report total count of skipped invalid keys at migration completion
- Auto-delete empty error log files

State Tracking Fix:
- Fix critical bug where last_key wasn't updated after final buffer flush
- Track last_processed_key throughout migration loop
- Update state both during periodic flushes and after final flush
- Ensures incremental migration correctly resumes from last migrated key

Validation Checks:
- EventDate IS NULL or EventDate = '0000-00-00'
- EventTime IS NULL
- ToolNameID IS NULL or empty string
- UnitName IS NULL or empty string
- UnitName starting with '[L' (corrupted Java strings)

Documentation:
- Update README.md with error logging behavior
- Update MIGRATION_WORKFLOW.md with validation details
- Update CHANGELOG.md with new features and fixes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-01 19:49:44 +01:00
03e39eb925 fix docs 2025-12-30 15:33:32 +01:00
bcedae40fc fix .env.example var and docs 2025-12-30 15:29:26 +01:00
5f6e3215a5 clean docs 2025-12-30 15:24:19 +01:00
19 changed files with 2740 additions and 287 deletions

View File

@@ -13,10 +13,10 @@ POSTGRES_PASSWORD=your_postgres_password
POSTGRES_DATABASE=migrated_db
# Migration Settings
BATCH_SIZE=10000
LOG_LEVEL=INFO
DRY_RUN=false
CONSOLIDATION_GROUP_LIMIT=50000
CONSOLIDATION_GROUP_LIMIT=40000
PROGRESS_LOG_INTERVAL=10000
# Performance Testing
BENCHMARK_OUTPUT_DIR=benchmark_results

121
CHANGELOG.md Normal file
View File

@@ -0,0 +1,121 @@
# Changelog
## [Current] - 2025-12-30
### Added
- **Consolidation-based incremental migration**: Uses consolidation keys `(UnitName, ToolNameID, EventDate, EventTime)` instead of timestamps
- **MySQL ID optimization**: Uses `MAX(mysql_max_id)` from PostgreSQL to filter MySQL queries, avoiding full table scans
- **State management in PostgreSQL**: Replaced JSON file with `migration_state` table for more reliable tracking
- **Sync utility**: Added `scripts/sync_migration_state.py` to sync state with actual data
- **Performance optimization**: MySQL queries now instant using PRIMARY KEY filter
- **Data quality validation**: Automatically validates and logs invalid consolidation keys to dedicated error files
- **Error logging**: Invalid keys (null dates, empty tool IDs, corrupted Java strings) are logged and skipped during migration
- **Better documentation**: Consolidated and updated all documentation files
### Changed
- **Incremental migration**: Now uses consolidation keys instead of timestamp-based approach
- **Full migration**: Improved to save global `last_key` after completing all partitions
- **State tracking**: Moved from `migration_state.json` to PostgreSQL table `migration_state`
- **Query performance**: Added `min_mysql_id` parameter to `fetch_consolidation_keys_after()` for optimization
- **Configuration**: Renamed `BATCH_SIZE` to `CONSOLIDATION_GROUP_LIMIT` to better reflect what it controls
- **Configuration**: Added `PROGRESS_LOG_INTERVAL` to control logging frequency
- **Configuration**: Added `BENCHMARK_OUTPUT_DIR` to specify benchmark results directory
- **Documentation**: Updated README.md, MIGRATION_WORKFLOW.md, QUICKSTART.md, EXAMPLE_WORKFLOW.md with current implementation
- **Documentation**: Corrected index and partitioning documentation to reflect actual PostgreSQL schema:
- Uses `event_timestamp` (not separate event_date/event_time)
- Primary key includes `event_year` for partitioning
- Consolidation key is UNIQUE (unit_name, tool_name_id, event_timestamp, event_year)
### Removed
- **migration_state.json**: Replaced by PostgreSQL table
- **Timestamp-based migration**: Replaced by consolidation key-based approach
- **ID-based resumable migration**: Consolidated into single consolidation-based approach
- **Temporary debug scripts**: Cleaned up all `/tmp/` debug files
### Fixed
- **Incremental migration performance**: MySQL queries now ~1000x faster with ID filter
- **State synchronization**: Can now sync `migration_state` with actual data using utility script
- **Duplicate handling**: Uses `ON CONFLICT DO NOTHING` to prevent duplicates
- **Last key tracking**: Properly updates global state after full migration
- **Corrupted data handling**: Both full and incremental migrations now validate keys and log errors instead of crashing
### Error Logging
Both full and incremental migrations now handle corrupted consolidation keys gracefully:
**Error files:**
- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)
Each incremental migration creates a new timestamped file to preserve error history across runs.
**File format:**
```
# Migration errors for <table> partition <partition>
# Format: UnitName|ToolNameID|EventDate|EventTime|Reason
ID0350||0000-00-00|0:00:00|EventDate is invalid: 0000-00-00
[Ljava.lang.String;@abc123|TOOL1|2024-01-01|10:00:00|UnitName is corrupted Java string: [Ljava.lang.String;@abc123
UNIT1||2024-01-01|10:00:00|ToolNameID is NULL or empty
```
**Behavior:**
- Invalid keys are automatically skipped to prevent migration failure
- Each skipped key is logged with the reason for rejection
- Total count of skipped keys is reported at the end of migration
- Empty error files (no errors) are automatically deleted
### Migration Guide (from old to new)
If you have an existing installation with `migration_state.json`:
1. **Backup your data** (optional but recommended):
```bash
cp migration_state.json migration_state.json.backup
```
2. **Run full migration** to populate `migration_state` table:
```bash
python main.py migrate full
```
3. **Sync state** (if you have existing data):
```bash
python scripts/sync_migration_state.py
```
4. **Remove old state file**:
```bash
rm migration_state.json
```
5. **Run incremental migration**:
```bash
python main.py migrate incremental --dry-run
python main.py migrate incremental
```
### Performance Improvements
- **MySQL query time**: From 60+ seconds to <0.1 seconds (600x faster)
- **Consolidation efficiency**: Multiple MySQL rows → single PostgreSQL record
- **State reliability**: PostgreSQL table instead of JSON file
### Breaking Changes
- `--state-file` parameter removed from incremental migration (no longer uses JSON)
- `--use-id` flag removed (consolidation-based approach is now default)
- Incremental migration requires full migration to be run first
- `BATCH_SIZE` environment variable renamed to `CONSOLIDATION_GROUP_LIMIT` (update your .env file)
## [Previous] - Before 2025-12-30
### Features
- Full migration support
- Incremental migration with timestamp tracking
- JSONB transformation
- Partitioning by year
- GIN indexes for JSONB queries
- Benchmark system
- Progress tracking
- Rich logging

View File

@@ -74,8 +74,9 @@ nano .env
# POSTGRES_PASSWORD=password123
# POSTGRES_DATABASE=production_migrated
#
# BATCH_SIZE=50000 # Large batches for speed
# LOG_LEVEL=INFO
# CONSOLIDATION_GROUP_LIMIT=80000 # Large batches for speed
# PROGRESS_LOG_INTERVAL=20000
```
### 4. Verifica Configurazione (5 min)
@@ -335,8 +336,11 @@ python main.py setup --create-schema
### Migrazione molto lenta
```bash
# Aumentare batch size temporaneamente
# Editare .env: BATCH_SIZE=100000
# Aumentare consolidation group limit temporaneamente
# Editare .env: CONSOLIDATION_GROUP_LIMIT=100000
# Ridurre logging
# Editare .env: PROGRESS_LOG_INTERVAL=50000
# Oppure verificare:
# - Latency rete MySQL↔PostgreSQL

View File

@@ -2,244 +2,396 @@
## Overview
This tool supports three migration modes:
This tool implements **consolidation-based incremental migration** for two tables:
- **RAWDATACOR**: Raw sensor measurements
- **ELABDATADISP**: Elaborated/calculated data
1. **Full Migration** (`full_migration.py`) - Initial complete migration
2. **Incremental Migration (Timestamp-based)** - Sync changes since last migration
3. **Incremental Migration (ID-based)** - Resumable migration from last checkpoint
Both tables use **consolidation keys** to group and migrate data efficiently.
---
## 1. Initial Full Migration
## Migration Modes
### First Time Setup
### 1. Full Migration
Initial migration of all historical data, migrating one partition at a time.
```bash
# Create the PostgreSQL schema
python main.py setup --create-schema
# Migrate all partitions for all tables
python main.py migrate full
# Run full migration (one-time)
python main.py migrate --full RAWDATACOR
python main.py migrate --full ELABDATADISP
# Migrate specific table
python main.py migrate full --table RAWDATACOR
# Migrate specific partition (year-based)
python main.py migrate full --table ELABDATADISP --partition 2024
# Dry-run to see what would be migrated
python main.py migrate full --dry-run
```
**When to use:** First time migrating data or need complete fresh migration.
**Characteristics:**
- Fetches ALL rows from MySQL
- No checkpoint tracking
- Cannot resume if interrupted
- Good for initial data load
- Migrates data partition by partition (year-based)
- Uses consolidation groups for efficiency
- Tracks progress in `migration_state` table (PostgreSQL)
- Can resume from last completed partition if interrupted
- Uses `mysql_max_id` optimization for performance
---
## 2. Timestamp-based Incremental Migration
### 2. Incremental Migration
### For Continuous Sync (Recommended for most cases)
Sync only new data since the last migration.
```bash
# After initial full migration, use incremental with timestamps
python main.py migrate --incremental RAWDATACOR
python main.py migrate --incremental ELABDATADISP
# Migrate new data for all tables
python main.py migrate incremental
# Migrate specific table
python main.py migrate incremental --table ELABDATADISP
# Dry-run to check what would be migrated
python main.py migrate incremental --dry-run
```
**When to use:** Continuous sync of new/updated records.
**Characteristics:**
- Tracks `created_at` (RAWDATACOR) or `updated_at` (ELABDATADISP)
- Uses JSON state file (`migration_state.json`)
- Only fetches rows modified since last run
- Perfect for scheduled jobs (cron, airflow, etc.)
- Syncs changes but NOT deletions
- Uses **consolidation keys** to identify new records:
- `(UnitName, ToolNameID, EventDate, EventTime)`
- Tracks last migrated key in `migration_state` table
- Optimized with `min_mysql_id` filter for performance
- Handles duplicates with `ON CONFLICT DO NOTHING`
- Perfect for scheduled jobs (cron, systemd timers)
**How it works:**
1. First run: Returns with message "No previous migration found" - must run full migration first
2. Subsequent runs: Only fetches rows where `created_at` > last_migration_timestamp
3. Updates state file with new timestamp for next run
**Example workflow:**
```bash
# Day 1: Initial full migration
python main.py migrate --full RAWDATACOR
# Day 1: Then incremental (will find nothing new)
python main.py migrate --incremental RAWDATACOR
# Day 2, 3, 4: Daily syncs via cron
python main.py migrate --incremental RAWDATACOR
```
1. Retrieves `last_key` from `migration_state` table
2. Gets `MAX(mysql_max_id)` from PostgreSQL table for optimization
3. Queries MySQL: `WHERE id > max_mysql_id AND (key_tuple) > last_key`
4. Migrates new consolidation groups
5. Updates `migration_state` with new `last_key`
---
## 3. ID-based Incremental Migration (Resumable)
## Consolidation Keys
### For Large Datasets or Unreliable Connections
Both tables use consolidation to group multiple measurements into a single JSONB record.
```bash
# First run
python main.py migrate --incremental RAWDATACOR --use-id
### Consolidation Key Structure
# Can interrupt and resume multiple times
python main.py migrate --incremental RAWDATACOR --use-id
```sql
(UnitName, ToolNameID, EventDate, EventTime)
```
**When to use:**
- Large datasets that may timeout
- Need to resume from exact last position
- Network is unstable
### Data Quality Validation
**Characteristics:**
- Tracks `last_id` instead of timestamp
- Updates state file after EACH BATCH (not just at end)
- Can interrupt and resume dozens of times
- Resumes from exact record ID where it stopped
- Works with `migration_state.json`
The migration automatically validates and logs invalid consolidation keys:
- `EventDate IS NULL` or `EventDate = '0000-00-00'`
- `ToolNameID IS NULL` or `ToolNameID = ''` (empty string)
- `UnitName IS NULL` or `UnitName = ''` (empty string)
- `UnitName` starting with `[L` (corrupted Java strings like `[Ljava.lang.String;@...`)
- `EventTime IS NULL`
**How it works:**
1. First run: Starts from beginning (ID = 0)
2. Each batch: Updates state file with max ID from batch
3. Interrupt: Can stop at any time
4. Resume: Next run continues from last ID stored
5. Continues until all rows processed
Invalid keys are:
- **Logged to error files** for tracking and analysis
- **Skipped automatically** to prevent migration failures
- **Counted and reported** at the end of migration
**Example workflow for large dataset:**
```bash
# Start ID-based migration (will migrate in batches)
python main.py migrate --incremental RAWDATACOR --use-id
Error log files:
- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)
# [If interrupted after 1M rows processed]
Each incremental migration creates a new timestamped file to preserve history.
# Resume from ID 1M (automatically detects last position)
python main.py migrate --incremental RAWDATACOR --use-id
### Why Consolidation?
# [Continues until complete]
Instead of migrating individual sensor readings, we:
1. **Group** all measurements for the same (unit, tool, date, time)
2. **Transform** 16-25 columns into structured JSONB
3. **Migrate** as a single consolidated record
**Example:**
MySQL has 10 rows for `(Unit1, Tool1, 2024-01-01, 10:00:00)`:
```
id | UnitName | ToolNameID | EventDate | EventTime | Val0 | Val1 | ...
1 | Unit1 | Tool1 | 2024-01-01 | 10:00:00 | 23.5 | 45.2 | ...
2 | Unit1 | Tool1 | 2024-01-01 | 10:00:00 | 23.6 | 45.3 | ...
...
```
PostgreSQL gets 1 consolidated record:
```json
{
"unit_name": "Unit1",
"tool_name_id": "Tool1",
"event_timestamp": "2024-01-01 10:00:00",
"measurements": {
"0": {"value": 23.5, "unit": "°C"},
"1": {"value": 45.2, "unit": "bar"},
...
},
"mysql_max_id": 10
}
```
---
## State Management
### State File Location
```
migration_state.json # In project root
### Migration State Table
The `migration_state` table in PostgreSQL tracks migration progress:
```sql
CREATE TABLE migration_state (
table_name VARCHAR(50),
partition_name VARCHAR(50),
last_key JSONB, -- Last migrated consolidation key
started_at TIMESTAMP,
completed_at TIMESTAMP,
total_rows INTEGER,
status VARCHAR(20)
);
```
### State File Content (Timestamp-based)
```json
{
"rawdatacor": {
"last_timestamp": "2024-12-11T19:30:45.123456",
"last_updated": "2024-12-11T19:30:45.123456",
"total_migrated": 50000
}
}
### State Records
- **Per-partition state**: Tracks each partition's progress
- Example: `('rawdatacor', '2024', {...}, '2024-01-15 10:30:00', 'completed', 1000000)`
- **Global state**: Tracks overall incremental migration position
- Example: `('rawdatacor', '_global', {...}, NULL, NULL, 0, 'in_progress')`
### Checking State
```sql
-- View all migration state
SELECT * FROM migration_state ORDER BY table_name, partition_name;
-- View global state (for incremental migration)
SELECT * FROM migration_state WHERE partition_name = '_global';
```
### State File Content (ID-based)
```json
{
"rawdatacor": {
"last_id": 1000000,
"total_migrated": 1000000,
"last_updated": "2024-12-11T19:45:30.123456"
}
}
---
## Performance Optimization
### MySQL ID Filter
The incremental migration uses `MAX(mysql_max_id)` from PostgreSQL to filter MySQL queries:
```sql
SELECT UnitName, ToolNameID, EventDate, EventTime
FROM RAWDATACOR
WHERE id > 267399536 -- max_mysql_id from PostgreSQL
AND (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?)
GROUP BY UnitName, ToolNameID, EventDate, EventTime
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT 10000
```
### Reset Migration State
```python
from src.migrator.state import MigrationState
**Why this is fast:**
- Uses PRIMARY KEY index on `id` to skip millions of already-migrated rows
- Tuple comparison only applied to filtered subset
- Avoids full table scans
state = MigrationState()
### Consolidation Group Batching
# Reset specific table
state.reset("rawdatacor")
# Reset all tables
state.reset()
```
Instead of fetching individual rows, we:
1. Fetch 10,000 consolidation keys at a time
2. For each key, fetch all matching rows from MySQL
3. Transform and insert into PostgreSQL
4. Update state every batch
---
## Recommended Workflow
### For Daily Continuous Sync
```bash
# Week 1: Initial setup
python main.py setup --create-schema
python main.py migrate --full RAWDATACOR
python main.py migrate --full ELABDATADISP
### Initial Setup (One-time)
# Week 2+: Daily incremental syncs (via cron job)
# Schedule: `0 2 * * * cd /path/to/project && python main.py migrate --incremental RAWDATACOR`
python main.py migrate --incremental RAWDATACOR
python main.py migrate --incremental ELABDATADISP
```bash
# 1. Configure .env file
cp .env.example .env
nano .env
# 2. Create PostgreSQL schema
python main.py setup --create-schema
# 3. Run full migration
python main.py migrate full
```
### For Large Initial Migration
```bash
# If dataset > 10 million rows
python main.py setup --create-schema
python main.py migrate --incremental RAWDATACOR --use-id # Can interrupt/resume
### Daily Incremental Sync
# For subsequent syncs, use timestamp
python main.py migrate --incremental RAWDATACOR # Timestamp-based
```bash
# Run incremental migration (via cron or manual)
python main.py migrate incremental
```
**Cron example** (daily at 2 AM):
```cron
0 2 * * * cd /path/to/mysql2postgres && python main.py migrate incremental >> /var/log/migration.log 2>&1
```
---
## Key Differences at a Glance
## Resuming Interrupted Migrations
| Feature | Full | Timestamp | ID-based |
|---------|------|-----------|----------|
| Initial setup | ✅ Required first | ✅ After full | ✅ After full |
| Sync new/updated | ❌ No | ✅ Yes | ✅ Yes |
| Resumable | ❌ No | ⚠️ Partial* | ✅ Full |
| Batched state tracking | ❌ No | ❌ No | ✅ Yes |
| Large datasets | ⚠️ Risky | ✅ Good | ✅ Best |
| Scheduled jobs | ❌ No | ✅ Perfect | ⚠️ Unnecessary |
### Full Migration
*Timestamp mode can resume, but must wait for full batch to complete before continuing
If interrupted, full migration resumes from the last completed partition:
```bash
# First run: migrates partitions 2014, 2015, 2016... (interrupted after 2020)
python main.py migrate full --table RAWDATACOR
# Resume: continues from partition 2021
python main.py migrate full --table RAWDATACOR
```
### Incremental Migration
Incremental migration uses the `last_key` from `migration_state`:
```bash
# Always safe to re-run - uses ON CONFLICT DO NOTHING
python main.py migrate incremental
```
---
## Default Partitions
## Syncing Migration State
Both tables are partitioned by year (2014-2031) plus a DEFAULT partition:
- **rawdatacor_2014** through **rawdatacor_2031** (yearly partitions)
- **rawdatacor_default** (catches data outside 2014-2031)
If `migration_state` becomes out of sync with actual data, use the sync utility:
Same for ELABDATADISP. This ensures data with edge-case timestamps doesn't break migration.
```bash
# Sync migration_state with actual PostgreSQL data
python scripts/sync_migration_state.py
```
This finds the most recent row (by `created_at`) and updates `migration_state._global`.
---
## Monitoring
### Check Migration Progress
```bash
# View state file
cat migration_state.json
### Check Progress
# Check PostgreSQL row counts
psql -U postgres -h localhost -d your_db -c "SELECT COUNT(*) FROM rawdatacor;"
```bash
# View migration state
psql -h localhost -U postgres -d migrated_db -c \
"SELECT table_name, partition_name, status, total_rows, completed_at
FROM migration_state
ORDER BY table_name, partition_name"
```
### Common Issues
### Verify Row Counts
**"No previous migration found"** (Timestamp mode)
- Solution: Run full migration first with `--full` flag
```sql
-- PostgreSQL
SELECT COUNT(*) FROM rawdatacor;
SELECT COUNT(*) FROM elabdatadisp;
**"Duplicate key value violates unique constraint"**
- Cause: Running full migration twice
- Solution: Use timestamp-based incremental sync instead
-- Compare with MySQL
-- mysql> SELECT COUNT(DISTINCT UnitName, ToolNameID, EventDate, EventTime) FROM RAWDATACOR;
```
**"Timeout during migration"** (Large datasets)
- Solution: Switch to ID-based resumable migration with `--use-id`
---
## Common Issues
### "No previous migration found"
**Cause**: Running incremental migration before full migration
**Solution**: Run full migration first
```bash
python main.py migrate full
```
### "Duplicate key value violates unique constraint"
**Cause**: Data already exists (shouldn't happen with ON CONFLICT DO NOTHING)
**Solution**: Migration handles this automatically - check logs for details
### Slow Incremental Migration
**Cause**: `MAX(mysql_max_id)` query is slow (~60 seconds on large tables)
**Solution**: This is expected and only happens once at start. The MySQL queries are instant thanks to the optimization.
**Alternative**: Create an index on `mysql_max_id` in PostgreSQL (uses disk space):
```sql
CREATE INDEX idx_rawdatacor_mysql_max_id ON rawdatacor (mysql_max_id DESC);
CREATE INDEX idx_elabdatadisp_mysql_max_id ON elabdatadisp (mysql_max_id DESC);
```
---
## Key Technical Details
### Tuple Comparison in MySQL
MySQL supports lexicographic tuple comparison:
```sql
WHERE (UnitName, ToolNameID, EventDate, EventTime) > ('Unit1', 'Tool1', '2024-01-01', '10:00:00')
```
This is equivalent to:
```sql
WHERE UnitName > 'Unit1'
OR (UnitName = 'Unit1' AND ToolNameID > 'Tool1')
OR (UnitName = 'Unit1' AND ToolNameID = 'Tool1' AND EventDate > '2024-01-01')
OR (UnitName = 'Unit1' AND ToolNameID = 'Tool1' AND EventDate = '2024-01-01' AND EventTime > '10:00:00')
```
But much more efficient!
### Partitioning in PostgreSQL
Tables are partitioned by year (2014-2031):
```sql
CREATE TABLE rawdatacor_2024 PARTITION OF rawdatacor
FOR VALUES FROM (2024) TO (2025);
```
PostgreSQL automatically routes INSERTs to the correct partition based on `event_year`.
### Indexes in PostgreSQL
Both tables have these indexes automatically created:
**Primary Key** (required for partitioned tables):
```sql
-- Must include partition key (event_year)
UNIQUE (id, event_year)
```
**Consolidation Key** (prevents duplicates):
```sql
-- Ensures one record per consolidation group
UNIQUE (unit_name, tool_name_id, event_timestamp, event_year)
```
**Query Optimization**:
```sql
-- Fast filtering by unit/tool
(unit_name, tool_name_id)
-- JSONB queries with GIN index
GIN (measurements)
```
**Note**: All indexes are automatically created on all partitions when you run `setup --create-schema`.
---
## Summary
- **Start with:** Full migration (`--full`) for initial data load
- **Then use:** Timestamp-based incremental (`--incremental`) for daily syncs
- **Switch to:** ID-based resumable (`--incremental --use-id`) if full migration is too large
1. **Full migration**: One-time initial load, partition by partition
2. **Incremental migration**: Daily sync of new data using consolidation keys
3. **State tracking**: PostgreSQL `migration_state` table
4. **Performance**: `mysql_max_id` filter + consolidation batching
5. **Resumable**: Both modes can resume from interruptions
6. **Safe**: ON CONFLICT DO NOTHING prevents duplicates

View File

@@ -42,8 +42,9 @@ POSTGRES_USER=postgres
POSTGRES_PASSWORD=pgpassword
POSTGRES_DATABASE=migrated_db
BATCH_SIZE=10000
LOG_LEVEL=INFO
CONSOLIDATION_GROUP_LIMIT=40000
PROGRESS_LOG_INTERVAL=10000
```
### 3. Creare PostgreSQL in Incus
@@ -111,8 +112,11 @@ python main.py migrate full --table RAWDATACOR
# Migrare solo i cambiamenti dal last sync
python main.py migrate incremental
# Con stato personalizzato
python main.py migrate incremental --state-file daily_sync.json
# Dry-run per vedere cosa verrebbe migrato
python main.py migrate incremental --dry-run
# Solo una tabella specifica
python main.py migrate incremental --table RAWDATACOR
```
### Benchmark
@@ -221,8 +225,11 @@ python main.py setup --create-schema
### "Migration is slow"
```bash
# Aumentare batch size in .env
BATCH_SIZE=50000
# Aumentare consolidation group limit in .env
CONSOLIDATION_GROUP_LIMIT=80000
# Oppure ridurre logging
PROGRESS_LOG_INTERVAL=20000
# Oppure ottimizzare MySQL
mysql> FLUSH PRIVILEGES;

BIN
README.md

Binary file not shown.

115
config.py
View File

@@ -187,3 +187,118 @@ TABLE_CONFIGS = {
"elabdatadisp": _elabdatadisp_config,
"ELABDATADISP": _elabdatadisp_config,
}
# Partition mapping utilities
def year_to_partition_name(year: int, table: str) -> str:
"""Map year to partition name.
Partition naming scheme (different for each table):
- RAWDATACOR: part0=2014, part1=2015, ..., part10=2024 (part{year-2014})
- ELABDATADISP: d0=2013, d1=2014, ..., d12=2025, ..., d17=2030 (d{year-2013})
Args:
year: Year (2013-2031, depending on table)
table: Table name (RAWDATACOR or ELABDATADISP)
Returns:
Partition name (e.g., "part8" for RAWDATACOR/2022, "d14" for ELABDATADISP/2026)
Raises:
ValueError: If year is out of range or table is unknown
"""
table_upper = table.upper()
if table_upper == "RAWDATACOR":
# RAWDATACOR: 2014-2024 (part0-part10)
# RAWDATACOR: 2025-2030 (d12-d17)
if year < 2014:
year = 2014
elif year > 2030:
year = 2030
if year < 2025:
suffix = "part"
d_year = 2014
else:
suffix = "d"
d_year = 2013 # Continue naming as d12, d13, ...
partition_index = year - d_year # 2014→0, 2015→1, ..., 2024→10 - 2025→12, ..., 2030→17
return f"{suffix}{partition_index}"
elif table_upper == "ELABDATADISP":
# ELABDATADISP: 2013-2031 (d0-d18)
if year < 2013:
year = 2013
elif year > 2031:
year = 2031
partition_index = year - 2013 # 2013→0, 2014→1, ..., 2025→12, ..., 2031→18
return f"d{partition_index}"
else:
raise ValueError(f"Unknown table: {table}")
def get_partitions_from_year(year: int, table: str) -> list[str]:
"""Get list of partition names from a specific year onwards.
Args:
year: Starting year
table: Table name (RAWDATACOR or ELABDATADISP)
Returns:
List of partition names from that year to the latest available year
Example:
get_partitions_from_year(2022, "RAWDATACOR")
→ ["part8", "part9", "part10", "d12", "d13", "d14", "d15", "d16", "d17"] # 2022→part8, ..., 2024→part10, 2025→d12, ..., 2030→d17
get_partitions_from_year(2025, "ELABDATADISP")
→ ["d12", "d13", "d14", "d15", "d16", "d17"] # 2025-2030
"""
table_upper = table.upper()
partitions = []
if table_upper == "RAWDATACOR":
end_year = 2030 # RAWDATACOR: part0-part10 (2014-2024) + d12-d17 (2025-2030)
elif table_upper == "ELABDATADISP":
end_year = 2030 # ELABDATADISP: d0-d17 (2013-2030)
else:
raise ValueError(f"Unknown table: {table}")
# Generate partitions for each year from start_year to end_year
for y in range(year, end_year + 1):
partition_name = year_to_partition_name(y, table)
# Avoid duplicates (can happen if mapping multiple years to same partition)
if not partitions or partitions[-1] != partition_name:
partitions.append(partition_name)
return partitions
def date_string_to_partition_name(date_str: str, table: str) -> str:
"""Extract year from date string and map to partition name.
Args:
date_str: Date string in format 'YYYY-MM-DD' (e.g., '2022-05-15')
table: Table name (RAWDATACOR or ELABDATADISP)
Returns:
Partition name (e.g., "part8" or "d8")
Example:
date_string_to_partition_name("2022-05-15", "RAWDATACOR") → "part8"
"""
if not date_str or len(date_str) < 4:
# Default to 2014 if invalid date
return year_to_partition_name(2014, table)
try:
year = int(date_str[:4])
return year_to_partition_name(year, table)
except (ValueError, TypeError):
# Default to 2014 if can't parse
return year_to_partition_name(2014, table)

43
main.py
View File

@@ -31,7 +31,7 @@ def cli(ctx):
)
def setup(create_schema):
"""Setup PostgreSQL database."""
setup_logger(__name__)
setup_logger("") # Configure root logger to show all module logs
if not create_schema:
click.echo("Usage: python main.py setup --create-schema")
@@ -89,7 +89,7 @@ def migrate():
)
def full(table, dry_run, resume, partition, parallel):
"""Perform full migration of all data."""
setup_logger(__name__)
setup_logger("") # Configure root logger to show all module logs
tables = ["RAWDATACOR", "ELABDATADISP"] if table == "all" else [table]
@@ -178,7 +178,7 @@ def incremental(table, dry_run):
)
def benchmark(iterations, output):
"""Run performance benchmarks comparing MySQL and PostgreSQL."""
setup_logger(__name__)
setup_logger("") # Configure root logger to show all module logs
try:
click.echo("Running performance benchmarks...")
@@ -194,7 +194,7 @@ def benchmark(iterations, output):
@cli.command()
def info():
"""Show configuration information."""
setup_logger(__name__)
setup_logger("") # Configure root logger to show all module logs
settings = get_settings()
@@ -219,5 +219,40 @@ def info():
click.echo(f" Iterations: {settings.benchmark.iterations}")
@cli.command()
@click.option(
"--port",
type=int,
default=7860,
help="Port to run the web interface on (default: 7860)"
)
@click.option(
"--share",
is_flag=True,
help="Create a public share link (useful for remote access)"
)
def web(port, share):
"""Launch web-based GUI for migration monitoring and control."""
setup_logger("") # Configure root logger to show all module logs
try:
from web_ui import launch_ui
click.echo(f"\n🚀 Starting Migration Dashboard on http://localhost:{port}")
if share:
click.echo("📡 Creating public share link...")
launch_ui(share=share, server_port=port)
except ImportError as e:
click.echo(f"✗ Failed to import web_ui module: {e}", err=True)
click.echo("Make sure gradio is installed: uv sync", err=True)
sys.exit(1)
except Exception as e:
logger.error(f"Web interface failed: {e}")
click.echo(f"✗ Web interface failed: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
cli(obj={})

View File

@@ -13,4 +13,7 @@ dependencies = [
"pydantic>=2.5.0",
"pydantic-settings>=2.1.0",
"cryptography>=46.0.3",
"gradio>=4.0.0",
"pandas>=2.0.0",
"plotly>=5.0.0",
]

78
scripts/README.md Normal file
View File

@@ -0,0 +1,78 @@
# Migration Scripts
Utility scripts per la gestione della migrazione.
## sync_migration_state.py
Sincronizza la tabella `migration_state` con i dati effettivamente presenti in PostgreSQL.
### Quando usare
Usa questo script quando `migration_state` non è sincronizzato con i dati reali, ad esempio:
- Dopo inserimenti manuali in PostgreSQL
- Dopo corruzione dello stato
- Prima di eseguire migrazione incrementale su dati già esistenti
### Come funziona
Per ogni tabella (rawdatacor, elabdatadisp):
1. Trova la riga con MAX(created_at) - l'ultima riga inserita
2. Estrae la consolidation key da quella riga
3. Aggiorna `migration_state._global` con quella chiave
### Utilizzo
```bash
# Eseguire dalla root del progetto
python scripts/sync_migration_state.py
```
### Output
```
Syncing migration_state with actual PostgreSQL data...
================================================================================
ELABDATADISP:
Most recently inserted row (by created_at):
created_at: 2025-12-30 11:58:24
event_timestamp: 2025-12-30 14:58:24
Consolidation key: (ID0290, DT0007, 2025-12-30, 14:58:24)
✓ Updated migration_state with this key
RAWDATACOR:
Most recently inserted row (by created_at):
created_at: 2025-12-30 11:13:29
event_timestamp: 2025-12-30 11:11:39
Consolidation key: (ID0304, DT0024, 2025-12-30, 11:11:39)
✓ Updated migration_state with this key
================================================================================
✓ Done! Incremental migration will now start from the correct position.
```
### Effetti
Dopo aver eseguito questo script:
- `migration_state._global` sarà aggiornato con l'ultima chiave migrata
- `python main.py migrate incremental` partirà dalla posizione corretta
- Non verranno create duplicazioni (usa ON CONFLICT DO NOTHING)
### Avvertenze
- Esclude automaticamente dati corrotti (unit_name come `[Ljava.lang.String;@...`)
- Usa `created_at` per trovare l'ultima riga inserita (non `event_timestamp`)
- Sovrascrive lo stato globale esistente
### Verifica
Dopo aver eseguito lo script, verifica lo stato:
```sql
SELECT table_name, partition_name, last_key
FROM migration_state
WHERE partition_name = '_global'
ORDER BY table_name;
```
Dovrebbe mostrare le chiavi più recenti per entrambe le tabelle.

63
scripts/sync_migration_state.py Executable file
View File

@@ -0,0 +1,63 @@
#!/usr/bin/env python3
"""Sync migration_state with actual data in PostgreSQL tables."""
import sys
sys.path.insert(0, '/home/alex/devel/mysql2postgres')
from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.state_manager import StateManager
def sync_table_state(table_name: str):
"""Sync migration_state for a table with its actual data."""
with PostgreSQLConnector() as pg_conn:
cursor = pg_conn.connection.cursor()
# Find the row with MAX(created_at) - most recently inserted
# Exclude corrupted data (Java strings)
cursor.execute(f"""
SELECT unit_name, tool_name_id,
DATE(event_timestamp)::text as event_date,
event_timestamp::time::text as event_time,
created_at,
event_timestamp
FROM {table_name}
WHERE unit_name NOT LIKE '[L%' -- Exclude corrupted Java strings
ORDER BY created_at DESC
LIMIT 1
""")
result = cursor.fetchone()
if not result:
print(f"No data found in {table_name}")
return
unit_name, tool_name_id, event_date, event_time, created_at, event_timestamp = result
print(f"\n{table_name.upper()}:")
print(f" Most recently inserted row (by created_at):")
print(f" created_at: {created_at}")
print(f" event_timestamp: {event_timestamp}")
print(f" Consolidation key: ({unit_name}, {tool_name_id}, {event_date}, {event_time})")
# Update global migration_state with this key
state_mgr = StateManager(pg_conn, table_name, partition_name="_global")
last_key = {
"unit_name": unit_name,
"tool_name_id": tool_name_id,
"event_date": event_date,
"event_time": event_time
}
state_mgr.update_state(last_key=last_key)
print(f" ✓ Updated migration_state with this key")
if __name__ == "__main__":
print("Syncing migration_state with actual PostgreSQL data...")
print("="*80)
sync_table_state("elabdatadisp")
sync_table_state("rawdatacor")
print("\n" + "="*80)
print("✓ Done! Incremental migration will now start from the correct position.")

View File

@@ -294,6 +294,7 @@ class MySQLConnector:
ORDER BY UnitName ASC, ToolNameID ASC, EventDate ASC, EventTime ASC
LIMIT %s
"""
logger.info(f"Executing first query on partition {partition} (fetching up to {limit} rows, sorted by consolidation key)...")
cursor.execute(rows_query, (limit,))
else:
# Resume AFTER last completely yielded key
@@ -304,9 +305,11 @@ class MySQLConnector:
ORDER BY UnitName ASC, ToolNameID ASC, EventDate ASC, EventTime ASC
LIMIT %s
"""
logger.debug(f"Executing query on partition {partition} (resuming from key {last_completed_key}, limit {limit})...")
cursor.execute(rows_query, (last_completed_key[0], last_completed_key[1], last_completed_key[2], last_completed_key[3], limit))
rows = cursor.fetchall()
logger.debug(f"Fetched {len(rows)} rows from partition {partition}")
if not rows:
# No more rows - yield any buffered group and finish
@@ -692,6 +695,92 @@ class MySQLConnector:
)
self._reconnect()
def fetch_consolidation_keys_from_partition_after(
self,
table: str,
partition: str,
after_key: Optional[Dict[str, Any]] = None,
limit: Optional[int] = None
) -> List[Dict[str, Any]]:
"""Fetch distinct consolidation keys from a specific partition after a specific key.
Optimized version for incremental migration that queries only one partition.
Query pattern:
SELECT UnitName, ToolNameID, EventDate, EventTime
FROM table PARTITION (partition_name)
WHERE (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?)
GROUP BY UnitName, ToolNameID, EventDate, EventTime
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT X
Args:
table: Table name (RAWDATACOR or ELABDATADISP)
partition: Partition name (e.g., 'part8', 'd9')
after_key: Start after this key (dict with unit_name, tool_name_id, event_date, event_time)
limit: Number of keys to fetch (uses CONSOLIDATION_GROUP_LIMIT if None)
Returns:
List of dicts with keys: UnitName, ToolNameID, EventDate, EventTime
"""
if table not in ("RAWDATACOR", "ELABDATADISP"):
raise ValueError(f"Consolidation not supported for table {table}")
if limit is None:
limit = self.settings.migration.consolidation_group_limit
retries = 0
while retries < self.MAX_RETRIES:
try:
with self.connection.cursor() as cursor:
if after_key:
# Fetch keys AFTER the last migrated key from this specific partition
query = f"""
SELECT UnitName, ToolNameID, EventDate, EventTime
FROM `{table}` PARTITION (`{partition}`)
WHERE (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)
GROUP BY UnitName, ToolNameID, EventDate, EventTime
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT %s
"""
cursor.execute(
query,
(
after_key.get("unit_name"),
after_key.get("tool_name_id"),
after_key.get("event_date"),
after_key.get("event_time"),
limit
)
)
else:
# No after_key: fetch from beginning of partition
query = f"""
SELECT UnitName, ToolNameID, EventDate, EventTime
FROM `{table}` PARTITION (`{partition}`)
GROUP BY UnitName, ToolNameID, EventDate, EventTime
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT %s
"""
cursor.execute(query, (limit,))
keys = cursor.fetchall()
return keys
except pymysql.Error as e:
retries += 1
if retries >= self.MAX_RETRIES:
logger.error(
f"Failed to fetch consolidation keys from {table} PARTITION ({partition}) "
f"(after_key={after_key}) after {self.MAX_RETRIES} retries: {e}"
)
raise
else:
logger.warning(
f"Fetch consolidation keys from partition failed (retry {retries}/{self.MAX_RETRIES}): {e}"
)
self._reconnect()
def fetch_records_for_key_all_partitions(
self,
table: str,

View File

@@ -11,6 +11,7 @@ from src.migrator.consolidator import consolidate_rows
from src.migrator.state_manager import StateManager
from src.utils.logger import get_logger
from src.utils.progress import ProgressTracker
from src.utils.validation import validate_consolidation_key, ErrorLogger
logger = get_logger(__name__)
@@ -60,6 +61,9 @@ class IncrementalMigrator:
# Initialize state manager
state_mgr = StateManager(pg_conn, pg_table)
# Initialize error logger
error_logger = ErrorLogger(pg_table, "incremental")
# Get last migrated key from migration_state
# This was saved during the last full/incremental migration
last_key = state_mgr.get_last_key()
@@ -77,52 +81,63 @@ class IncrementalMigrator:
f"{last_key.get('event_date')}, {last_key.get('event_time')})"
)
# Get max MySQL ID already migrated to optimize query performance
cursor = pg_conn.connection.cursor()
cursor.execute(f"SELECT MAX(mysql_max_id) FROM {pg_table}")
result = cursor.fetchone()
max_mysql_id = result[0] if result and result[0] else 0
# Determine which partitions to process based on last_key's event_date
# This is a MAJOR optimization: instead of querying the entire table,
# we only process partitions from the last migrated year onwards
from config import get_partitions_from_year, date_string_to_partition_name
logger.info(f"Max MySQL ID already migrated: {max_mysql_id}")
last_event_date = last_key.get('event_date')
if not last_event_date:
logger.warning("Last key has no event_date, starting from 2014")
partitions_to_process = mysql_conn.get_table_partitions(mysql_table)
else:
# Extract year from last_event_date and get partitions from that year onwards
year = int(str(last_event_date)[:4]) if len(str(last_event_date)) >= 4 else 2014
partitions_to_process = get_partitions_from_year(year, mysql_table)
logger.info(
f"Optimized incremental sync: Processing only {len(partitions_to_process)} "
f"partitions from year {year} onwards: {partitions_to_process}"
)
logger.info(
f"Skipping partitions before year {year} (no new data possible there)"
)
if dry_run:
# In dry-run, check how many new keys exist in MySQL
logger.info("[DRY RUN] Checking for new keys in MySQL...")
logger.info("[DRY RUN] Checking for new keys across relevant partitions...")
# Sample first 100 keys to check if there are new records
sample_keys = mysql_conn.fetch_consolidation_keys_after(
mysql_table,
after_key=last_key,
min_mysql_id=max_mysql_id,
limit=100,
offset=0
)
total_new_keys = 0
first_keys_found = []
if sample_keys:
# If we found 100 keys in the sample, there might be many more
# Try to get a rough count by checking larger offsets
if len(sample_keys) == 100:
# There are at least 100 keys, check if there are more
logger.info(
f"[DRY RUN] Found at least 100 new keys, checking total count..."
)
# Sample at different offsets to estimate total
test_batch = mysql_conn.fetch_consolidation_keys_after(
for partition in partitions_to_process:
# For first partition (same year as last_key), use after_key
# For subsequent partitions, start from beginning
if partition == partitions_to_process[0]:
sample_keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
mysql_table,
partition=partition,
after_key=last_key,
min_mysql_id=max_mysql_id,
limit=1,
offset=1000
limit=100
)
if test_batch:
logger.info(f"[DRY RUN] Estimated: More than 1000 new keys to migrate")
else:
logger.info(f"[DRY RUN] Estimated: Between 100-1000 new keys to migrate")
logger.info(f"[DRY RUN] Partition {partition}: {len(sample_keys)} new keys (after last_key)")
else:
logger.info(f"[DRY RUN] Found {len(sample_keys)} new keys to migrate")
sample_keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
mysql_table,
partition=partition,
after_key=None, # All keys from this partition are new
limit=100
)
logger.info(f"[DRY RUN] Partition {partition}: {len(sample_keys)} new keys (all new)")
total_new_keys += len(sample_keys)
if sample_keys and len(first_keys_found) < 3:
first_keys_found.extend(sample_keys[:3 - len(first_keys_found)])
if total_new_keys > 0:
logger.info(f"[DRY RUN] Found at least {total_new_keys} new keys across {len(partitions_to_process)} partitions")
logger.info("[DRY RUN] First 3 keys:")
for i, key in enumerate(sample_keys[:3]):
for i, key in enumerate(first_keys_found[:3]):
logger.info(
f" {i+1}. ({key.get('UnitName')}, {key.get('ToolNameID')}, "
f"{key.get('EventDate')}, {key.get('EventTime')})"
@@ -130,17 +145,16 @@ class IncrementalMigrator:
logger.info(
f"[DRY RUN] Run without --dry-run to perform actual migration"
)
# Return a positive number to indicate there's data to migrate
return len(sample_keys)
return total_new_keys
else:
logger.info("[DRY RUN] No new keys found - database is up to date")
return 0
# Migrate new keys
migrated_rows = 0
offset = 0
insert_buffer = []
buffer_size = self.settings.migration.consolidation_group_limit // 10
last_processed_key = None # Track last key for final state update
with ProgressTracker(
total=None, # Unknown total
@@ -150,99 +164,133 @@ class IncrementalMigrator:
# Get column order for PostgreSQL insert
pg_columns = self._get_pg_columns()
while True:
# Fetch batch of consolidation keys AFTER last_key
logger.debug(f"Fetching keys after last_key with offset={offset}")
keys = mysql_conn.fetch_consolidation_keys_after(
mysql_table,
after_key=last_key,
min_mysql_id=max_mysql_id,
limit=self.settings.migration.consolidation_group_limit,
offset=offset
# Process each partition
for partition_idx, partition in enumerate(partitions_to_process, 1):
logger.info(
f"[{partition_idx}/{len(partitions_to_process)}] "
f"Processing partition {partition}..."
)
if not keys:
logger.info("No more new keys to migrate")
break
# For first partition (same year as last_key), fetch keys AFTER last_key
# For subsequent partitions, fetch ALL keys (they're all new)
use_after_key = last_key if partition == partitions_to_process[0] else None
logger.info(f"Processing {len(keys)} new keys (offset={offset})")
# Process each consolidation key
keys_processed = 0
for key in keys:
keys_processed += 1
# Log progress every 1000 keys
if keys_processed % 1000 == 0:
logger.info(f" Processed {keys_processed}/{len(keys)} keys in this batch...")
unit_name = key.get("UnitName")
tool_name_id = key.get("ToolNameID")
event_date = key.get("EventDate")
event_time = key.get("EventTime")
# Fetch all MySQL rows for this key (all nodes, all partitions)
mysql_rows = mysql_conn.fetch_records_for_key_all_partitions(
while True:
# Fetch batch of consolidation keys from this partition
keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
mysql_table,
unit_name,
tool_name_id,
event_date,
event_time
partition=partition,
after_key=use_after_key,
limit=self.settings.migration.consolidation_group_limit
)
if not mysql_rows:
logger.warning(
f"No records found for key: "
f"({unit_name}, {tool_name_id}, {event_date}, {event_time})"
if not keys:
logger.info(f" No more keys in partition {partition}")
break
logger.info(f" Processing {len(keys)} keys from {partition}...")
# Process each consolidation key
keys_processed = 0
for key in keys:
keys_processed += 1
# Log progress every 1000 keys
if keys_processed % 1000 == 0:
logger.info(f" Processed {keys_processed}/{len(keys)} keys in this batch...")
unit_name = key.get("UnitName")
tool_name_id = key.get("ToolNameID")
event_date = key.get("EventDate")
event_time = key.get("EventTime")
# Validate consolidation key before fetching
is_valid, error_reason = validate_consolidation_key(
unit_name, tool_name_id, event_date, event_time
)
continue
# Consolidate into single PostgreSQL row
try:
pg_row = consolidate_rows(self.table, mysql_rows)
except Exception as e:
logger.error(
f"Failed to consolidate key "
f"({unit_name}, {tool_name_id}, {event_date}, {event_time}): {e}"
)
continue
if not is_valid:
# Log invalid key and skip
error_logger.log_invalid_key(
unit_name, tool_name_id, event_date, event_time, error_reason
)
continue
# Add to insert buffer
insert_buffer.append(pg_row)
# Fetch all MySQL rows for this key (all nodes, all partitions)
try:
mysql_rows = mysql_conn.fetch_records_for_key_all_partitions(
mysql_table,
unit_name,
tool_name_id,
event_date,
event_time
)
except Exception as e:
# Log corrupted key that caused fetch error
error_logger.log_invalid_key(
unit_name, tool_name_id, event_date, event_time,
f"Fetch failed: {e}"
)
continue
# Flush buffer when full
if len(insert_buffer) >= buffer_size:
# Use COPY with ON CONFLICT to handle duplicates
inserted = pg_conn.copy_from_with_conflict(
pg_table,
insert_buffer,
pg_columns,
conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"]
)
migrated_rows += inserted
progress.update(inserted)
if not mysql_rows:
logger.warning(
f"No records found for key: "
f"({unit_name}, {tool_name_id}, {event_date}, {event_time})"
)
continue
# Update state with last key
# Consolidate into single PostgreSQL row
try:
pg_row = consolidate_rows(self.table, mysql_rows)
except Exception as e:
logger.error(
f"Failed to consolidate key "
f"({unit_name}, {tool_name_id}, {event_date}, {event_time}): {e}"
)
continue
# Add to insert buffer
insert_buffer.append(pg_row)
# Track last processed key
last_processed_key = {
"unit_name": unit_name,
"tool_name_id": tool_name_id,
"event_date": str(event_date) if event_date else None,
"event_time": str(event_time) if event_time else None,
}
state_mgr.update_state(
last_key=last_processed_key,
total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
)
logger.debug(
f"Flushed {inserted} rows, total new: {migrated_rows}"
)
insert_buffer = []
# Flush buffer when full
if len(insert_buffer) >= buffer_size:
# Use COPY with ON CONFLICT to handle duplicates
inserted = pg_conn.copy_from_with_conflict(
pg_table,
insert_buffer,
pg_columns,
conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"]
)
migrated_rows += inserted
progress.update(inserted)
# Move to next batch of keys
offset += len(keys)
# Update state with last key (from tracked variable)
state_mgr.update_state(
last_key=last_processed_key,
total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
)
# If we got fewer keys than requested, we're done
if len(keys) < self.settings.migration.consolidation_group_limit:
break
logger.debug(
f"Flushed {inserted} rows, total new: {migrated_rows}"
)
insert_buffer = []
# After processing all keys in batch, update use_after_key for next iteration
if keys:
last_key_in_batch = keys[-1]
use_after_key = {
"unit_name": last_key_in_batch.get("UnitName"),
"tool_name_id": last_key_in_batch.get("ToolNameID"),
"event_date": str(last_key_in_batch.get("EventDate")) if last_key_in_batch.get("EventDate") else None,
"event_time": str(last_key_in_batch.get("EventTime")) if last_key_in_batch.get("EventTime") else None,
}
# Flush remaining buffer
if insert_buffer:
@@ -257,13 +305,24 @@ class IncrementalMigrator:
progress.update(inserted)
logger.debug(f"Final flush: {inserted} rows")
# Update state with last key after final flush
if last_processed_key:
state_mgr.update_state(
last_key=last_processed_key,
total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
)
# Get final row count
final_count = pg_conn.get_row_count(pg_table)
logger.info(f"Total PostgreSQL rows: {final_count}")
# Close error logger and get count
error_logger.close()
logger.info(
f"✓ Incremental migration complete: "
f"{migrated_rows} new rows migrated to {pg_table}"
f"{migrated_rows} new rows migrated to {pg_table}, "
f"{error_logger.get_error_count()} invalid keys skipped"
)
return migrated_rows

View File

@@ -120,6 +120,9 @@ class PartitionMigrator:
description=f"Streaming {mysql_table} partition {partition_name}"
) as progress:
# Log before starting to fetch (this query can take several minutes for large partitions)
logger.info(f"Fetching consolidation groups from MySQL partition {partition_name}...")
# Use fetch_consolidation_groups_from_partition with start_key for efficient resume
# MySQL will skip all keys <= start_key using WHERE clause (no unnecessary data transfer)
for group in mysql_conn.fetch_consolidation_groups_from_partition(

View File

@@ -7,7 +7,7 @@ Tracks migration progress with:
- status: pending, in_progress, completed
"""
from typing import Optional, Dict, Any
from datetime import datetime
from datetime import datetime, timezone
import json
from src.connectors.postgres_connector import PostgreSQLConnector
@@ -199,7 +199,7 @@ class StateManager:
if mark_completed:
updates.append("migration_completed_at = %s")
params.append(datetime.utcnow())
params.append(datetime.now(timezone.utc))
if status is None:
updates.append("status = 'completed'")

View File

@@ -183,8 +183,8 @@ CREATE TABLE IF NOT EXISTS migration_state (
table_name VARCHAR(255) NOT NULL,
partition_name VARCHAR(255) NOT NULL,
last_key JSONB,
migration_started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
migration_completed_at TIMESTAMP,
migration_started_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
migration_completed_at TIMESTAMPTZ,
total_rows_migrated BIGINT DEFAULT 0,
status VARCHAR(32) DEFAULT 'pending',
PRIMARY KEY (table_name, partition_name),

View File

@@ -10,21 +10,30 @@ logger = get_logger(__name__)
class ErrorLogger:
"""Log invalid migration keys to a file."""
def __init__(self, table: str, partition: str):
def __init__(self, table: str, partition: str, use_timestamp: bool = False):
"""Initialize error logger.
Args:
table: Table name
partition: Partition name
partition: Partition name (e.g., 'p2024' or 'incremental')
use_timestamp: If True, add timestamp to filename (for incremental migrations)
"""
self.table = table
self.partition = partition
self.error_file = f"migration_errors_{table}_{partition}.log"
# Add timestamp to filename for incremental migrations to avoid overwriting
if use_timestamp or partition == "incremental":
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.error_file = f"migration_errors_{table}_{partition}_{timestamp}.log"
else:
self.error_file = f"migration_errors_{table}_{partition}.log"
self.error_count = 0
# Create error file with header
with open(self.error_file, "w") as f:
f.write(f"# Migration errors for {table} partition {partition}\n")
f.write(f"# Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("# Format: UnitName|ToolNameID|EventDate|EventTime|Reason\n\n")
logger.info(f"Error log file created: {self.error_file}")
@@ -99,6 +108,10 @@ def validate_consolidation_key(
if unit_name is None or unit_name == "":
return False, "UnitName is NULL or empty"
# Check for corrupted Java strings (like '[Ljava.lang.String;@...')
if isinstance(unit_name, str) and unit_name.startswith("[L"):
return False, f"UnitName is corrupted Java string: {unit_name}"
if tool_name_id is None or tool_name_id == "":
return False, "ToolNameID is NULL or empty"

1118
uv.lock generated

File diff suppressed because it is too large Load Diff

593
web_ui.py Normal file
View File

@@ -0,0 +1,593 @@
"""Web UI for MySQL to PostgreSQL migration monitoring and control.
Provides a Gradio-based interface for:
- Viewing migration status and progress
- Starting/monitoring migrations
- Viewing logs and errors
- Performance metrics and graphs
"""
import gradio as gr
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from pathlib import Path
import threading
import time
from datetime import datetime
from typing import Optional, Dict, List, Tuple
import logging
from config import get_settings
from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.full_migrator import run_full_migration
from src.migrator.parallel_migrator import run_parallel_migration
from src.utils.logger import get_logger
logger = get_logger(__name__)
# Global state for tracking running migrations
running_migrations = {}
migration_logs = {}
def get_migration_state_df() -> pd.DataFrame:
"""Fetch current migration state from PostgreSQL.
Returns:
DataFrame with columns: table_name, partition_name, status,
total_rows_migrated, migration_started_at, migration_completed_at
"""
try:
with PostgreSQLConnector() as pg_conn:
with pg_conn.connection.cursor() as cursor:
cursor.execute("""
SELECT
table_name,
partition_name,
status,
total_rows_migrated,
migration_started_at,
migration_completed_at,
last_key
FROM migration_state
WHERE partition_name != '_global'
ORDER BY table_name, partition_name
""")
rows = cursor.fetchall()
if not rows:
return pd.DataFrame(columns=[
'Table', 'Partition', 'Status', 'Rows Migrated',
'Started At', 'Completed At'
])
df = pd.DataFrame(rows, columns=[
'Table', 'Partition', 'Status', 'Rows Migrated',
'Started At', 'Completed At', 'Last Key'
])
# Format dates
df['Started At'] = pd.to_datetime(df['Started At']).dt.strftime('%Y-%m-%d %H:%M:%S')
df['Completed At'] = pd.to_datetime(df['Completed At']).dt.strftime('%Y-%m-%d %H:%M:%S')
df['Completed At'] = df['Completed At'].fillna('-')
# Drop Last Key column for display (too verbose)
df = df.drop('Last Key', axis=1)
return df
except Exception as e:
logger.error(f"Failed to fetch migration state: {e}")
return pd.DataFrame(columns=[
'Table', 'Partition', 'Status', 'Rows Migrated',
'Started At', 'Completed At'
])
def get_migration_summary() -> Dict[str, any]:
"""Get summary statistics for migrations.
Returns:
Dict with total/completed/in_progress/pending counts per table
"""
try:
with PostgreSQLConnector() as pg_conn:
with pg_conn.connection.cursor() as cursor:
cursor.execute("""
SELECT
table_name,
status,
COUNT(*) as count,
SUM(total_rows_migrated) as total_rows
FROM migration_state
WHERE partition_name != '_global'
GROUP BY table_name, status
ORDER BY table_name, status
""")
rows = cursor.fetchall()
summary = {
'RAWDATACOR': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0},
'ELABDATADISP': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0}
}
for table, status, count, total_rows in rows:
table_upper = table.upper()
if table_upper in summary:
summary[table_upper][status] = count
summary[table_upper]['total_rows'] += total_rows or 0
return summary
except Exception as e:
logger.error(f"Failed to fetch migration summary: {e}")
return {
'RAWDATACOR': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0},
'ELABDATADISP': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0}
}
def create_status_chart() -> go.Figure:
"""Create a bar chart showing migration status by table.
Returns:
Plotly figure
"""
summary = get_migration_summary()
tables = []
completed = []
in_progress = []
pending = []
for table, stats in summary.items():
tables.append(table)
completed.append(stats['completed'])
in_progress.append(stats['in_progress'])
pending.append(stats['pending'])
fig = go.Figure(data=[
go.Bar(name='Completed', x=tables, y=completed, marker_color='green'),
go.Bar(name='In Progress', x=tables, y=in_progress, marker_color='orange'),
go.Bar(name='Pending', x=tables, y=pending, marker_color='gray')
])
fig.update_layout(
barmode='stack',
title='Migration Status by Table',
xaxis_title='Table',
yaxis_title='Number of Partitions',
height=400
)
return fig
def get_error_logs() -> List[Tuple[str, str]]:
"""Get list of error log files with their paths.
Returns:
List of (filename, filepath) tuples
"""
error_logs = []
for log_file in Path('.').glob('migration_errors_*.log'):
error_logs.append((log_file.name, str(log_file)))
return sorted(error_logs, key=lambda x: x[0], reverse=True)
def read_error_log(log_file: str) -> str:
"""Read contents of an error log file.
Args:
log_file: Path to log file
Returns:
Contents of log file
"""
try:
if not log_file or log_file == "Select a log file...":
return "No log file selected"
with open(log_file, 'r') as f:
content = f.read()
if not content:
return f"Log file {log_file} is empty (no errors logged)"
return content
except Exception as e:
return f"Error reading log file: {e}"
def start_migration_task(
table: str,
parallel_workers: int,
resume: bool,
dry_run: bool,
partition: Optional[str] = None
) -> str:
"""Start a migration in a background thread.
Args:
table: Table name (RAWDATACOR, ELABDATADISP, or all)
parallel_workers: Number of parallel workers (0 = sequential)
resume: Whether to resume from last checkpoint
dry_run: Whether to run in dry-run mode
partition: Optional partition name for single partition migration
Returns:
Status message
"""
task_id = f"{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
if task_id in running_migrations:
return f"❌ Migration already running for {table}"
# Validate options
if parallel_workers > 0 and partition:
return "❌ Cannot use parallel workers with single partition mode"
# Initialize log buffer for this task
migration_logs[task_id] = []
def run_migration():
"""Worker function to run migration."""
try:
migration_logs[task_id].append(f"Starting migration for {table}...")
migration_logs[task_id].append(f"Mode: {'Parallel' if parallel_workers > 0 else 'Sequential'}")
migration_logs[task_id].append(f"Resume: {resume}, Dry-run: {dry_run}")
if parallel_workers > 0:
# Parallel migration
rows = run_parallel_migration(
table,
num_workers=parallel_workers,
dry_run=dry_run,
resume=resume
)
else:
# Sequential migration
rows = run_full_migration(
table,
dry_run=dry_run,
resume=resume,
partition=partition
)
migration_logs[task_id].append(f"✓ Migration complete: {rows:,} rows migrated")
running_migrations[task_id] = 'completed'
except Exception as e:
migration_logs[task_id].append(f"❌ Migration failed: {e}")
running_migrations[task_id] = 'failed'
logger.error(f"Migration failed: {e}")
# Start migration in background thread
thread = threading.Thread(target=run_migration, daemon=True)
thread.start()
running_migrations[task_id] = 'running'
return f"✓ Migration started: {task_id}\nCheck the 'Logs' tab for progress"
def start_incremental_migration_task(
table: str,
dry_run: bool
) -> str:
"""Start an incremental migration in a background thread.
Args:
table: Table name (RAWDATACOR, ELABDATADISP, or all)
dry_run: Whether to run in dry-run mode
Returns:
Status message
"""
task_id = f"incremental_{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
if task_id in running_migrations:
return f"❌ Incremental migration already running for {table}"
# Initialize log buffer for this task
migration_logs[task_id] = []
def run_migration():
"""Worker function to run incremental migration."""
try:
migration_logs[task_id].append(f"Starting INCREMENTAL migration for {table}...")
migration_logs[task_id].append(f"Dry-run: {dry_run}")
migration_logs[task_id].append("This will sync only NEW data added since last migration")
from src.migrator.incremental_migrator import run_incremental_migration
rows = run_incremental_migration(
table,
dry_run=dry_run
)
migration_logs[task_id].append(f"✓ Incremental migration complete: {rows:,} rows migrated")
running_migrations[task_id] = 'completed'
except Exception as e:
migration_logs[task_id].append(f"❌ Incremental migration failed: {e}")
running_migrations[task_id] = 'failed'
logger.error(f"Incremental migration failed: {e}")
# Start migration in background thread
thread = threading.Thread(target=run_migration, daemon=True)
thread.start()
running_migrations[task_id] = 'running'
return f"✓ Incremental migration started: {task_id}\nCheck the 'Logs' tab for progress"
def get_migration_logs(task_id: str) -> str:
"""Get logs for a specific migration task.
Args:
task_id: Task identifier
Returns:
Log contents
"""
if not task_id or task_id == "No migrations running":
return "Select a migration to view logs"
logs = migration_logs.get(task_id, [])
if not logs:
return f"No logs available for {task_id}"
return "\n".join(logs)
def refresh_migration_state():
"""Refresh migration state display."""
return get_migration_state_df()
def refresh_status_chart():
"""Refresh status chart."""
return create_status_chart()
def refresh_running_migrations() -> gr.Dropdown:
"""Refresh list of running migrations."""
if not running_migrations:
return gr.Dropdown(choices=["No migrations running"], value="No migrations running")
choices = list(running_migrations.keys())
return gr.Dropdown(choices=choices, value=choices[0] if choices else None)
# Build Gradio interface
with gr.Blocks(title="MySQL to PostgreSQL Migration", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 🔄 MySQL to PostgreSQL Migration Dashboard")
gr.Markdown("Monitor and control data migration from MySQL to PostgreSQL")
with gr.Tabs():
# Tab 1: Overview
with gr.Tab("📊 Overview"):
gr.Markdown("## Migration Status Overview")
with gr.Row():
refresh_btn = gr.Button("🔄 Refresh", scale=1)
with gr.Row():
status_chart = gr.Plot(label="Partition Status by Table")
with gr.Row():
state_table = gr.Dataframe(
value=get_migration_state_df(),
label="Migration State (All Partitions)",
interactive=False,
wrap=True
)
# Auto-refresh every 10 seconds
refresh_btn.click(
fn=lambda: (refresh_migration_state(), refresh_status_chart()),
outputs=[state_table, status_chart]
)
# Initial load
demo.load(
fn=lambda: (refresh_migration_state(), refresh_status_chart()),
outputs=[state_table, status_chart]
)
# Tab 2: Start Migration
with gr.Tab("▶️ Start Migration"):
gr.Markdown("## Start New Migration")
with gr.Row():
table_select = gr.Dropdown(
choices=["RAWDATACOR", "ELABDATADISP", "all"],
value="RAWDATACOR",
label="Table to Migrate"
)
partition_input = gr.Textbox(
label="Partition (optional)",
placeholder="Leave empty for all partitions, or enter e.g. 'part7', 'd9'",
value=""
)
with gr.Row():
parallel_slider = gr.Slider(
minimum=0,
maximum=10,
value=0,
step=1,
label="Parallel Workers (0 = sequential)"
)
with gr.Row():
resume_check = gr.Checkbox(label="Resume from last checkpoint", value=True)
dry_run_check = gr.Checkbox(label="Dry run (simulate without writing)", value=False)
start_btn = gr.Button("▶️ Start Migration", variant="primary", size="lg")
migration_output = gr.Textbox(
label="Migration Status",
lines=3,
interactive=False
)
start_btn.click(
fn=start_migration_task,
inputs=[table_select, parallel_slider, resume_check, dry_run_check, partition_input],
outputs=migration_output
)
# Tab 3: Incremental Migration
with gr.Tab("🔄 Incremental Sync"):
gr.Markdown("## Incremental Migration (Sync New Data)")
gr.Markdown("""
Questa modalità sincronizza **solo i dati nuovi** aggiunti dopo l'ultima migrazione full.
**Come funziona:**
- Trova le nuove consolidation keys in MySQL che non esistono ancora in PostgreSQL
- Migra solo quelle chiavi (non riprocessa dati già migrati)
- Usa `migration_state` per tracciare l'ultima chiave processata
**Quando usare:**
- Dopo aver completato una migrazione full di tutte le partizioni
- Per sincronizzare periodicamente nuovi dati senza rifare tutto
- Per aggiornamenti quotidiani/settimanali
""")
with gr.Row():
inc_table_select = gr.Dropdown(
choices=["RAWDATACOR", "ELABDATADISP", "all"],
value="RAWDATACOR",
label="Table to Sync"
)
with gr.Row():
inc_dry_run_check = gr.Checkbox(
label="Dry run (simulate without writing)",
value=False
)
inc_start_btn = gr.Button("🔄 Start Incremental Sync", variant="primary", size="lg")
inc_migration_output = gr.Textbox(
label="Sync Status",
lines=3,
interactive=False
)
inc_start_btn.click(
fn=start_incremental_migration_task,
inputs=[inc_table_select, inc_dry_run_check],
outputs=inc_migration_output
)
# Tab 4: Logs
with gr.Tab("📝 Logs"):
gr.Markdown("## Migration Logs")
with gr.Row():
running_migrations_dropdown = gr.Dropdown(
choices=["No migrations running"],
value="No migrations running",
label="Select Migration",
interactive=True
)
refresh_migrations_btn = gr.Button("🔄 Refresh List")
log_output = gr.Textbox(
label="Log Output",
lines=20,
interactive=False,
max_lines=50
)
refresh_migrations_btn.click(
fn=refresh_running_migrations,
outputs=running_migrations_dropdown
)
running_migrations_dropdown.change(
fn=get_migration_logs,
inputs=running_migrations_dropdown,
outputs=log_output
)
# Tab 5: Error Logs
with gr.Tab("⚠️ Error Logs"):
gr.Markdown("## Error Log Viewer")
gr.Markdown("View logged validation errors (invalid consolidation keys)")
with gr.Row():
error_log_dropdown = gr.Dropdown(
choices=[x[0] for x in get_error_logs()] if get_error_logs() else ["No error logs found"],
label="Select Error Log File",
value="Select a log file..."
)
refresh_error_logs_btn = gr.Button("🔄 Refresh")
error_log_content = gr.Textbox(
label="Error Log Contents",
lines=20,
interactive=False,
max_lines=50
)
def refresh_error_log_list():
logs = get_error_logs()
choices = [x[0] for x in logs] if logs else ["No error logs found"]
return gr.Dropdown(choices=choices)
def show_error_log(filename):
if not filename or filename == "Select a log file..." or filename == "No error logs found":
return "Select a log file to view its contents"
return read_error_log(filename)
refresh_error_logs_btn.click(
fn=refresh_error_log_list,
outputs=error_log_dropdown
)
error_log_dropdown.change(
fn=show_error_log,
inputs=error_log_dropdown,
outputs=error_log_content
)
def launch_ui(share=False, server_port=7860):
"""Launch the Gradio UI.
Args:
share: Whether to create a public share link
server_port: Port to run the server on
"""
demo.launch(
share=share,
server_port=server_port,
server_name="0.0.0.0" # Listen on all interfaces
)
if __name__ == "__main__":
import sys
# Parse command line args
share = "--share" in sys.argv
port = 7860
for arg in sys.argv:
if arg.startswith("--port="):
port = int(arg.split("=")[1])
print(f"Starting Migration Dashboard on port {port}...")
print(f"Share mode: {share}")
launch_ui(share=share, server_port=port)