Problems identified: 1. Buffer size of batch_size * 10 (100k rows) was too large, causing migration_state to not update for several minutes on low-consolidation partitions 2. State updates only happened every 10 batches, not reflecting actual progress Changes: - Reduce insert_buffer_size from 10x to 5x batch_size (50k rows) - Update migration_state after EVERY batch flush, not every 10 batches - Add debug logging showing flush operations and total migrated count - This provides better visibility into migration progress and checkpointing For partitions with low consolidation ratio (like d0 with 1.1x), this ensures migration_state is updated more frequently, supporting better resume capability and providing visibility into actual progress. 🤖 Generated with Claude Code Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
334 lines
15 KiB
Python
334 lines
15 KiB
Python
"""Full migration from MySQL to PostgreSQL."""
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
import json
|
|
|
|
from config import get_settings, TABLE_CONFIGS
|
|
from src.connectors.mysql_connector import MySQLConnector
|
|
from src.connectors.postgres_connector import PostgreSQLConnector
|
|
from src.transformers.data_transformer import DataTransformer
|
|
from src.utils.logger import get_logger, setup_logger
|
|
from src.utils.progress import ProgressTracker
|
|
from src.migrator.state import MigrationState
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class FullMigrator:
    """Perform full migration of a table from MySQL to PostgreSQL.

    Rows are streamed partition-by-partition from MySQL, consolidated into
    groups by DataTransformer, accumulated in an insert buffer, and
    bulk-inserted into PostgreSQL. Progress is checkpointed in the
    PostgreSQL ``migration_state`` table (and mirrored to a local state
    file) after every buffer flush, so an interrupted run can be resumed
    with ``resume=True``.
    """

    def __init__(self, table: str):
        """Initialize migrator for a table.

        Args:
            table: Table name to migrate ('RAWDATACOR' or 'ELABDATADISP')

        Raises:
            ValueError: If ``table`` has no entry in TABLE_CONFIGS.
        """
        if table not in TABLE_CONFIGS:
            raise ValueError(f"Unknown table: {table}")

        self.table = table
        # Per-table settings (at least 'mysql_table' and 'postgres_table' keys).
        self.config = TABLE_CONFIGS[table]
        self.settings = get_settings()
        # File-backed state used for incremental migrations (see
        # _update_migration_state, which writes both DB and file state).
        self.state = MigrationState()

    def migrate(self, dry_run: bool = False, resume: bool = False) -> int:
        """Perform full migration of the table with resume capability.

        Args:
            dry_run: If True, log what would be done but don't modify data
            resume: If True, resume from last checkpoint; if False, check for conflicts

        Returns:
            Total number of rows migrated in this run

        Raises:
            ValueError: If the target table does not exist, or a previous
                migration is in progress and ``resume`` is False.
        """
        setup_logger(__name__)

        mysql_table = self.config["mysql_table"]
        pg_table = self.config["postgres_table"]

        logger.info(f"Starting full migration of {mysql_table} -> {pg_table}")

        try:
            with MySQLConnector() as mysql_conn:
                # Get total row count (drives the progress bar; consolidation
                # later reduces the number of rows actually written).
                total_rows = mysql_conn.get_row_count(mysql_table)
                logger.info(f"Total rows in source: {total_rows}")

                with PostgreSQLConnector() as pg_conn:
                    # Check if table exists before doing any work.
                    if not pg_conn.table_exists(pg_table):
                        raise ValueError(
                            f"PostgreSQL table {pg_table} does not exist. "
                            "Run 'setup --create-schema' first."
                        )

                    # Check for previous migration state.
                    # Note: With partition-based consolidation, we track progress differently
                    # than with ID-based pagination. The resume capability is simplified:
                    # - If data exists in table, migration was in progress
                    # - Resume will continue from where we left off
                    # - Full restart requires clearing the table

                    previous_migrated_count = self._get_previous_migrated_count(pg_conn, pg_table)

                    if previous_migrated_count > 0:
                        pg_row_count = pg_conn.get_row_count(pg_table)
                        logger.warning(
                            f"Found previous migration state: {pg_row_count} rows already in {pg_table}"
                        )
                        # Refuse to double-insert unless the caller explicitly resumes.
                        if not resume:
                            raise ValueError(
                                f"Migration already in progress for {pg_table}. "
                                f"Use --resume to continue from last checkpoint, or delete data to restart."
                            )
                        logger.info(f"Resuming migration - found {pg_row_count} existing rows")
                        # Progress bar tracks MySQL rows processed (before consolidation)
                        # Consolidation reduces count but not the rows we need to fetch
                        rows_to_migrate = total_rows
                    else:
                        previous_migrated_count = 0
                        rows_to_migrate = total_rows

                    if dry_run:
                        logger.info(f"[DRY RUN] Would migrate {rows_to_migrate} rows")
                        return rows_to_migrate

                    # Running totals: seed with the previous count so checkpoints
                    # written below reflect cumulative progress across runs.
                    migrated = previous_migrated_count
                    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12
                    # and yields a naive timestamp — confirm downstream consumers
                    # expect naive UTC before switching to datetime.now(timezone.utc).
                    migration_start_time = datetime.utcnow().isoformat()
                    # NOTE(review): batch_count is incremented but never read —
                    # candidate for removal.
                    batch_count = 0

                    with ProgressTracker(
                        rows_to_migrate,
                        f"Migrating {mysql_table}"
                    ) as progress:
                        columns = DataTransformer.get_column_order(pg_table)

                        # Get list of partitions and process each one.
                        partitions = mysql_conn.get_table_partitions(mysql_table)
                        logger.info(f"Found {len(partitions)} partitions for {mysql_table}")

                        for partition_idx, partition in enumerate(partitions, 1):
                            logger.info(f"[{partition_idx}/{len(partitions)}] Processing partition {partition}...")
                            partition_group_count = 0

                            # Accumulate rows for batch insertion to reduce database round-trips.
                            insert_buffer = []
                            # Use smaller buffer (batch_size * 5) so migration_state
                            # is checkpointed more frequently on low-consolidation
                            # partitions.
                            insert_buffer_size = self.settings.migration.batch_size * 5
                            fetched_in_buffer = 0  # Track MySQL rows fetched (before consolidation)

                            # Fetch consolidation groups from partition.
                            # Each group is a list of rows with the same (unit, tool, date, time).
                            for group_rows in mysql_conn.fetch_consolidation_groups_from_partition(
                                mysql_table,
                                partition
                            ):
                                if not group_rows:
                                    break

                                # Consolidate the group into (typically fewer) target rows.
                                transformed = DataTransformer.transform_batch(
                                    mysql_table,
                                    group_rows,
                                    consolidate=True
                                )

                                # Add to insert buffer instead of inserting immediately.
                                insert_buffer.extend(transformed)
                                partition_group_count += len(transformed)
                                fetched_in_buffer += len(group_rows)

                                # When buffer is full, flush to database.
                                if len(insert_buffer) >= insert_buffer_size:
                                    inserted = pg_conn.insert_batch(pg_table, insert_buffer, columns)
                                    if inserted > 0:
                                        migrated += inserted
                                        batch_count += 1
                                        # Progress advances by source rows fetched,
                                        # not by (smaller) consolidated rows inserted.
                                        progress.update(fetched_in_buffer)
                                        # Update migration state after every batch flush
                                        # so a crash loses at most one buffer of progress.
                                        self._update_migration_state(
                                            pg_conn, migrated, None, migration_start_time
                                        )
                                        logger.debug(
                                            f"Partition {partition}: flushed {inserted} rows, "
                                            f"total migrated: {migrated}"
                                        )

                                    # NOTE(review): buffer is cleared even when
                                    # inserted == 0 — confirm insert_batch cannot
                                    # legitimately return 0 for a non-empty buffer.
                                    insert_buffer = []
                                    fetched_in_buffer = 0

                            # Flush remaining rows in buffer for this partition.
                            if insert_buffer:
                                inserted = pg_conn.insert_batch(pg_table, insert_buffer, columns)
                                if inserted > 0:
                                    migrated += inserted
                                    batch_count += 1
                                    progress.update(fetched_in_buffer)
                                    self._update_migration_state(
                                        pg_conn, migrated, None, migration_start_time
                                    )
                                    logger.debug(
                                        f"Partition {partition} final flush: {inserted} rows, "
                                        f"total migrated: {migrated}"
                                    )

                            logger.info(f"Partition {partition} complete: {partition_group_count} groups consolidated")

                    # Get final actual count from PostgreSQL (authoritative — the
                    # running 'migrated' total may diverge on resumed runs).
                    final_count = pg_conn.get_row_count(pg_table)
                    logger.info(f"Final count from PostgreSQL: {final_count}")

                    # Update migration state with final count and mark as completed.
                    # Get the actual last ID from the table.
                    try:
                        with pg_conn.connection.cursor() as cursor:
                            # pg_table comes from TABLE_CONFIGS, not user input,
                            # so f-string interpolation of the identifier is safe here.
                            cursor.execute(
                                f"SELECT MAX(id) FROM {pg_table}"
                            )
                            result = cursor.fetchone()
                            # NOTE(review): truthiness check means MAX(id) == 0
                            # would also yield None — harmless for serial ids.
                            final_last_id = result[0] if result and result[0] else None
                            logger.info(f"Final last ID from table: {final_last_id}")
                    except Exception as e:
                        # Best effort: a missing last id only weakens resume info.
                        logger.warning(f"Failed to get final last ID: {e}")
                        final_last_id = None

                    logger.info(f"About to update migration_state with count={final_count}, last_id={final_last_id}")
                    self._update_migration_state(
                        pg_conn, final_count, final_last_id, migration_start_time, is_final=True
                    )
                    logger.info(f"Migration state update complete")

                    logger.info(
                        f"✓ Migration complete: {final_count} total rows in {pg_table}"
                    )

                    return final_count

        except Exception as e:
            logger.error(f"Migration failed: {e}")
            raise

    def _get_last_migrated_id(self, pg_conn: PostgreSQLConnector, pg_table: str) -> Optional[int]:
        """Get the last migrated MySQL ID from migration_state table.

        Args:
            pg_conn: PostgreSQL connection
            pg_table: PostgreSQL table name

        Returns:
            Last migrated MySQL ID or None if no previous migration
        """
        try:
            with pg_conn.connection.cursor() as cursor:
                cursor.execute(
                    "SELECT last_migrated_id FROM migration_state WHERE table_name = %s",
                    (pg_table,)
                )
                result = cursor.fetchone()
                if result and result[0]:
                    return result[0]
        except Exception:
            # Best effort: a missing/unreadable state row means "no previous
            # migration". NOTE(review): this also hides real DB errors —
            # consider logging at debug level.
            pass
        return None

    def _get_previous_migrated_count(self, pg_conn: PostgreSQLConnector, pg_table: str) -> int:
        """Get the total rows migrated so far from migration_state table.

        Args:
            pg_conn: PostgreSQL connection
            pg_table: PostgreSQL table name

        Returns:
            Total rows migrated so far (0 if no previous migration)
        """
        try:
            with pg_conn.connection.cursor() as cursor:
                cursor.execute(
                    "SELECT total_rows_migrated FROM migration_state WHERE table_name = %s",
                    (pg_table,)
                )
                result = cursor.fetchone()
                if result and result[0]:
                    return result[0]
        except Exception:
            # Best effort: treat any failure as "no previous migration".
            pass
        return 0

    def _update_migration_state(
        self,
        pg_conn: PostgreSQLConnector,
        rows_migrated: int,
        last_id: Optional[int] = None,
        migration_start_time: Optional[str] = None,
        is_final: bool = False
    ) -> None:
        """Update migration state in PostgreSQL and state file.

        Both targets are best-effort: a failure to checkpoint is logged as a
        warning and never aborts the migration itself.

        Args:
            pg_conn: PostgreSQL connection
            rows_migrated: Total number of rows migrated so far
            last_id: Last ID that was migrated (for resume capability)
            migration_start_time: When the migration started (ISO format)
            is_final: If True, mark migration as completed
        """
        pg_table = self.config["postgres_table"]
        # NOTE(review): utcnow() is deprecated (naive timestamp) — see migrate().
        now = datetime.utcnow()
        status = "completed" if is_final else "in_progress"

        # Update PostgreSQL migration_state table via upsert keyed on table_name.
        try:
            with pg_conn.connection.cursor() as cursor:
                query = f"""
                    INSERT INTO migration_state
                    (table_name, last_migrated_timestamp, last_migrated_id, total_rows_migrated, migration_completed_at, status)
                    VALUES (%s, %s, %s, %s, %s, %s)
                    ON CONFLICT (table_name) DO UPDATE SET
                        last_migrated_timestamp = EXCLUDED.last_migrated_timestamp,
                        last_migrated_id = EXCLUDED.last_migrated_id,
                        total_rows_migrated = EXCLUDED.total_rows_migrated,
                        migration_completed_at = EXCLUDED.migration_completed_at,
                        status = EXCLUDED.status
                """
                cursor.execute(
                    query,
                    (
                        pg_table,
                        migration_start_time or now.isoformat(),
                        last_id,
                        rows_migrated,
                        # completed_at is only stamped on the final update.
                        now if status == "completed" else None,
                        status
                    )
                )
                pg_conn.connection.commit()
                logger.debug(f"Migration state updated: {rows_migrated} rows total, last_id={last_id}, status={status}")
        except Exception as e:
            logger.warning(f"Failed to update migration state in PostgreSQL: {e}")

        # Also save to state file for incremental migrations.
        try:
            self.state.set_last_timestamp(pg_table, migration_start_time or now.isoformat())
            self.state.increment_migration_count(pg_table, rows_migrated)
            logger.debug("Migration state saved to file")
        except Exception as e:
            logger.warning(f"Failed to save migration state to file: {e}")
|
def run_full_migration(
    table: str,
    dry_run: bool = False,
    resume: bool = False
) -> int:
    """Module-level convenience wrapper: migrate one table end to end.

    Args:
        table: Table name to migrate
        dry_run: If True, show what would be done without modifying data
        resume: If True, resume from last checkpoint instead of starting fresh

    Returns:
        Number of rows migrated in this run
    """
    return FullMigrator(table).migrate(dry_run=dry_run, resume=resume)