"""Web UI for MySQL to PostgreSQL migration monitoring and control. Provides a Gradio-based interface for: - Viewing migration status and progress - Starting/monitoring migrations - Viewing logs and errors - Performance metrics and graphs """ import gradio as gr import pandas as pd import plotly.graph_objects as go import plotly.express as px from pathlib import Path import threading import time from datetime import datetime from typing import Optional, Dict, List, Tuple import logging from config import get_settings from src.connectors.postgres_connector import PostgreSQLConnector from src.migrator.full_migrator import run_full_migration from src.migrator.parallel_migrator import run_parallel_migration from src.utils.logger import get_logger logger = get_logger(__name__) # Global state for tracking running migrations running_migrations = {} migration_logs = {} def get_migration_state_df() -> pd.DataFrame: """Fetch current migration state from PostgreSQL. Returns: DataFrame with columns: table_name, partition_name, status, total_rows_migrated, migration_started_at, migration_completed_at """ try: with PostgreSQLConnector() as pg_conn: with pg_conn.connection.cursor() as cursor: cursor.execute(""" SELECT table_name, partition_name, status, total_rows_migrated, migration_started_at, migration_completed_at, last_key FROM migration_state WHERE partition_name != '_global' ORDER BY table_name, partition_name """) rows = cursor.fetchall() if not rows: return pd.DataFrame(columns=[ 'Table', 'Partition', 'Status', 'Rows Migrated', 'Started At', 'Completed At' ]) df = pd.DataFrame(rows, columns=[ 'Table', 'Partition', 'Status', 'Rows Migrated', 'Started At', 'Completed At', 'Last Key' ]) # Format dates df['Started At'] = pd.to_datetime(df['Started At']).dt.strftime('%Y-%m-%d %H:%M:%S') df['Completed At'] = pd.to_datetime(df['Completed At']).dt.strftime('%Y-%m-%d %H:%M:%S') df['Completed At'] = df['Completed At'].fillna('-') # Drop Last Key column for display (too verbose) df = df.drop('Last Key', axis=1) return df except Exception as e: logger.error(f"Failed to fetch migration state: {e}") return pd.DataFrame(columns=[ 'Table', 'Partition', 'Status', 'Rows Migrated', 'Started At', 'Completed At' ]) def get_migration_summary() -> Dict[str, any]: """Get summary statistics for migrations. Returns: Dict with total/completed/in_progress/pending counts per table """ try: with PostgreSQLConnector() as pg_conn: with pg_conn.connection.cursor() as cursor: cursor.execute(""" SELECT table_name, status, COUNT(*) as count, SUM(total_rows_migrated) as total_rows FROM migration_state WHERE partition_name != '_global' GROUP BY table_name, status ORDER BY table_name, status """) rows = cursor.fetchall() summary = { 'RAWDATACOR': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0}, 'ELABDATADISP': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0} } for table, status, count, total_rows in rows: table_upper = table.upper() if table_upper in summary: summary[table_upper][status] = count summary[table_upper]['total_rows'] += total_rows or 0 return summary except Exception as e: logger.error(f"Failed to fetch migration summary: {e}") return { 'RAWDATACOR': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0}, 'ELABDATADISP': {'completed': 0, 'in_progress': 0, 'pending': 0, 'total_rows': 0} } def create_status_chart() -> go.Figure: """Create a bar chart showing migration status by table. 

def create_status_chart() -> go.Figure:
    """Create a bar chart showing migration status by table.

    Returns:
        Plotly figure
    """
    summary = get_migration_summary()

    tables = []
    completed = []
    in_progress = []
    pending = []

    for table, stats in summary.items():
        tables.append(table)
        completed.append(stats['completed'])
        in_progress.append(stats['in_progress'])
        pending.append(stats['pending'])

    fig = go.Figure(data=[
        go.Bar(name='Completed', x=tables, y=completed, marker_color='green'),
        go.Bar(name='In Progress', x=tables, y=in_progress, marker_color='orange'),
        go.Bar(name='Pending', x=tables, y=pending, marker_color='gray')
    ])

    fig.update_layout(
        barmode='stack',
        title='Migration Status by Table',
        xaxis_title='Table',
        yaxis_title='Number of Partitions',
        height=400
    )

    return fig


def get_error_logs() -> List[Tuple[str, str]]:
    """Get list of error log files with their paths.

    Returns:
        List of (filename, filepath) tuples
    """
    error_logs = []
    for log_file in Path('.').glob('migration_errors_*.log'):
        error_logs.append((log_file.name, str(log_file)))
    return sorted(error_logs, key=lambda x: x[0], reverse=True)


def read_error_log(log_file: str) -> str:
    """Read contents of an error log file.

    Args:
        log_file: Path to log file

    Returns:
        Contents of log file
    """
    try:
        if not log_file or log_file == "Select a log file...":
            return "No log file selected"

        with open(log_file, 'r') as f:
            content = f.read()

        if not content:
            return f"Log file {log_file} is empty (no errors logged)"

        return content

    except Exception as e:
        return f"Error reading log file: {e}"


def start_migration_task(
    table: str,
    parallel_workers: int,
    resume: bool,
    dry_run: bool,
    partition: Optional[str] = None
) -> str:
    """Start a migration in a background thread.

    Args:
        table: Table name (RAWDATACOR, ELABDATADISP, or all)
        parallel_workers: Number of parallel workers (0 = sequential)
        resume: Whether to resume from last checkpoint
        dry_run: Whether to run in dry-run mode
        partition: Optional partition name for single partition migration

    Returns:
        Status message
    """
    task_id = f"{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    if task_id in running_migrations:
        return f"❌ Migration already running for {table}"

    # Validate options
    if parallel_workers > 0 and partition:
        return "❌ Cannot use parallel workers with single partition mode"

    # Initialize log buffer for this task
    migration_logs[task_id] = []

    def run_migration():
        """Worker function to run migration."""
        try:
            migration_logs[task_id].append(f"Starting migration for {table}...")
            migration_logs[task_id].append(f"Mode: {'Parallel' if parallel_workers > 0 else 'Sequential'}")
            migration_logs[task_id].append(f"Resume: {resume}, Dry-run: {dry_run}")

            if parallel_workers > 0:
                # Parallel migration
                rows = run_parallel_migration(
                    table,
                    num_workers=parallel_workers,
                    dry_run=dry_run,
                    resume=resume
                )
            else:
                # Sequential migration
                rows = run_full_migration(
                    table,
                    dry_run=dry_run,
                    resume=resume,
                    partition=partition
                )

            migration_logs[task_id].append(f"✓ Migration complete: {rows:,} rows migrated")
            running_migrations[task_id] = 'completed'

        except Exception as e:
            migration_logs[task_id].append(f"❌ Migration failed: {e}")
            running_migrations[task_id] = 'failed'
            logger.error(f"Migration failed: {e}")

    # Start migration in background thread
    thread = threading.Thread(target=run_migration, daemon=True)
    thread.start()
    running_migrations[task_id] = 'running'

    return f"✓ Migration started: {task_id}\nCheck the 'Logs' tab for progress"
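
# Illustrative usage (hypothetical values; the UI wires this function to the
# "Start Migration" button rather than calling it directly):
#
#   msg = start_migration_task(
#       table="RAWDATACOR",
#       parallel_workers=4,   # 0 would run the sequential migrator
#       resume=True,          # continue from the last checkpoint
#       dry_run=True,         # simulate without writing to PostgreSQL
#   )
#   print(msg)  # e.g. "✓ Migration started: RAWDATACOR_<timestamp> ..."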

def start_incremental_migration_task(
    table: str,
    dry_run: bool
) -> str:
    """Start an incremental migration in a background thread.

    Args:
        table: Table name (RAWDATACOR, ELABDATADISP, or all)
        dry_run: Whether to run in dry-run mode

    Returns:
        Status message
    """
    task_id = f"incremental_{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    if task_id in running_migrations:
        return f"❌ Incremental migration already running for {table}"

    # Initialize log buffer for this task
    migration_logs[task_id] = []

    def run_migration():
        """Worker function to run incremental migration."""
        try:
            migration_logs[task_id].append(f"Starting INCREMENTAL migration for {table}...")
            migration_logs[task_id].append(f"Dry-run: {dry_run}")
            migration_logs[task_id].append("This will sync only NEW data added since last migration")

            from src.migrator.incremental_migrator import run_incremental_migration
            rows = run_incremental_migration(
                table,
                dry_run=dry_run
            )

            migration_logs[task_id].append(f"✓ Incremental migration complete: {rows:,} rows migrated")
            running_migrations[task_id] = 'completed'

        except Exception as e:
            migration_logs[task_id].append(f"❌ Incremental migration failed: {e}")
            running_migrations[task_id] = 'failed'
            logger.error(f"Incremental migration failed: {e}")

    # Start migration in background thread
    thread = threading.Thread(target=run_migration, daemon=True)
    thread.start()
    running_migrations[task_id] = 'running'

    return f"✓ Incremental migration started: {task_id}\nCheck the 'Logs' tab for progress"


def get_migration_logs(task_id: str) -> str:
    """Get logs for a specific migration task.

    Args:
        task_id: Task identifier

    Returns:
        Log contents
    """
    if not task_id or task_id == "No migrations running":
        return "Select a migration to view logs"

    logs = migration_logs.get(task_id, [])
    if not logs:
        return f"No logs available for {task_id}"

    return "\n".join(logs)


def refresh_migration_state():
    """Refresh migration state display."""
    return get_migration_state_df()


def refresh_status_chart():
    """Refresh status chart."""
    return create_status_chart()


def refresh_running_migrations() -> gr.Dropdown:
    """Refresh list of running migrations."""
    if not running_migrations:
        return gr.Dropdown(choices=["No migrations running"], value="No migrations running")
    choices = list(running_migrations.keys())
    return gr.Dropdown(choices=choices, value=choices[0] if choices else None)
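
# Illustrative sketch (not part of the UI): tailing a task's in-memory log
# buffer from a script or REPL. `task_id` is the identifier embedded in the
# status message returned by start_migration_task(); the helper name and
# polling interval are assumptions.
def _tail_logs(task_id: str, interval: float = 5.0) -> None:
    """Print new log lines for a task until it leaves the 'running' state."""
    seen = 0
    while running_migrations.get(task_id) == 'running':
        lines = migration_logs.get(task_id, [])
        for line in lines[seen:]:
            print(line)
        seen = len(lines)
        time.sleep(interval)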

# Build Gradio interface
with gr.Blocks(title="MySQL to PostgreSQL Migration", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🔄 MySQL to PostgreSQL Migration Dashboard")
    gr.Markdown("Monitor and control data migration from MySQL to PostgreSQL")

    with gr.Tabs():
        # Tab 1: Overview
        with gr.Tab("📊 Overview"):
            gr.Markdown("## Migration Status Overview")

            with gr.Row():
                refresh_btn = gr.Button("🔄 Refresh", scale=1)

            with gr.Row():
                status_chart = gr.Plot(label="Partition Status by Table")

            with gr.Row():
                state_table = gr.Dataframe(
                    value=get_migration_state_df(),
                    label="Migration State (All Partitions)",
                    interactive=False,
                    wrap=True
                )

            # Manual refresh via the button (updates both table and chart)
            refresh_btn.click(
                fn=lambda: (refresh_migration_state(), refresh_status_chart()),
                outputs=[state_table, status_chart]
            )

            # Initial load
            demo.load(
                fn=lambda: (refresh_migration_state(), refresh_status_chart()),
                outputs=[state_table, status_chart]
            )

        # Tab 2: Start Migration
        with gr.Tab("▶️ Start Migration"):
            gr.Markdown("## Start New Migration")

            with gr.Row():
                table_select = gr.Dropdown(
                    choices=["RAWDATACOR", "ELABDATADISP", "all"],
                    value="RAWDATACOR",
                    label="Table to Migrate"
                )
                partition_input = gr.Textbox(
                    label="Partition (optional)",
                    placeholder="Leave empty for all partitions, or enter e.g. 'part7', 'd9'",
                    value=""
                )

            with gr.Row():
                parallel_slider = gr.Slider(
                    minimum=0,
                    maximum=10,
                    value=0,
                    step=1,
                    label="Parallel Workers (0 = sequential)"
                )

            with gr.Row():
                resume_check = gr.Checkbox(label="Resume from last checkpoint", value=True)
                dry_run_check = gr.Checkbox(label="Dry run (simulate without writing)", value=False)

            start_btn = gr.Button("▶️ Start Migration", variant="primary", size="lg")

            migration_output = gr.Textbox(
                label="Migration Status",
                lines=3,
                interactive=False
            )

            start_btn.click(
                fn=start_migration_task,
                inputs=[table_select, parallel_slider, resume_check, dry_run_check, partition_input],
                outputs=migration_output
            )

        # Tab 3: Incremental Migration
        with gr.Tab("🔄 Incremental Sync"):
            gr.Markdown("## Incremental Migration (Sync New Data)")
            gr.Markdown("""
            This mode syncs **only the new data** added after the last full migration.

            **How it works:**
            - Finds new consolidation keys in MySQL that do not yet exist in PostgreSQL
            - Migrates only those keys (already-migrated data is not reprocessed)
            - Uses `migration_state` to track the last key processed

            **When to use:**
            - After a full migration of all partitions has completed
            - To periodically sync new data without redoing everything
            - For daily/weekly updates
            """)

            with gr.Row():
                inc_table_select = gr.Dropdown(
                    choices=["RAWDATACOR", "ELABDATADISP", "all"],
                    value="RAWDATACOR",
                    label="Table to Sync"
                )

            with gr.Row():
                inc_dry_run_check = gr.Checkbox(
                    label="Dry run (simulate without writing)",
                    value=False
                )

            inc_start_btn = gr.Button("🔄 Start Incremental Sync", variant="primary", size="lg")

            inc_migration_output = gr.Textbox(
                label="Sync Status",
                lines=3,
                interactive=False
            )

            inc_start_btn.click(
                fn=start_incremental_migration_task,
                inputs=[inc_table_select, inc_dry_run_check],
                outputs=inc_migration_output
            )

        # Tab 4: Logs
        with gr.Tab("📝 Logs"):
            gr.Markdown("## Migration Logs")

            with gr.Row():
                running_migrations_dropdown = gr.Dropdown(
                    choices=["No migrations running"],
                    value="No migrations running",
                    label="Select Migration",
                    interactive=True
                )
                refresh_migrations_btn = gr.Button("🔄 Refresh List")

            log_output = gr.Textbox(
                label="Log Output",
                lines=20,
                interactive=False,
                max_lines=50
            )

            refresh_migrations_btn.click(
                fn=refresh_running_migrations,
                outputs=running_migrations_dropdown
            )

            running_migrations_dropdown.change(
                fn=get_migration_logs,
                inputs=running_migrations_dropdown,
                outputs=log_output
            )

        # Tab 5: Error Logs
        with gr.Tab("⚠️ Error Logs"):
            gr.Markdown("## Error Log Viewer")
            gr.Markdown("View logged validation errors (invalid consolidation keys)")

            with gr.Row():
                error_log_dropdown = gr.Dropdown(
                    choices=[x[0] for x in get_error_logs()] or ["No error logs found"],
                    label="Select Error Log File",
                    value="Select a log file..."
                )
                refresh_error_logs_btn = gr.Button("🔄 Refresh")

            error_log_content = gr.Textbox(
                label="Error Log Contents",
                lines=20,
                interactive=False,
                max_lines=50
            )

            def refresh_error_log_list():
                logs = get_error_logs()
                choices = [x[0] for x in logs] if logs else ["No error logs found"]
                return gr.Dropdown(choices=choices)

            def show_error_log(filename):
                if not filename or filename in ("Select a log file...", "No error logs found"):
                    return "Select a log file to view its contents"
                return read_error_log(filename)

            refresh_error_logs_btn.click(
                fn=refresh_error_log_list,
                outputs=error_log_dropdown
            )

            error_log_dropdown.change(
                fn=show_error_log,
                inputs=error_log_dropdown,
                outputs=error_log_content
            )
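
# Illustrative sketch (assumption, not current behavior): the server listens
# on all interfaces, so Gradio's built-in basic auth could gate access when
# the dashboard is reachable beyond localhost. The credentials below are
# placeholders.
#
#   demo.launch(server_name="0.0.0.0", server_port=7860,
#               auth=("admin", "change-me"))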

def launch_ui(share=False, server_port=7860):
    """Launch the Gradio UI.

    Args:
        share: Whether to create a public share link
        server_port: Port to run the server on
    """
    demo.launch(
        share=share,
        server_port=server_port,
        server_name="0.0.0.0"  # Listen on all interfaces
    )


if __name__ == "__main__":
    import sys

    # Parse command line args
    share = "--share" in sys.argv
    port = 7860
    for arg in sys.argv:
        if arg.startswith("--port="):
            port = int(arg.split("=")[1])

    print(f"Starting Migration Dashboard on port {port}...")
    print(f"Share mode: {share}")

    launch_ui(share=share, server_port=port)
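
# Example invocations, assuming this module is saved as web_ui.py:
#
#   python web_ui.py               # dashboard on http://localhost:7860
#   python web_ui.py --port=8080   # custom port
#   python web_ui.py --share       # also create a public Gradio share link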