"""Full migration from MySQL to PostgreSQL.""" from typing import Optional from datetime import datetime import json from config import get_settings, TABLE_CONFIGS from src.connectors.mysql_connector import MySQLConnector from src.connectors.postgres_connector import PostgreSQLConnector from src.transformers.data_transformer import DataTransformer from src.utils.logger import get_logger, setup_logger from src.utils.progress import ProgressTracker from src.migrator.state import MigrationState logger = get_logger(__name__) class FullMigrator: """Perform full migration of a table from MySQL to PostgreSQL.""" def __init__(self, table: str): """Initialize migrator for a table. Args: table: Table name to migrate ('RAWDATACOR' or 'ELABDATADISP') """ if table not in TABLE_CONFIGS: raise ValueError(f"Unknown table: {table}") self.table = table self.config = TABLE_CONFIGS[table] self.settings = get_settings() self.state = MigrationState() def migrate(self, dry_run: bool = False, resume: bool = False) -> int: """Perform full migration of the table with resume capability. Args: dry_run: If True, log what would be done but don't modify data resume: If True, resume from last checkpoint; if False, check for conflicts Returns: Total number of rows migrated in this run """ setup_logger(__name__) mysql_table = self.config["mysql_table"] pg_table = self.config["postgres_table"] logger.info(f"Starting full migration of {mysql_table} -> {pg_table}") try: with MySQLConnector() as mysql_conn: # Get total row count total_rows = mysql_conn.get_row_count(mysql_table) logger.info(f"Total rows in source: {total_rows}") with PostgreSQLConnector() as pg_conn: # Check if table exists if not pg_conn.table_exists(pg_table): raise ValueError( f"PostgreSQL table {pg_table} does not exist. " "Run 'setup --create-schema' first." ) # Check for previous migration state # Note: With partition-based consolidation, we track progress differently # than with ID-based pagination. The resume capability is simplified: # - If data exists in table, migration was in progress # - Resume will continue from where we left off # - Full restart requires clearing the table previous_migrated_count = self._get_previous_migrated_count(pg_conn, pg_table) last_completed_partition = self._get_last_completed_partition(pg_conn, pg_table) if previous_migrated_count > 0: pg_row_count = pg_conn.get_row_count(pg_table) logger.warning( f"Found previous migration state: {pg_row_count} rows already in {pg_table}" ) if not resume: raise ValueError( f"Migration already in progress for {pg_table}. " f"Use --resume to continue from last checkpoint, or delete data to restart." ) logger.info(f"Resuming migration - found {pg_row_count} existing rows") if last_completed_partition: logger.info(f"Last completed partition: {last_completed_partition}") # Progress bar tracks MySQL rows processed (before consolidation) # Consolidation reduces count but not the rows we need to fetch rows_to_migrate = total_rows else: previous_migrated_count = 0 last_completed_partition = None rows_to_migrate = total_rows if dry_run: logger.info(f"[DRY RUN] Would migrate {rows_to_migrate} rows") return rows_to_migrate migrated = previous_migrated_count migration_start_time = datetime.utcnow().isoformat() batch_count = 0 last_processed_partition = last_completed_partition # Track last partition we process with ProgressTracker( rows_to_migrate, f"Migrating {mysql_table}" ) as progress: columns = DataTransformer.get_column_order(pg_table) # Get list of partitions and process each one partitions = mysql_conn.get_table_partitions(mysql_table) logger.info(f"Found {len(partitions)} partitions for {mysql_table}") for partition_idx, partition in enumerate(partitions, 1): # Skip partitions already completed in previous run if last_completed_partition and partition <= last_completed_partition: logger.info(f"[{partition_idx}/{len(partitions)}] Skipping partition {partition} (already completed)") continue logger.info(f"[{partition_idx}/{len(partitions)}] Processing partition {partition}...") partition_group_count = 0 # If resuming and this is NOT the last completed partition, # it means it was only partially processed - clean it up first start_id = None if resume and last_completed_partition and partition > last_completed_partition: # This partition was started but not completed - delete its partial data logger.warning( f"Partition {partition} was partially processed in previous run. " f"Cleaning up partial data before resume..." ) try: with pg_conn.connection.cursor() as cursor: # Get the primary key column name for this table pk_column = self.config.get("postgres_pk", "id") # Delete rows from this partition that were inserted from MySQL rows # We identify them by looking for rows inserted after the migration started # This is safe because we're re-processing the entire partition # Note: This is a simplified approach - in production you might want more granular tracking last_id = self._get_last_migrated_id(pg_conn, pg_table) if last_id: cursor.execute( f"DELETE FROM {pg_table} WHERE {pk_column} > %s", (last_id,) ) pg_conn.connection.commit() logger.info(f"Cleaned up partial data for partition {partition}") # Recalculate migrated count based on actual data in database cursor.execute(f"SELECT COUNT(*) FROM {pg_table}") actual_count = cursor.fetchone()[0] migrated = actual_count logger.info(f"Recalculated total_rows_migrated: {migrated} (actual rows in database)") except Exception as e: logger.warning(f"Failed to clean up partial data: {e}") # Continue anyway - might be able to deduplicate later elif resume and last_completed_partition == partition and previous_migrated_count > 0: # Resuming within the same partition - continue from last ID start_id = self._get_last_migrated_id(pg_conn, pg_table) if start_id: logger.info(f"Resuming partition {partition} from ID > {start_id}") # Accumulate rows for batch insertion to reduce database round-trips insert_buffer = [] # Use smaller batch size for more frequent updates: batch_size * 5 = 50k rows insert_buffer_size = self.settings.migration.batch_size * 5 fetched_in_buffer = 0 # Track MySQL rows fetched (before consolidation) # Fetch consolidation groups from partition # Each group is a list of rows with the same (unit, tool, date, time) for group_rows in mysql_conn.fetch_consolidation_groups_from_partition( mysql_table, partition, start_id=start_id ): if not group_rows: break # Consolidate the group transformed = DataTransformer.transform_batch( mysql_table, group_rows, consolidate=True ) # Add to insert buffer instead of inserting immediately insert_buffer.extend(transformed) partition_group_count += len(transformed) fetched_in_buffer += len(group_rows) # When buffer is full, flush to database if len(insert_buffer) >= insert_buffer_size: inserted = pg_conn.insert_batch(pg_table, insert_buffer, columns) if inserted > 0: migrated += inserted batch_count += 1 progress.update(fetched_in_buffer) # Update migration state after every batch flush # Do NOT set last_completed_partition yet - partition is still being processed self._update_migration_state( pg_conn, migrated, None, migration_start_time ) logger.debug( f"Partition {partition}: flushed {inserted} rows, " f"total migrated: {migrated}" ) insert_buffer = [] fetched_in_buffer = 0 # Flush remaining rows in buffer for this partition if insert_buffer: inserted = pg_conn.insert_batch(pg_table, insert_buffer, columns) if inserted > 0: migrated += inserted batch_count += 1 progress.update(fetched_in_buffer) # Still don't set last_completed_partition - partition is still being finalized self._update_migration_state( pg_conn, migrated, None, migration_start_time ) logger.debug( f"Partition {partition} final flush: {inserted} rows, " f"total migrated: {migrated}" ) # NOW partition is complete - update with completed partition logger.info(f"Partition {partition} complete: {partition_group_count} groups consolidated") last_processed_partition = partition # Track this partition as processed self._update_migration_state( pg_conn, migrated, None, migration_start_time, last_partition=partition ) # Get final actual count from PostgreSQL final_count = pg_conn.get_row_count(pg_table) logger.info(f"Final count from PostgreSQL: {final_count}") # Update migration state with final count and mark as completed # Get the actual last ID from the table using correct PK column try: with pg_conn.connection.cursor() as cursor: pk_column = self.config.get("postgres_pk", "id") cursor.execute( f"SELECT MAX({pk_column}) FROM {pg_table}" ) result = cursor.fetchone() final_last_id = result[0] if result and result[0] else None logger.info(f"Final last ID from table: {final_last_id}") except Exception as e: logger.warning(f"Failed to get final last ID: {e}") final_last_id = None logger.info(f"About to update migration_state with count={final_count}, last_id={final_last_id}, last_partition={last_processed_partition}") self._update_migration_state( pg_conn, final_count, final_last_id, migration_start_time, is_final=True, last_partition=last_processed_partition ) logger.info(f"Migration state update complete") logger.info( f"✓ Migration complete: {final_count} total rows in {pg_table}" ) return final_count except Exception as e: logger.error(f"Migration failed: {e}") raise def _get_last_migrated_id(self, pg_conn: PostgreSQLConnector, pg_table: str) -> Optional[int]: """Get the last migrated MySQL ID from migration_state table. Args: pg_conn: PostgreSQL connection pg_table: PostgreSQL table name Returns: Last migrated MySQL ID or None if no previous migration """ try: with pg_conn.connection.cursor() as cursor: cursor.execute( "SELECT last_migrated_id FROM migration_state WHERE table_name = %s", (pg_table,) ) result = cursor.fetchone() if result and result[0]: return result[0] except Exception: pass return None def _get_previous_migrated_count(self, pg_conn: PostgreSQLConnector, pg_table: str) -> int: """Get the total rows migrated so far from migration_state table. Args: pg_conn: PostgreSQL connection pg_table: PostgreSQL table name Returns: Total rows migrated so far (0 if no previous migration) """ try: with pg_conn.connection.cursor() as cursor: cursor.execute( "SELECT total_rows_migrated FROM migration_state WHERE table_name = %s", (pg_table,) ) result = cursor.fetchone() if result and result[0]: return result[0] except Exception: pass return 0 def _get_last_completed_partition(self, pg_conn: PostgreSQLConnector, pg_table: str) -> Optional[str]: """Get the last completed partition from migration_state table. Args: pg_conn: PostgreSQL connection pg_table: PostgreSQL table name Returns: Last completed partition name or None if no previous migration """ try: with pg_conn.connection.cursor() as cursor: cursor.execute( "SELECT last_completed_partition FROM migration_state WHERE table_name = %s", (pg_table,) ) result = cursor.fetchone() if result and result[0]: return result[0] except Exception: pass return None def _update_migration_state( self, pg_conn: PostgreSQLConnector, rows_migrated: int, last_id: Optional[int] = None, migration_start_time: Optional[str] = None, is_final: bool = False, last_partition: Optional[str] = None ) -> None: """Update migration state in PostgreSQL and state file. Args: pg_conn: PostgreSQL connection rows_migrated: Total number of rows migrated so far last_id: Last ID that was migrated (for resume capability) migration_start_time: When the migration started (ISO format) is_final: If True, mark migration as completed last_partition: Name of the last completed partition """ pg_table = self.config["postgres_table"] now = datetime.utcnow() status = "completed" if is_final else "in_progress" # Update PostgreSQL migration_state table try: with pg_conn.connection.cursor() as cursor: logger.info(f"About to update migration_state: table={pg_table}, last_partition={last_partition}, last_id={last_id}, rows={rows_migrated}") query = f""" INSERT INTO migration_state (table_name, last_migrated_timestamp, last_migrated_id, total_rows_migrated, migration_completed_at, status, last_completed_partition) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (table_name) DO UPDATE SET last_migrated_timestamp = EXCLUDED.last_migrated_timestamp, last_migrated_id = EXCLUDED.last_migrated_id, total_rows_migrated = EXCLUDED.total_rows_migrated, migration_completed_at = EXCLUDED.migration_completed_at, status = EXCLUDED.status, last_completed_partition = EXCLUDED.last_completed_partition """ cursor.execute( query, ( pg_table, migration_start_time or now.isoformat(), last_id, rows_migrated, now if status == "completed" else None, status, last_partition ) ) pg_conn.connection.commit() logger.info(f"Migration state updated successfully: {rows_migrated} rows, last_partition={last_partition}, last_id={last_id}") except Exception as e: logger.error(f"Failed to update migration state in PostgreSQL: {e}") raise # Also save to state file for incremental migrations try: self.state.set_last_timestamp(pg_table, migration_start_time or now.isoformat()) self.state.increment_migration_count(pg_table, rows_migrated) logger.debug("Migration state saved to file") except Exception as e: logger.warning(f"Failed to save migration state to file: {e}") def run_full_migration( table: str, dry_run: bool = False, resume: bool = False ) -> int: """Run full migration for a table. Args: table: Table name to migrate dry_run: If True, show what would be done without modifying data resume: If True, resume from last checkpoint instead of starting fresh Returns: Number of rows migrated in this run """ migrator = FullMigrator(table) return migrator.migrate(dry_run=dry_run, resume=resume)