with web UI

README.md | 45

@@ -4,8 +4,10 @@ A robust tool for migrating MySQL databases to PostgreSQL with transformations

## Features

- **Web GUI**: Gradio-based graphical interface for visual monitoring and control
- **Full Migration**: Transfers all data from MySQL to PostgreSQL
- **Incremental Migration**: Periodic synchronization based on consolidation keys
- **Parallel Migration**: Support for multiple workers to speed up migrations
- **Data Consolidation**: Groups multiple MySQL rows into single PostgreSQL records
- **JSONB Transformation**: Automatic consolidation of multiple columns into JSONB fields
- **Partitioning**: Support for per-year partitions (2014-2031)
@@ -16,6 +18,7 @@ A robust tool for migrating MySQL databases to PostgreSQL with transformations
- **Logging**: Structured logging with Rich for colored output
- **Dry-Run Mode**: Test mode that does not modify any data
- **State Management**: Reliable tracking via the `migration_state` table in PostgreSQL
- **Error Logging**: Automatic tracking of invalid consolidation keys

## Setup

@@ -94,7 +97,47 @@ Creates the PostgreSQL schema with:
- Optimized indexes for JSONB
- `migration_state` tracking table

#### Web GUI (Graphical Interface) 🎨

Launch the web interface to monitor and control migrations visually:

```bash
# Start the GUI on the default port (7860)
uv run main.py web

# Start on a specific port
uv run main.py web --port 8080

# Create a shareable public link (useful for remote access)
uv run main.py web --share
```

Then open your browser at `http://localhost:7860` to reach the dashboard.

**GUI features:**

- **📊 Overview Tab**: Shows the status of all partitions with real-time charts and tables
- **▶️ Start Migration Tab**: Starts new full migrations with controls for:
  - Table selection (RAWDATACOR, ELABDATADISP, or both)
  - Parallel mode (number of workers)
  - Resume from checkpoint
  - Dry-run mode
  - Single-partition migration
- **🔄 Incremental Sync Tab**: Synchronizes only the new data added after the last full migration
  - Table selection
  - Dry-run mode
  - Explanation of how incremental sync works
- **📝 Logs Tab**: Shows real-time logs of running migrations
- **⚠️ Error Logs Tab**: Browse the validation error log files

**Advantages over the CLI** (see the sketch after this list):

- Visual monitoring of migration status
- Interactive charts for statistics and progress
- Centralized control of multiple migrations
- No need for multiple terminals
- Remotely accessible with `--share`
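
The dashboard can also be started from Python rather than through the CLI; a minimal sketch, assuming the repository root is on the import path and the Gradio dependencies are installed:

```python
# Minimal programmatic launch of the dashboard defined in web_ui.py.
from web_ui import launch_ui

# Equivalent to `uv run main.py web --port 8080`, without the Click layer.
launch_ui(share=False, server_port=8080)
```

Note that `launch_ui` binds to `0.0.0.0`, so the dashboard is reachable from other machines on the network even without `--share`.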

#### Full Migration (CLI)

```bash
# Migrate all tables
python main.py migrate full
```

main.py | 35

@@ -219,5 +219,40 @@ def info():
    click.echo(f" Iterations: {settings.benchmark.iterations}")


@cli.command()
@click.option(
    "--port",
    type=int,
    default=7860,
    help="Port to run the web interface on (default: 7860)"
)
@click.option(
    "--share",
    is_flag=True,
    help="Create a public share link (useful for remote access)"
)
def web(port, share):
    """Launch web-based GUI for migration monitoring and control."""
    setup_logger("")  # Configure root logger to show all module logs

    try:
        from web_ui import launch_ui

        click.echo(f"\n🚀 Starting Migration Dashboard on http://localhost:{port}")
        if share:
            click.echo("📡 Creating public share link...")

        launch_ui(share=share, server_port=port)

    except ImportError as e:
        click.echo(f"✗ Failed to import web_ui module: {e}", err=True)
        click.echo("Make sure gradio is installed: uv sync", err=True)
        sys.exit(1)
    except Exception as e:
        logger.error(f"Web interface failed: {e}")
        click.echo(f"✗ Web interface failed: {e}", err=True)
        sys.exit(1)


if __name__ == "__main__":
    cli(obj={})
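
Since the `web` command only forwards its flags to `launch_ui`, it can be smoke-tested without binding a port; a sketch using Click's test runner, assuming the GUI dependencies are installed and that patching `web_ui.launch_ui` is acceptable in your test setup:

```python
# Hypothetical smoke test for the `web` command; it never starts a real server.
from unittest.mock import patch

from click.testing import CliRunner

from main import cli  # the Click group defined in main.py


def test_web_command_forwards_port_and_share():
    runner = CliRunner()
    with patch("web_ui.launch_ui") as fake_launch:
        result = runner.invoke(cli, ["web", "--port", "9000", "--share"], obj={})
    assert result.exit_code == 0
    fake_launch.assert_called_once_with(share=True, server_port=9000)
```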

pyproject.toml

@@ -13,4 +13,7 @@ dependencies = [
    "pydantic>=2.5.0",
    "pydantic-settings>=2.1.0",
    "cryptography>=46.0.3",
    "gradio>=4.0.0",
    "pandas>=2.0.0",
    "plotly>=5.0.0",
]
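
A quick way to confirm that the three new GUI dependencies resolve in the active environment (for example after `uv sync`); this check is illustrative and not part of the project:

```python
# Hypothetical sanity check for the new GUI dependencies.
from importlib.metadata import PackageNotFoundError, version

for pkg in ("gradio", "pandas", "plotly"):
    try:
        print(f"{pkg} {version(pkg)}")
    except PackageNotFoundError:
        print(f"{pkg} is missing - run `uv sync`")
```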

@@ -695,6 +695,92 @@ class MySQLConnector:
                    )
                    self._reconnect()

    def fetch_consolidation_keys_from_partition_after(
        self,
        table: str,
        partition: str,
        after_key: Optional[Dict[str, Any]] = None,
        limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Fetch distinct consolidation keys from a specific partition after a specific key.

        Optimized version for incremental migration that queries only one partition.

        Query pattern:
            SELECT UnitName, ToolNameID, EventDate, EventTime
            FROM table PARTITION (partition_name)
            WHERE (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?)
            GROUP BY UnitName, ToolNameID, EventDate, EventTime
            ORDER BY UnitName, ToolNameID, EventDate, EventTime
            LIMIT X

        Args:
            table: Table name (RAWDATACOR or ELABDATADISP)
            partition: Partition name (e.g., 'part8', 'd9')
            after_key: Start after this key (dict with unit_name, tool_name_id, event_date, event_time)
            limit: Number of keys to fetch (uses CONSOLIDATION_GROUP_LIMIT if None)

        Returns:
            List of dicts with keys: UnitName, ToolNameID, EventDate, EventTime
        """
        if table not in ("RAWDATACOR", "ELABDATADISP"):
            raise ValueError(f"Consolidation not supported for table {table}")

        if limit is None:
            limit = self.settings.migration.consolidation_group_limit

        retries = 0
        while retries < self.MAX_RETRIES:
            try:
                with self.connection.cursor() as cursor:
                    if after_key:
                        # Fetch keys AFTER the last migrated key from this specific partition
                        query = f"""
                            SELECT UnitName, ToolNameID, EventDate, EventTime
                            FROM `{table}` PARTITION (`{partition}`)
                            WHERE (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)
                            GROUP BY UnitName, ToolNameID, EventDate, EventTime
                            ORDER BY UnitName, ToolNameID, EventDate, EventTime
                            LIMIT %s
                        """
                        cursor.execute(
                            query,
                            (
                                after_key.get("unit_name"),
                                after_key.get("tool_name_id"),
                                after_key.get("event_date"),
                                after_key.get("event_time"),
                                limit
                            )
                        )
                    else:
                        # No after_key: fetch from beginning of partition
                        query = f"""
                            SELECT UnitName, ToolNameID, EventDate, EventTime
                            FROM `{table}` PARTITION (`{partition}`)
                            GROUP BY UnitName, ToolNameID, EventDate, EventTime
                            ORDER BY UnitName, ToolNameID, EventDate, EventTime
                            LIMIT %s
                        """
                        cursor.execute(query, (limit,))

                    keys = cursor.fetchall()
                    return keys

            except pymysql.Error as e:
                retries += 1
                if retries >= self.MAX_RETRIES:
                    logger.error(
                        f"Failed to fetch consolidation keys from {table} PARTITION ({partition}) "
                        f"(after_key={after_key}) after {self.MAX_RETRIES} retries: {e}"
                    )
                    raise
                else:
                    logger.warning(
                        f"Fetch consolidation keys from partition failed (retry {retries}/{self.MAX_RETRIES}): {e}"
                    )
                    self._reconnect()
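
For illustration only (not part of the committed file): how a caller might page through a single partition with the new helper, given an already-open connector `mysql_conn`; the table, partition name, and batch size are placeholders:

```python
# Hypothetical keyset-pagination loop over one MySQL partition.
after_key = None      # None = start from the beginning of the partition
batch_size = 500

while True:
    keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
        "RAWDATACOR", partition="part8", after_key=after_key, limit=batch_size
    )
    if not keys:
        break
    for key in keys:
        ...  # consolidate and write the group to PostgreSQL
    last = keys[-1]
    after_key = {  # advance the cursor; the method expects lower-case key names
        "unit_name": last["UnitName"],
        "tool_name_id": last["ToolNameID"],
        "event_date": str(last["EventDate"]),
        "event_time": str(last["EventTime"]),
    }
    if len(keys) < batch_size:
        break
```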

    def fetch_records_for_key_all_partitions(
        self,
        table: str,

@@ -81,52 +81,63 @@ class IncrementalMigrator:
             f"{last_key.get('event_date')}, {last_key.get('event_time')})"
         )

-        # Get max MySQL ID already migrated to optimize query performance
-        cursor = pg_conn.connection.cursor()
-        cursor.execute(f"SELECT MAX(mysql_max_id) FROM {pg_table}")
-        result = cursor.fetchone()
-        max_mysql_id = result[0] if result and result[0] else 0
+        # Determine which partitions to process based on last_key's event_date
+        # This is a MAJOR optimization: instead of querying the entire table,
+        # we only process partitions from the last migrated year onwards
+        from config import get_partitions_from_year, date_string_to_partition_name

-        logger.info(f"Max MySQL ID already migrated: {max_mysql_id}")
+        last_event_date = last_key.get('event_date')
+        if not last_event_date:
+            logger.warning("Last key has no event_date, starting from 2014")
+            partitions_to_process = mysql_conn.get_table_partitions(mysql_table)
+        else:
+            # Extract year from last_event_date and get partitions from that year onwards
+            year = int(str(last_event_date)[:4]) if len(str(last_event_date)) >= 4 else 2014
+            partitions_to_process = get_partitions_from_year(year, mysql_table)
+
+        logger.info(
+            f"Optimized incremental sync: Processing only {len(partitions_to_process)} "
+            f"partitions from year {year} onwards: {partitions_to_process}"
+        )
+        logger.info(
+            f"Skipping partitions before year {year} (no new data possible there)"
+        )

         if dry_run:
             # In dry-run, check how many new keys exist in MySQL
-            logger.info("[DRY RUN] Checking for new keys in MySQL...")
+            logger.info("[DRY RUN] Checking for new keys across relevant partitions...")

-            # Sample first 100 keys to check if there are new records
-            sample_keys = mysql_conn.fetch_consolidation_keys_after(
-                mysql_table,
-                after_key=last_key,
-                min_mysql_id=max_mysql_id,
-                limit=100,
-                offset=0
-            )
+            total_new_keys = 0
+            first_keys_found = []

-            if sample_keys:
-                # If we found 100 keys in the sample, there might be many more
-                # Try to get a rough count by checking larger offsets
-                if len(sample_keys) == 100:
-                    # There are at least 100 keys, check if there are more
-                    logger.info(
-                        f"[DRY RUN] Found at least 100 new keys, checking total count..."
-                    )
-                    # Sample at different offsets to estimate total
-                    test_batch = mysql_conn.fetch_consolidation_keys_after(
-                        mysql_table,
-                        after_key=last_key,
-                        min_mysql_id=max_mysql_id,
-                        limit=1,
-                        offset=1000
-                    )
-                    if test_batch:
-                        logger.info(f"[DRY RUN] Estimated: More than 1000 new keys to migrate")
-                    else:
-                        logger.info(f"[DRY RUN] Estimated: Between 100-1000 new keys to migrate")
-                else:
-                    logger.info(f"[DRY RUN] Found {len(sample_keys)} new keys to migrate")
+            for partition in partitions_to_process:
+                # For first partition (same year as last_key), use after_key
+                # For subsequent partitions, start from beginning
+                if partition == partitions_to_process[0]:
+                    sample_keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
+                        mysql_table,
+                        partition=partition,
+                        after_key=last_key,
+                        limit=100
+                    )
+                    logger.info(f"[DRY RUN] Partition {partition}: {len(sample_keys)} new keys (after last_key)")
+                else:
+                    sample_keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
+                        mysql_table,
+                        partition=partition,
+                        after_key=None,  # All keys from this partition are new
+                        limit=100
+                    )
+                    logger.info(f"[DRY RUN] Partition {partition}: {len(sample_keys)} new keys (all new)")
+
+                total_new_keys += len(sample_keys)
+                if sample_keys and len(first_keys_found) < 3:
+                    first_keys_found.extend(sample_keys[:3 - len(first_keys_found)])
+
+            if total_new_keys > 0:
+                logger.info(f"[DRY RUN] Found at least {total_new_keys} new keys across {len(partitions_to_process)} partitions")
                 logger.info("[DRY RUN] First 3 keys:")
-                for i, key in enumerate(sample_keys[:3]):
+                for i, key in enumerate(first_keys_found[:3]):
                     logger.info(
                         f"  {i+1}. ({key.get('UnitName')}, {key.get('ToolNameID')}, "
                         f"{key.get('EventDate')}, {key.get('EventTime')})"
@@ -134,15 +145,13 @@
                 logger.info(
                     f"[DRY RUN] Run without --dry-run to perform actual migration"
                 )
-                # Return a positive number to indicate there's data to migrate
-                return len(sample_keys)
+                return total_new_keys
             else:
                 logger.info("[DRY RUN] No new keys found - database is up to date")
                 return 0

         # Migrate new keys
         migrated_rows = 0
-        offset = 0
         insert_buffer = []
         buffer_size = self.settings.migration.consolidation_group_limit // 10
         last_processed_key = None  # Track last key for final state update
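
An illustrative view (not part of the diff) of the checkpoint that `state_mgr.update_state(last_key=...)` persists and that is later reused as `after_key`; the concrete values are placeholders:

```python
# Illustrative shape of the last_key checkpoint tracked during incremental sync.
last_processed_key = {
    "unit_name": "UNIT01",       # example value
    "tool_name_id": "TOOL42",    # example value
    "event_date": "2024-05-17",  # stored as str(event_date)
    "event_time": "12:30:00",    # stored as str(event_time)
}
```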
@@ -155,121 +164,133 @@
         # Get column order for PostgreSQL insert
         pg_columns = self._get_pg_columns()

-        while True:
-            # Fetch batch of consolidation keys AFTER last_key
-            logger.debug(f"Fetching keys after last_key with offset={offset}")
-            keys = mysql_conn.fetch_consolidation_keys_after(
-                mysql_table,
-                after_key=last_key,
-                min_mysql_id=max_mysql_id,
-                limit=self.settings.migration.consolidation_group_limit,
-                offset=offset
-            )
-
-            if not keys:
-                logger.info("No more new keys to migrate")
-                break
-
-            logger.info(f"Processing {len(keys)} new keys (offset={offset})")
-
-            # Process each consolidation key
-            keys_processed = 0
-            for key in keys:
-                keys_processed += 1
-                # Log progress every 1000 keys
-                if keys_processed % 1000 == 0:
-                    logger.info(f"  Processed {keys_processed}/{len(keys)} keys in this batch...")
-                unit_name = key.get("UnitName")
-                tool_name_id = key.get("ToolNameID")
-                event_date = key.get("EventDate")
-                event_time = key.get("EventTime")
-
-                # Validate consolidation key before fetching
-                is_valid, error_reason = validate_consolidation_key(
-                    unit_name, tool_name_id, event_date, event_time
-                )
-
-                if not is_valid:
-                    # Log invalid key and skip
-                    error_logger.log_invalid_key(
-                        unit_name, tool_name_id, event_date, event_time, error_reason
-                    )
-                    continue
-
-                # Fetch all MySQL rows for this key (all nodes, all partitions)
-                try:
-                    mysql_rows = mysql_conn.fetch_records_for_key_all_partitions(
-                        mysql_table,
-                        unit_name,
-                        tool_name_id,
-                        event_date,
-                        event_time
-                    )
-                except Exception as e:
-                    # Log corrupted key that caused fetch error
-                    error_logger.log_invalid_key(
-                        unit_name, tool_name_id, event_date, event_time,
-                        f"Fetch failed: {e}"
-                    )
-                    continue
-
-                if not mysql_rows:
-                    logger.warning(
-                        f"No records found for key: "
-                        f"({unit_name}, {tool_name_id}, {event_date}, {event_time})"
-                    )
-                    continue
-
-                # Consolidate into single PostgreSQL row
-                try:
-                    pg_row = consolidate_rows(self.table, mysql_rows)
-                except Exception as e:
-                    logger.error(
-                        f"Failed to consolidate key "
-                        f"({unit_name}, {tool_name_id}, {event_date}, {event_time}): {e}"
-                    )
-                    continue
-
-                # Add to insert buffer
-                insert_buffer.append(pg_row)
-
-                # Track last processed key
-                last_processed_key = {
-                    "unit_name": unit_name,
-                    "tool_name_id": tool_name_id,
-                    "event_date": str(event_date) if event_date else None,
-                    "event_time": str(event_time) if event_time else None,
-                }
-
-                # Flush buffer when full
-                if len(insert_buffer) >= buffer_size:
-                    # Use COPY with ON CONFLICT to handle duplicates
-                    inserted = pg_conn.copy_from_with_conflict(
-                        pg_table,
-                        insert_buffer,
-                        pg_columns,
-                        conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"]
-                    )
-                    migrated_rows += inserted
-                    progress.update(inserted)
-
-                    # Update state with last key (from tracked variable)
-                    state_mgr.update_state(
-                        last_key=last_processed_key,
-                        total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
-                    )
-
-                    logger.debug(
-                        f"Flushed {inserted} rows, total new: {migrated_rows}"
-                    )
-                    insert_buffer = []
-
-            # Move to next batch of keys
-            offset += len(keys)
-
-            # If we got fewer keys than requested, we're done
-            if len(keys) < self.settings.migration.consolidation_group_limit:
-                break
+        # Process each partition
+        for partition_idx, partition in enumerate(partitions_to_process, 1):
+            logger.info(
+                f"[{partition_idx}/{len(partitions_to_process)}] "
+                f"Processing partition {partition}..."
+            )
+
+            # For first partition (same year as last_key), fetch keys AFTER last_key
+            # For subsequent partitions, fetch ALL keys (they're all new)
+            use_after_key = last_key if partition == partitions_to_process[0] else None
+
+            while True:
+                # Fetch batch of consolidation keys from this partition
+                keys = mysql_conn.fetch_consolidation_keys_from_partition_after(
+                    mysql_table,
+                    partition=partition,
+                    after_key=use_after_key,
+                    limit=self.settings.migration.consolidation_group_limit
+                )
+
+                if not keys:
+                    logger.info(f"  No more keys in partition {partition}")
+                    break
+
+                logger.info(f"  Processing {len(keys)} keys from {partition}...")
+
+                # Process each consolidation key
+                keys_processed = 0
+                for key in keys:
+                    keys_processed += 1
+                    # Log progress every 1000 keys
+                    if keys_processed % 1000 == 0:
+                        logger.info(f"  Processed {keys_processed}/{len(keys)} keys in this batch...")
+                    unit_name = key.get("UnitName")
+                    tool_name_id = key.get("ToolNameID")
+                    event_date = key.get("EventDate")
+                    event_time = key.get("EventTime")
+
+                    # Validate consolidation key before fetching
+                    is_valid, error_reason = validate_consolidation_key(
+                        unit_name, tool_name_id, event_date, event_time
+                    )
+
+                    if not is_valid:
+                        # Log invalid key and skip
+                        error_logger.log_invalid_key(
+                            unit_name, tool_name_id, event_date, event_time, error_reason
+                        )
+                        continue
+
+                    # Fetch all MySQL rows for this key (all nodes, all partitions)
+                    try:
+                        mysql_rows = mysql_conn.fetch_records_for_key_all_partitions(
+                            mysql_table,
+                            unit_name,
+                            tool_name_id,
+                            event_date,
+                            event_time
+                        )
+                    except Exception as e:
+                        # Log corrupted key that caused fetch error
+                        error_logger.log_invalid_key(
+                            unit_name, tool_name_id, event_date, event_time,
+                            f"Fetch failed: {e}"
+                        )
+                        continue
+
+                    if not mysql_rows:
+                        logger.warning(
+                            f"No records found for key: "
+                            f"({unit_name}, {tool_name_id}, {event_date}, {event_time})"
+                        )
+                        continue
+
+                    # Consolidate into single PostgreSQL row
+                    try:
+                        pg_row = consolidate_rows(self.table, mysql_rows)
+                    except Exception as e:
+                        logger.error(
+                            f"Failed to consolidate key "
+                            f"({unit_name}, {tool_name_id}, {event_date}, {event_time}): {e}"
+                        )
+                        continue
+
+                    # Add to insert buffer
+                    insert_buffer.append(pg_row)
+
+                    # Track last processed key
+                    last_processed_key = {
+                        "unit_name": unit_name,
+                        "tool_name_id": tool_name_id,
+                        "event_date": str(event_date) if event_date else None,
+                        "event_time": str(event_time) if event_time else None,
+                    }
+
+                    # Flush buffer when full
+                    if len(insert_buffer) >= buffer_size:
+                        # Use COPY with ON CONFLICT to handle duplicates
+                        inserted = pg_conn.copy_from_with_conflict(
+                            pg_table,
+                            insert_buffer,
+                            pg_columns,
+                            conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"]
+                        )
+                        migrated_rows += inserted
+                        progress.update(inserted)
+
+                        # Update state with last key (from tracked variable)
+                        state_mgr.update_state(
+                            last_key=last_processed_key,
+                            total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
+                        )
+
+                        logger.debug(
+                            f"Flushed {inserted} rows, total new: {migrated_rows}"
+                        )
+                        insert_buffer = []
+
+                # After processing all keys in batch, update use_after_key for next iteration
+                if keys:
+                    last_key_in_batch = keys[-1]
+                    use_after_key = {
+                        "unit_name": last_key_in_batch.get("UnitName"),
+                        "tool_name_id": last_key_in_batch.get("ToolNameID"),
+                        "event_date": str(last_key_in_batch.get("EventDate")) if last_key_in_batch.get("EventDate") else None,
+                        "event_time": str(last_key_in_batch.get("EventTime")) if last_key_in_batch.get("EventTime") else None,
+                    }

         # Flush remaining buffer
         if insert_buffer:
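
The flush step above relies on the project's `copy_from_with_conflict` helper, whose implementation is not shown in this commit. For orientation only, a generic sketch of the COPY-then-upsert technique that such a helper typically uses (psycopg2 and the staging-table approach are assumptions, not the project's confirmed implementation; rows are sequences ordered like `columns`):

```python
# Hypothetical illustration of a bulk COPY + ON CONFLICT DO NOTHING upsert.
import io

import psycopg2


def copy_with_conflict(conn, table, rows, columns, conflict_columns):
    """Bulk-load rows into `table`, skipping duplicates on the conflict columns."""
    buf = io.StringIO()
    for row in rows:
        buf.write("\t".join("\\N" if v is None else str(v) for v in row) + "\n")
    buf.seek(0)
    col_list = ", ".join(columns)
    with conn.cursor() as cur:
        # Stage the batch, then insert while ignoring rows that already exist.
        cur.execute(f"CREATE TEMP TABLE _staging (LIKE {table} INCLUDING DEFAULTS) ON COMMIT DROP")
        cur.copy_expert(f"COPY _staging ({col_list}) FROM STDIN", buf)
        cur.execute(
            f"INSERT INTO {table} ({col_list}) "
            f"SELECT {col_list} FROM _staging "
            f"ON CONFLICT ({', '.join(conflict_columns)}) DO NOTHING"
        )
        inserted = cur.rowcount
    conn.commit()
    return inserted
```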

web_ui.py | 593 (new file)

@@ -0,0 +1,593 @@
"""Web UI for MySQL to PostgreSQL migration monitoring and control.

Provides a Gradio-based interface for:
- Viewing migration status and progress
- Starting/monitoring migrations
- Viewing logs and errors
- Performance metrics and graphs
"""
import gradio as gr
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from pathlib import Path
import threading
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import logging

from config import get_settings
from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.full_migrator import run_full_migration
from src.migrator.parallel_migrator import run_parallel_migration
from src.utils.logger import get_logger

logger = get_logger(__name__)

# Global state for tracking running migrations
running_migrations = {}
migration_logs = {}


def get_migration_state_df() -> pd.DataFrame:
    """Fetch current migration state from PostgreSQL.

    Returns:
        DataFrame with columns: table_name, partition_name, status,
        total_rows_migrated, migration_started_at, migration_completed_at
    """
    try:
        with PostgreSQLConnector() as pg_conn:
            with pg_conn.connection.cursor() as cursor:
                cursor.execute("""
                    SELECT
                        table_name,
                        partition_name,
                        status,
                        total_rows_migrated,
                        migration_started_at,
                        migration_completed_at,
                        last_key
                    FROM migration_state
                    WHERE partition_name != '_global'
                    ORDER BY table_name, partition_name
                """)

                rows = cursor.fetchall()

                if not rows:
                    return pd.DataFrame(columns=[
                        'Table', 'Partition', 'Status', 'Rows Migrated',
                        'Started At', 'Completed At'
                    ])

                df = pd.DataFrame(rows, columns=[
                    'Table', 'Partition', 'Status', 'Rows Migrated',
                    'Started At', 'Completed At', 'Last Key'
                ])

                # Format dates
                df['Started At'] = pd.to_datetime(df['Started At']).dt.strftime('%Y-%m-%d %H:%M:%S')
                df['Completed At'] = pd.to_datetime(df['Completed At']).dt.strftime('%Y-%m-%d %H:%M:%S')
                df['Completed At'] = df['Completed At'].fillna('-')

                # Drop Last Key column for display (too verbose)
                df = df.drop('Last Key', axis=1)

                return df

    except Exception as e:
        logger.error(f"Failed to fetch migration state: {e}")
        return pd.DataFrame(columns=[
            'Table', 'Partition', 'Status', 'Rows Migrated',
            'Started At', 'Completed At'
        ])


def get_migration_summary() -> Dict[str, Any]:
    """Get summary statistics for migrations.

    Returns:
        Dict with total/completed/in_progress/pending counts per table
    """
    try:
        with PostgreSQLConnector() as pg_conn:
            with pg_conn.connection.cursor() as cursor:
                cursor.execute("""
                    SELECT
                        table_name,
                        status,
                        COUNT(*) as count,
                        SUM(total_rows_migrated) as total_rows
                    FROM migration_state
                    WHERE partition_name != '_global'
                    GROUP BY table_name, status
                    ORDER BY table_name, status
                """)

                rows = cursor.fetchall()

                summary = {
                    'RAWDATACOR': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0},
                    'ELABDATADISP': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0}
                }

                for table, status, count, total_rows in rows:
                    table_upper = table.upper()
                    if table_upper in summary:
                        summary[table_upper][status] = count
                        summary[table_upper]['total_rows'] += total_rows or 0

                return summary

    except Exception as e:
        logger.error(f"Failed to fetch migration summary: {e}")
        return {
            'RAWDATACOR': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0},
            'ELABDATADISP': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0}
        }


def create_status_chart() -> go.Figure:
    """Create a bar chart showing migration status by table.

    Returns:
        Plotly figure
    """
    summary = get_migration_summary()

    tables = []
    completed = []
    in_progress = []
    pending = []

    for table, stats in summary.items():
        tables.append(table)
        completed.append(stats['completed'])
        in_progress.append(stats['in_progress'])
        pending.append(stats['pending'])

    fig = go.Figure(data=[
        go.Bar(name='Completed', x=tables, y=completed, marker_color='green'),
        go.Bar(name='In Progress', x=tables, y=in_progress, marker_color='orange'),
        go.Bar(name='Pending', x=tables, y=pending, marker_color='gray')
    ])

    fig.update_layout(
        barmode='stack',
        title='Migration Status by Table',
        xaxis_title='Table',
        yaxis_title='Number of Partitions',
        height=400
    )

    return fig


def get_error_logs() -> List[Tuple[str, str]]:
    """Get list of error log files with their paths.

    Returns:
        List of (filename, filepath) tuples
    """
    error_logs = []
    for log_file in Path('.').glob('migration_errors_*.log'):
        error_logs.append((log_file.name, str(log_file)))

    return sorted(error_logs, key=lambda x: x[0], reverse=True)


def read_error_log(log_file: str) -> str:
    """Read contents of an error log file.

    Args:
        log_file: Path to log file

    Returns:
        Contents of log file
    """
    try:
        if not log_file or log_file == "Select a log file...":
            return "No log file selected"

        with open(log_file, 'r') as f:
            content = f.read()

        if not content:
            return f"Log file {log_file} is empty (no errors logged)"

        return content

    except Exception as e:
        return f"Error reading log file: {e}"


def start_migration_task(
    table: str,
    parallel_workers: int,
    resume: bool,
    dry_run: bool,
    partition: Optional[str] = None
) -> str:
    """Start a migration in a background thread.

    Args:
        table: Table name (RAWDATACOR, ELABDATADISP, or all)
        parallel_workers: Number of parallel workers (0 = sequential)
        resume: Whether to resume from last checkpoint
        dry_run: Whether to run in dry-run mode
        partition: Optional partition name for single partition migration

    Returns:
        Status message
    """
    task_id = f"{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    if task_id in running_migrations:
        return f"❌ Migration already running for {table}"

    # Validate options
    if parallel_workers > 0 and partition:
        return "❌ Cannot use parallel workers with single partition mode"

    # Initialize log buffer for this task
    migration_logs[task_id] = []

    def run_migration():
        """Worker function to run migration."""
        try:
            migration_logs[task_id].append(f"Starting migration for {table}...")
            migration_logs[task_id].append(f"Mode: {'Parallel' if parallel_workers > 0 else 'Sequential'}")
            migration_logs[task_id].append(f"Resume: {resume}, Dry-run: {dry_run}")

            if parallel_workers > 0:
                # Parallel migration
                rows = run_parallel_migration(
                    table,
                    num_workers=parallel_workers,
                    dry_run=dry_run,
                    resume=resume
                )
            else:
                # Sequential migration
                rows = run_full_migration(
                    table,
                    dry_run=dry_run,
                    resume=resume,
                    partition=partition
                )

            migration_logs[task_id].append(f"✓ Migration complete: {rows:,} rows migrated")
            running_migrations[task_id] = 'completed'

        except Exception as e:
            migration_logs[task_id].append(f"❌ Migration failed: {e}")
            running_migrations[task_id] = 'failed'
            logger.error(f"Migration failed: {e}")

    # Start migration in background thread
    thread = threading.Thread(target=run_migration, daemon=True)
    thread.start()

    running_migrations[task_id] = 'running'

    return f"✓ Migration started: {task_id}\nCheck the 'Logs' tab for progress"
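
The task helpers above are plain module-level functions, so they can also be driven without the Gradio front end; a sketch, assuming a configured database and arbitrary polling bounds:

```python
# Hypothetical headless use of the task helpers from web_ui.py.
import time

from web_ui import get_migration_logs, running_migrations, start_migration_task

print(start_migration_task(
    table="RAWDATACOR",
    parallel_workers=0,  # sequential
    resume=True,
    dry_run=True,        # simulate only
))

task_id = list(running_migrations)[-1]  # id of the task just registered
for _ in range(60):                     # poll for up to ~5 minutes
    if running_migrations[task_id] != "running":
        break
    time.sleep(5)
print(get_migration_logs(task_id))
```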


def start_incremental_migration_task(
    table: str,
    dry_run: bool
) -> str:
    """Start an incremental migration in a background thread.

    Args:
        table: Table name (RAWDATACOR, ELABDATADISP, or all)
        dry_run: Whether to run in dry-run mode

    Returns:
        Status message
    """
    task_id = f"incremental_{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    if task_id in running_migrations:
        return f"❌ Incremental migration already running for {table}"

    # Initialize log buffer for this task
    migration_logs[task_id] = []

    def run_migration():
        """Worker function to run incremental migration."""
        try:
            migration_logs[task_id].append(f"Starting INCREMENTAL migration for {table}...")
            migration_logs[task_id].append(f"Dry-run: {dry_run}")
            migration_logs[task_id].append("This will sync only NEW data added since last migration")

            from src.migrator.incremental_migrator import run_incremental_migration

            rows = run_incremental_migration(
                table,
                dry_run=dry_run
            )

            migration_logs[task_id].append(f"✓ Incremental migration complete: {rows:,} rows migrated")
            running_migrations[task_id] = 'completed'

        except Exception as e:
            migration_logs[task_id].append(f"❌ Incremental migration failed: {e}")
            running_migrations[task_id] = 'failed'
            logger.error(f"Incremental migration failed: {e}")

    # Start migration in background thread
    thread = threading.Thread(target=run_migration, daemon=True)
    thread.start()

    running_migrations[task_id] = 'running'

    return f"✓ Incremental migration started: {task_id}\nCheck the 'Logs' tab for progress"


def get_migration_logs(task_id: str) -> str:
    """Get logs for a specific migration task.

    Args:
        task_id: Task identifier

    Returns:
        Log contents
    """
    if not task_id or task_id == "No migrations running":
        return "Select a migration to view logs"

    logs = migration_logs.get(task_id, [])
    if not logs:
        return f"No logs available for {task_id}"

    return "\n".join(logs)


def refresh_migration_state():
    """Refresh migration state display."""
    return get_migration_state_df()


def refresh_status_chart():
    """Refresh status chart."""
    return create_status_chart()


def refresh_running_migrations() -> gr.Dropdown:
    """Refresh list of running migrations."""
    if not running_migrations:
        return gr.Dropdown(choices=["No migrations running"], value="No migrations running")

    choices = list(running_migrations.keys())
    return gr.Dropdown(choices=choices, value=choices[0] if choices else None)


# Build Gradio interface
with gr.Blocks(title="MySQL to PostgreSQL Migration", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🔄 MySQL to PostgreSQL Migration Dashboard")
    gr.Markdown("Monitor and control data migration from MySQL to PostgreSQL")

    with gr.Tabs():
        # Tab 1: Overview
        with gr.Tab("📊 Overview"):
            gr.Markdown("## Migration Status Overview")

            with gr.Row():
                refresh_btn = gr.Button("🔄 Refresh", scale=1)

            with gr.Row():
                status_chart = gr.Plot(label="Partition Status by Table")

            with gr.Row():
                state_table = gr.Dataframe(
                    value=get_migration_state_df(),
                    label="Migration State (All Partitions)",
                    interactive=False,
                    wrap=True
                )

            # Auto-refresh every 10 seconds
            refresh_btn.click(
                fn=lambda: (refresh_migration_state(), refresh_status_chart()),
                outputs=[state_table, status_chart]
            )

            # Initial load
            demo.load(
                fn=lambda: (refresh_migration_state(), refresh_status_chart()),
                outputs=[state_table, status_chart]
            )

        # Tab 2: Start Migration
        with gr.Tab("▶️ Start Migration"):
            gr.Markdown("## Start New Migration")

            with gr.Row():
                table_select = gr.Dropdown(
                    choices=["RAWDATACOR", "ELABDATADISP", "all"],
                    value="RAWDATACOR",
                    label="Table to Migrate"
                )

                partition_input = gr.Textbox(
                    label="Partition (optional)",
                    placeholder="Leave empty for all partitions, or enter e.g. 'part7', 'd9'",
                    value=""
                )

            with gr.Row():
                parallel_slider = gr.Slider(
                    minimum=0,
                    maximum=10,
                    value=0,
                    step=1,
                    label="Parallel Workers (0 = sequential)"
                )

            with gr.Row():
                resume_check = gr.Checkbox(label="Resume from last checkpoint", value=True)
                dry_run_check = gr.Checkbox(label="Dry run (simulate without writing)", value=False)

            start_btn = gr.Button("▶️ Start Migration", variant="primary", size="lg")

            migration_output = gr.Textbox(
                label="Migration Status",
                lines=3,
                interactive=False
            )

            start_btn.click(
                fn=start_migration_task,
                inputs=[table_select, parallel_slider, resume_check, dry_run_check, partition_input],
                outputs=migration_output
            )

        # Tab 3: Incremental Migration
        with gr.Tab("🔄 Incremental Sync"):
            gr.Markdown("## Incremental Migration (Sync New Data)")
            gr.Markdown("""
            This mode synchronizes **only the new data** added after the last full migration.

            **How it works:**
            - Finds the new consolidation keys in MySQL that do not yet exist in PostgreSQL
            - Migrates only those keys (already migrated data is not reprocessed)
            - Uses `migration_state` to track the last processed key

            **When to use it:**
            - After a full migration of all partitions has completed
            - To periodically sync new data without redoing everything
            - For daily/weekly updates
            """)

            with gr.Row():
                inc_table_select = gr.Dropdown(
                    choices=["RAWDATACOR", "ELABDATADISP", "all"],
                    value="RAWDATACOR",
                    label="Table to Sync"
                )

            with gr.Row():
                inc_dry_run_check = gr.Checkbox(
                    label="Dry run (simulate without writing)",
                    value=False
                )

            inc_start_btn = gr.Button("🔄 Start Incremental Sync", variant="primary", size="lg")

            inc_migration_output = gr.Textbox(
                label="Sync Status",
                lines=3,
                interactive=False
            )

            inc_start_btn.click(
                fn=start_incremental_migration_task,
                inputs=[inc_table_select, inc_dry_run_check],
                outputs=inc_migration_output
            )

        # Tab 4: Logs
        with gr.Tab("📝 Logs"):
            gr.Markdown("## Migration Logs")

            with gr.Row():
                running_migrations_dropdown = gr.Dropdown(
                    choices=["No migrations running"],
                    value="No migrations running",
                    label="Select Migration",
                    interactive=True
                )
                refresh_migrations_btn = gr.Button("🔄 Refresh List")

            log_output = gr.Textbox(
                label="Log Output",
                lines=20,
                interactive=False,
                max_lines=50
            )

            refresh_migrations_btn.click(
                fn=refresh_running_migrations,
                outputs=running_migrations_dropdown
            )

            running_migrations_dropdown.change(
                fn=get_migration_logs,
                inputs=running_migrations_dropdown,
                outputs=log_output
            )

        # Tab 5: Error Logs
        with gr.Tab("⚠️ Error Logs"):
            gr.Markdown("## Error Log Viewer")
            gr.Markdown("View logged validation errors (invalid consolidation keys)")

            with gr.Row():
                error_log_dropdown = gr.Dropdown(
                    choices=[x[0] for x in get_error_logs()] if get_error_logs() else ["No error logs found"],
                    label="Select Error Log File",
                    value="Select a log file..."
                )
                refresh_error_logs_btn = gr.Button("🔄 Refresh")

            error_log_content = gr.Textbox(
                label="Error Log Contents",
                lines=20,
                interactive=False,
                max_lines=50
            )

            def refresh_error_log_list():
                logs = get_error_logs()
                choices = [x[0] for x in logs] if logs else ["No error logs found"]
                return gr.Dropdown(choices=choices)

            def show_error_log(filename):
                if not filename or filename == "Select a log file..." or filename == "No error logs found":
                    return "Select a log file to view its contents"
                return read_error_log(filename)

            refresh_error_logs_btn.click(
                fn=refresh_error_log_list,
                outputs=error_log_dropdown
            )

            error_log_dropdown.change(
                fn=show_error_log,
                inputs=error_log_dropdown,
                outputs=error_log_content
            )


def launch_ui(share=False, server_port=7860):
    """Launch the Gradio UI.

    Args:
        share: Whether to create a public share link
        server_port: Port to run the server on
    """
    demo.launch(
        share=share,
        server_port=server_port,
        server_name="0.0.0.0"  # Listen on all interfaces
    )


if __name__ == "__main__":
    import sys

    # Parse command line args
    share = "--share" in sys.argv
    port = 7860

    for arg in sys.argv:
        if arg.startswith("--port="):
            port = int(arg.split("=")[1])

    print(f"Starting Migration Dashboard on port {port}...")
    print(f"Share mode: {share}")

    launch_ui(share=share, server_port=port)
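
As an alternative to `launch_ui()`, the `demo` Blocks object can be mounted into an existing FastAPI service; a sketch, assuming `fastapi` and `uvicorn` are available (they are not declared in `pyproject.toml` above):

```python
# Hypothetical deployment variant: serve the dashboard under /dashboard of a FastAPI app.
import gradio as gr
import uvicorn
from fastapi import FastAPI

from web_ui import demo  # the gr.Blocks interface built in web_ui.py

app = FastAPI()
app = gr.mount_gradio_app(app, demo, path="/dashboard")

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
```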