"""Web UI for MySQL to PostgreSQL migration monitoring and control.

Provides a Gradio-based interface for:

- Viewing migration status and progress
- Starting/monitoring migrations
- Viewing logs and errors
- Performance metrics and graphs
"""

import logging
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import gradio as gr
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

from config import get_settings
from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.full_migrator import run_full_migration
from src.migrator.parallel_migrator import run_parallel_migration
from src.utils.logger import get_logger

logger = get_logger(__name__)

# Global state for tracking running migrations
# Maps task_id -> status string ('running' | 'completed' | 'failed');
# written by the UI thread and the worker threads in the start_* functions.
running_migrations = {}
# Maps task_id -> list of log message strings appended by the worker thread.
migration_logs = {}
def get_migration_state_df() -> pd.DataFrame:
    """Fetch current migration state from PostgreSQL.

    Queries the ``migration_state`` table, excluding the synthetic
    ``_global`` row, and formats timestamps for display.

    Returns:
        DataFrame with columns: Table, Partition, Status, Rows Migrated,
        Started At, Completed At.  An empty frame with the same columns is
        returned when there is no state or the query fails.
    """
    # Single source of truth for the display columns (the original repeated
    # this list three times, inviting drift).
    display_columns = [
        'Table', 'Partition', 'Status', 'Rows Migrated',
        'Started At', 'Completed At'
    ]
    try:
        with PostgreSQLConnector() as pg_conn:
            with pg_conn.connection.cursor() as cursor:
                cursor.execute("""
                    SELECT
                        table_name,
                        partition_name,
                        status,
                        total_rows_migrated,
                        migration_started_at,
                        migration_completed_at,
                        last_key
                    FROM migration_state
                    WHERE partition_name != '_global'
                    ORDER BY table_name, partition_name
                """)

                rows = cursor.fetchall()

        if not rows:
            return pd.DataFrame(columns=display_columns)

        df = pd.DataFrame(rows, columns=display_columns + ['Last Key'])

        # Format timestamps.  NULL timestamps become NaT -> NaN after
        # strftime; show '-' instead.  (The original only did the fillna
        # for 'Completed At', so never-started rows displayed as NaN.)
        for col in ('Started At', 'Completed At'):
            df[col] = pd.to_datetime(df[col]).dt.strftime('%Y-%m-%d %H:%M:%S')
            df[col] = df[col].fillna('-')

        # Drop Last Key column for display (too verbose)
        return df.drop('Last Key', axis=1)

    except Exception as e:
        logger.error(f"Failed to fetch migration state: {e}")
        return pd.DataFrame(columns=display_columns)
|
def get_migration_summary() -> Dict[str, Dict[str, int]]:
    """Get summary statistics for migrations.

    Returns:
        Dict mapping table name ('RAWDATACOR', 'ELABDATADISP') to a dict of
        per-status partition counts ('completed', 'in_progress', 'pending',
        plus any other status reported by the DB) and a 'total_rows' sum.
    """
    # Original annotation was Dict[str, any] — the *builtin* any(), not a
    # type; annotate the real shape instead.

    def _zeroed() -> Dict[str, Dict[str, int]]:
        # Fresh counters on every call so the error path can never return a
        # dict that an earlier call already mutated.
        return {
            'RAWDATACOR': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0},
            'ELABDATADISP': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0}
        }

    try:
        with PostgreSQLConnector() as pg_conn:
            with pg_conn.connection.cursor() as cursor:
                cursor.execute("""
                    SELECT
                        table_name,
                        status,
                        COUNT(*) as count,
                        SUM(total_rows_migrated) as total_rows
                    FROM migration_state
                    WHERE partition_name != '_global'
                    GROUP BY table_name, status
                    ORDER BY table_name, status
                """)

                rows = cursor.fetchall()

        summary = _zeroed()
        for table, status, count, total_rows in rows:
            table_upper = table.upper()
            if table_upper in summary:
                summary[table_upper][status] = count
                # SUM() yields NULL when no rows were migrated yet
                summary[table_upper]['total_rows'] += total_rows or 0

        return summary

    except Exception as e:
        logger.error(f"Failed to fetch migration summary: {e}")
        return _zeroed()
def create_status_chart() -> go.Figure:
    """Create a stacked bar chart of partition migration status per table.

    Returns:
        Plotly figure with one stacked bar (Completed / In Progress /
        Pending) per table.
    """
    summary = get_migration_summary()
    table_names = list(summary.keys())

    # (legend label, summary key, bar color) for each stacked series
    series_spec = [
        ('Completed', 'completed', 'green'),
        ('In Progress', 'in_progress', 'orange'),
        ('Pending', 'pending', 'gray'),
    ]

    bars = [
        go.Bar(
            name=label,
            x=table_names,
            y=[summary[name][key] for name in table_names],
            marker_color=color,
        )
        for label, key, color in series_spec
    ]

    fig = go.Figure(data=bars)
    fig.update_layout(
        barmode='stack',
        title='Migration Status by Table',
        xaxis_title='Table',
        yaxis_title='Number of Partitions',
        height=400
    )
    return fig
|
def get_error_logs() -> List[Tuple[str, str]]:
    """Get list of error log files with their paths.

    Returns:
        List of (filename, filepath) tuples, sorted by filename in
        descending order (timestamped names, so newest first).
    """
    found = [
        (candidate.name, str(candidate))
        for candidate in Path('.').glob('migration_errors_*.log')
    ]
    return sorted(found, key=lambda entry: entry[0], reverse=True)
|
def read_error_log(log_file: str) -> str:
    """Read contents of an error log file.

    Args:
        log_file: Path to log file

    Returns:
        Contents of the log file, or a human-readable status message when
        no file is selected, the file is empty, or reading fails.
    """
    # Placeholder / empty selection — no I/O needed, so keep it outside try.
    if not log_file or log_file == "Select a log file...":
        return "No log file selected"

    try:
        # Explicit encoding with replacement: the original used the
        # locale-dependent default, so a log containing non-UTF-8 bytes
        # could crash with UnicodeDecodeError instead of being displayed.
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            content = f.read()
    except Exception as e:
        # Broad catch is deliberate: this feeds a UI textbox, any failure
        # should render as a message, not propagate.
        return f"Error reading log file: {e}"

    if not content:
        return f"Log file {log_file} is empty (no errors logged)"

    return content
|
def start_migration_task(
    table: str,
    parallel_workers: int,
    resume: bool,
    dry_run: bool,
    partition: Optional[str] = None
) -> str:
    """Start a migration in a background thread.

    Args:
        table: Table name (RAWDATACOR, ELABDATADISP, or all)
        parallel_workers: Number of parallel workers (0 = sequential)
        resume: Whether to resume from last checkpoint
        dry_run: Whether to run in dry-run mode
        partition: Optional partition name for single partition migration

    Returns:
        Status message
    """
    # Reject a second concurrent run for the same table.  The original
    # checked `task_id in running_migrations` with a freshly timestamped
    # task_id, which could never pre-exist — the guard was dead code.
    for existing_id, status in running_migrations.items():
        if status == 'running' and existing_id.startswith(f"{table}_"):
            return f"❌ Migration already running for {table}"

    # Validate options
    if parallel_workers > 0 and partition:
        return "❌ Cannot use parallel workers with single partition mode"

    task_id = f"{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    # Initialize log buffer for this task
    migration_logs[task_id] = []

    def run_migration():
        """Worker function to run migration."""
        try:
            migration_logs[task_id].append(f"Starting migration for {table}...")
            migration_logs[task_id].append(f"Mode: {'Parallel' if parallel_workers > 0 else 'Sequential'}")
            migration_logs[task_id].append(f"Resume: {resume}, Dry-run: {dry_run}")

            if parallel_workers > 0:
                # Parallel migration
                rows = run_parallel_migration(
                    table,
                    num_workers=parallel_workers,
                    dry_run=dry_run,
                    resume=resume
                )
            else:
                # Sequential migration
                rows = run_full_migration(
                    table,
                    dry_run=dry_run,
                    resume=resume,
                    partition=partition
                )

            migration_logs[task_id].append(f"✓ Migration complete: {rows:,} rows migrated")
            running_migrations[task_id] = 'completed'

        except Exception as e:
            migration_logs[task_id].append(f"❌ Migration failed: {e}")
            running_migrations[task_id] = 'failed'
            logger.error(f"Migration failed: {e}")

    # Register as 'running' BEFORE starting the thread: the original set it
    # after thread.start(), so a fast-failing worker's 'failed'/'completed'
    # status could be overwritten back to 'running'.
    running_migrations[task_id] = 'running'

    thread = threading.Thread(target=run_migration, daemon=True)
    thread.start()

    return f"✓ Migration started: {task_id}\nCheck the 'Logs' tab for progress"
|
def start_incremental_migration_task(
    table: str,
    dry_run: bool
) -> str:
    """Start an incremental migration in a background thread.

    Args:
        table: Table name (RAWDATACOR, ELABDATADISP, or all)
        dry_run: Whether to run in dry-run mode

    Returns:
        Status message
    """
    # Reject a second concurrent incremental run for the same table.  The
    # original compared a freshly timestamped task_id against the registry,
    # which could never match — duplicates were never detected.
    for existing_id, status in running_migrations.items():
        if status == 'running' and existing_id.startswith(f"incremental_{table}_"):
            return f"❌ Incremental migration already running for {table}"

    task_id = f"incremental_{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    # Initialize log buffer for this task
    migration_logs[task_id] = []

    def run_migration():
        """Worker function to run incremental migration."""
        try:
            migration_logs[task_id].append(f"Starting INCREMENTAL migration for {table}...")
            migration_logs[task_id].append(f"Dry-run: {dry_run}")
            migration_logs[task_id].append("This will sync only NEW data added since last migration")

            # NOTE(review): local import kept from the original — presumably
            # to defer loading the incremental migrator until it is needed.
            from src.migrator.incremental_migrator import run_incremental_migration

            rows = run_incremental_migration(
                table,
                dry_run=dry_run
            )

            migration_logs[task_id].append(f"✓ Incremental migration complete: {rows:,} rows migrated")
            running_migrations[task_id] = 'completed'

        except Exception as e:
            migration_logs[task_id].append(f"❌ Incremental migration failed: {e}")
            running_migrations[task_id] = 'failed'
            logger.error(f"Incremental migration failed: {e}")

    # Register as 'running' BEFORE starting the thread so a fast-finishing
    # worker's final status cannot be overwritten with 'running'.
    running_migrations[task_id] = 'running'

    thread = threading.Thread(target=run_migration, daemon=True)
    thread.start()

    return f"✓ Incremental migration started: {task_id}\nCheck the 'Logs' tab for progress"
|
def get_migration_logs(task_id: str) -> str:
    """Get logs for a specific migration task.

    Args:
        task_id: Task identifier

    Returns:
        Log contents
    """
    # Dropdown placeholder (or nothing) selected — nothing to show yet.
    if not task_id or task_id == "No migrations running":
        return "Select a migration to view logs"

    entries = migration_logs.get(task_id, [])
    if entries:
        return "\n".join(entries)
    return f"No logs available for {task_id}"
|
|
def refresh_migration_state():
    """Return a freshly queried migration-state DataFrame for the UI table."""
    return get_migration_state_df()
|
|
|
def refresh_status_chart():
    """Return a freshly built status-chart figure for the UI plot."""
    return create_status_chart()
|
|
def refresh_running_migrations() -> gr.Dropdown:
    """Rebuild the task dropdown from the running-migrations registry."""
    if not running_migrations:
        placeholder = "No migrations running"
        return gr.Dropdown(choices=[placeholder], value=placeholder)

    task_ids = list(running_migrations.keys())
    selected = task_ids[0] if task_ids else None
    return gr.Dropdown(choices=task_ids, value=selected)
|
|
# Build Gradio interface
with gr.Blocks(title="MySQL to PostgreSQL Migration", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🔄 MySQL to PostgreSQL Migration Dashboard")
    gr.Markdown("Monitor and control data migration from MySQL to PostgreSQL")

    with gr.Tabs():
        # Tab 1: Overview
        with gr.Tab("📊 Overview"):
            gr.Markdown("## Migration Status Overview")

            with gr.Row():
                refresh_btn = gr.Button("🔄 Refresh", scale=1)

            with gr.Row():
                status_chart = gr.Plot(label="Partition Status by Table")

            with gr.Row():
                state_table = gr.Dataframe(
                    value=get_migration_state_df(),
                    label="Migration State (All Partitions)",
                    interactive=False,
                    wrap=True
                )

            # Manual refresh of both table and chart.  (The original comment
            # claimed "auto-refresh every 10 seconds", but no timer exists —
            # refresh is button-driven only.)
            refresh_btn.click(
                fn=lambda: (refresh_migration_state(), refresh_status_chart()),
                outputs=[state_table, status_chart]
            )

            # Initial load
            demo.load(
                fn=lambda: (refresh_migration_state(), refresh_status_chart()),
                outputs=[state_table, status_chart]
            )

        # Tab 2: Start Migration
        with gr.Tab("▶️ Start Migration"):
            gr.Markdown("## Start New Migration")

            with gr.Row():
                table_select = gr.Dropdown(
                    choices=["RAWDATACOR", "ELABDATADISP", "all"],
                    value="RAWDATACOR",
                    label="Table to Migrate"
                )

                partition_input = gr.Textbox(
                    label="Partition (optional)",
                    placeholder="Leave empty for all partitions, or enter e.g. 'part7', 'd9'",
                    value=""
                )

            with gr.Row():
                parallel_slider = gr.Slider(
                    minimum=0,
                    maximum=10,
                    value=0,
                    step=1,
                    label="Parallel Workers (0 = sequential)"
                )

            with gr.Row():
                resume_check = gr.Checkbox(label="Resume from last checkpoint", value=True)
                dry_run_check = gr.Checkbox(label="Dry run (simulate without writing)", value=False)

            start_btn = gr.Button("▶️ Start Migration", variant="primary", size="lg")

            migration_output = gr.Textbox(
                label="Migration Status",
                lines=3,
                interactive=False
            )

            start_btn.click(
                fn=start_migration_task,
                inputs=[table_select, parallel_slider, resume_check, dry_run_check, partition_input],
                outputs=migration_output
            )

        # Tab 3: Incremental Migration
        with gr.Tab("🔄 Incremental Sync"):
            gr.Markdown("## Incremental Migration (Sync New Data)")
            gr.Markdown("""
            Questa modalità sincronizza **solo i dati nuovi** aggiunti dopo l'ultima migrazione full.

            **Come funziona:**
            - Trova le nuove consolidation keys in MySQL che non esistono ancora in PostgreSQL
            - Migra solo quelle chiavi (non riprocessa dati già migrati)
            - Usa `migration_state` per tracciare l'ultima chiave processata

            **Quando usare:**
            - Dopo aver completato una migrazione full di tutte le partizioni
            - Per sincronizzare periodicamente nuovi dati senza rifare tutto
            - Per aggiornamenti quotidiani/settimanali
            """)

            with gr.Row():
                inc_table_select = gr.Dropdown(
                    choices=["RAWDATACOR", "ELABDATADISP", "all"],
                    value="RAWDATACOR",
                    label="Table to Sync"
                )

            with gr.Row():
                inc_dry_run_check = gr.Checkbox(
                    label="Dry run (simulate without writing)",
                    value=False
                )

            inc_start_btn = gr.Button("🔄 Start Incremental Sync", variant="primary", size="lg")

            inc_migration_output = gr.Textbox(
                label="Sync Status",
                lines=3,
                interactive=False
            )

            inc_start_btn.click(
                fn=start_incremental_migration_task,
                inputs=[inc_table_select, inc_dry_run_check],
                outputs=inc_migration_output
            )

        # Tab 4: Logs
        with gr.Tab("📝 Logs"):
            gr.Markdown("## Migration Logs")

            with gr.Row():
                running_migrations_dropdown = gr.Dropdown(
                    choices=["No migrations running"],
                    value="No migrations running",
                    label="Select Migration",
                    interactive=True
                )
                refresh_migrations_btn = gr.Button("🔄 Refresh List")

            log_output = gr.Textbox(
                label="Log Output",
                lines=20,
                interactive=False,
                max_lines=50
            )

            refresh_migrations_btn.click(
                fn=refresh_running_migrations,
                outputs=running_migrations_dropdown
            )

            running_migrations_dropdown.change(
                fn=get_migration_logs,
                inputs=running_migrations_dropdown,
                outputs=log_output
            )

        # Tab 5: Error Logs
        with gr.Tab("⚠️ Error Logs"):
            gr.Markdown("## Error Log Viewer")
            gr.Markdown("View logged validation errors (invalid consolidation keys)")

            with gr.Row():
                # Scan the log directory once.  (The original called
                # get_error_logs() twice on the same line: once for the
                # emptiness check and once for the choices list.)
                _initial_error_logs = get_error_logs()
                error_log_dropdown = gr.Dropdown(
                    choices=(
                        [name for name, _path in _initial_error_logs]
                        if _initial_error_logs
                        else ["No error logs found"]
                    ),
                    label="Select Error Log File",
                    value="Select a log file..."
                )
                refresh_error_logs_btn = gr.Button("🔄 Refresh")

            error_log_content = gr.Textbox(
                label="Error Log Contents",
                lines=20,
                interactive=False,
                max_lines=50
            )

            def refresh_error_log_list():
                """Re-scan disk and rebuild the error-log dropdown choices."""
                logs = get_error_logs()
                choices = [name for name, _path in logs] if logs else ["No error logs found"]
                return gr.Dropdown(choices=choices)

            def show_error_log(filename):
                """Render the chosen log file, ignoring placeholder entries."""
                if not filename or filename in ("Select a log file...", "No error logs found"):
                    return "Select a log file to view its contents"
                return read_error_log(filename)

            refresh_error_logs_btn.click(
                fn=refresh_error_log_list,
                outputs=error_log_dropdown
            )

            error_log_dropdown.change(
                fn=show_error_log,
                inputs=error_log_dropdown,
                outputs=error_log_content
            )
|
|
|
def launch_ui(share: bool = False, server_port: int = 7860) -> None:
    """Launch the Gradio UI.

    Blocks the calling thread until the server is shut down.

    Args:
        share: Whether to create a public share link
        server_port: Port to run the server on
    """
    demo.launch(
        share=share,
        server_port=server_port,
        server_name="0.0.0.0"  # Listen on all interfaces
    )
|
|
|
if __name__ == "__main__":
    import sys

    # Minimal hand-rolled CLI parsing: --share flag and --port=<n> option
    # (last --port occurrence wins, matching the original loop).
    share = "--share" in sys.argv
    port_args = [arg for arg in sys.argv if arg.startswith("--port=")]
    port = int(port_args[-1].split("=")[1]) if port_args else 7860

    print(f"Starting Migration Dashboard on port {port}...")
    print(f"Share mode: {share}")

    launch_ui(share=share, server_port=port)