From 5c9df3d06f6381963f2231577569b8d2fd0a3274 Mon Sep 17 00:00:00 2001 From: alex Date: Tue, 30 Dec 2025 15:16:54 +0100 Subject: [PATCH] fix incremental --- config.py | 25 +- main.py | 43 ++- src/connectors/mysql_connector.py | 478 +++++++++++++++++++++++-- src/connectors/postgres_connector.py | 311 ++++++++++++---- src/migrator/__init__.py | 25 ++ src/migrator/consolidator.py | 330 +++++++++++++++++ src/migrator/full_migrator.py | 240 +++++++++++++ src/migrator/incremental_migrator.py | 324 +++++++++++++++++ src/migrator/parallel_migrator.py | 220 ++++++++++++ src/migrator/partition_migrator.py | 355 ++++++++++++++++++ src/migrator/state_manager.py | 347 ++++++++++++++++++ src/transformers/schema_transformer.py | 163 ++++++--- src/utils/progress.py | 116 +++--- src/utils/validation.py | 157 ++++++++ 14 files changed, 2901 insertions(+), 233 deletions(-) create mode 100644 src/migrator/consolidator.py create mode 100644 src/migrator/full_migrator.py create mode 100644 src/migrator/incremental_migrator.py create mode 100644 src/migrator/parallel_migrator.py create mode 100644 src/migrator/partition_migrator.py create mode 100644 src/migrator/state_manager.py create mode 100644 src/utils/validation.py diff --git a/config.py b/config.py index c81ceab..4be7abc 100644 --- a/config.py +++ b/config.py @@ -52,6 +52,7 @@ class MigrationSettings(BaseSettings): consolidation_group_limit: int = 10000 log_level: str = "INFO" dry_run: bool = False + progress_log_interval: int = 50000 class BenchmarkSettings(BaseSettings): @@ -152,20 +153,32 @@ ELABDATADISP_FIELD_MAPPING = { # PostgreSQL Partition years (from both tables) PARTITION_YEARS = list(range(2014, 2032)) # 2014-2031 +# Consolidation key definition (same for both tables) +# Multiple MySQL rows with same key but different NodeNum → 1 PostgreSQL row +# MySQL source fields +CONSOLIDATION_KEY_FIELDS = ["UnitName", "ToolNameID", "EventDate", "EventTime"] +# Keys for tracking in migration_state.last_key (NOT actual PostgreSQL target columns) +# Note: In PostgreSQL target, EventDate+EventTime become event_timestamp +CONSOLIDATION_KEY_PG_FIELDS = ["unit_name", "tool_name_id", "event_date", "event_time"] + # Table configurations - support both uppercase and lowercase keys _rawdatacor_config = { "mysql_table": "RAWDATACOR", "postgres_table": "rawdatacor", - "primary_key": "id", - "postgres_pk": "id", # Primary key column name in PostgreSQL - "partition_key": "event_timestamp", + "mysql_pk": "id", # MySQL primary key + "postgres_pk": "id", # PostgreSQL auto-increment primary key + "mysql_max_id_field": "id", # Field to track max ID from MySQL + "consolidation_key": CONSOLIDATION_KEY_FIELDS, + "consolidation_key_pg": CONSOLIDATION_KEY_PG_FIELDS, } _elabdatadisp_config = { "mysql_table": "ELABDATADISP", "postgres_table": "elabdatadisp", - "primary_key": "idElabData", - "postgres_pk": "id_elab_data", # Primary key column name in PostgreSQL - "partition_key": "event_timestamp", + "mysql_pk": "idElabData", # MySQL primary key + "postgres_pk": "id", # PostgreSQL auto-increment primary key + "mysql_max_id_field": "idElabData", # Field to track max ID from MySQL + "consolidation_key": CONSOLIDATION_KEY_FIELDS, + "consolidation_key_pg": CONSOLIDATION_KEY_PG_FIELDS, } TABLE_CONFIGS = { diff --git a/main.py b/main.py index df87fde..67c7655 100644 --- a/main.py +++ b/main.py @@ -6,8 +6,9 @@ from pathlib import Path from config import get_settings from src.utils.logger import setup_logger, get_logger from src.transformers.schema_transformer import get_full_schema_script 
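Editor's note — a minimal sketch (not part of the patch) of how the new consolidation-key entries in config.py are meant to be consumed. `TABLE_CONFIGS` and the `consolidation_key` field come from the hunk above; the helper name `consolidation_key_for_row` is invented for illustration only.

    # Illustrative only — assumes the config module as modified above.
    from typing import Any, Dict, Tuple

    from config import TABLE_CONFIGS


    def consolidation_key_for_row(table: str, row: Dict[str, Any]) -> Tuple[Any, ...]:
        """Build the (UnitName, ToolNameID, EventDate, EventTime) tuple for a MySQL row."""
        fields = TABLE_CONFIGS[table.upper()]["consolidation_key"]
        return tuple(row[field] for field in fields)

    # Rows that share this tuple but differ in NodeNum are collapsed into a
    # single PostgreSQL row by the consolidator added later in this patch.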
-from src.migrator.full_migration import run_full_migration -from src.migrator.incremental_migration import run_incremental_migration +from src.migrator.full_migrator import run_full_migration +from src.migrator.incremental_migrator import run_incremental_migration +from src.migrator.parallel_migrator import run_parallel_migration from src.benchmark.performance_test import run_benchmark from src.connectors.postgres_connector import PostgreSQLConnector @@ -80,18 +81,36 @@ def migrate(): default=None, help="Only migrate this partition (for testing/debugging)" ) -def full(table, dry_run, resume, partition): +@click.option( + "--parallel", + type=int, + default=None, + help="Number of parallel workers (e.g., --parallel 5 for 5 workers)" +) +def full(table, dry_run, resume, partition, parallel): """Perform full migration of all data.""" setup_logger(__name__) tables = ["RAWDATACOR", "ELABDATADISP"] if table == "all" else [table] + # Validate options + if parallel and partition: + click.echo("✗ Cannot use --parallel with --partition", err=True) + sys.exit(1) + try: total_migrated = 0 for tbl in tables: - click.echo(f"\nMigrating {tbl}" + (f" (partition {partition})" if partition else "") + "...") - migrated = run_full_migration(tbl, dry_run=dry_run, resume=resume, partition=partition) + if parallel: + # Parallel migration mode + click.echo(f"\nMigrating {tbl} with {parallel} parallel workers...") + migrated = run_parallel_migration(tbl, num_workers=parallel, dry_run=dry_run, resume=resume) + else: + # Sequential migration mode + click.echo(f"\nMigrating {tbl}" + (f" (partition {partition})" if partition else "") + "...") + migrated = run_full_migration(tbl, dry_run=dry_run, resume=resume, partition=partition) + total_migrated += migrated click.echo(f"✓ {tbl}: {migrated} rows migrated") @@ -115,14 +134,9 @@ def full(table, dry_run, resume, partition): is_flag=True, help="Show what would be done without modifying data" ) -@click.option( - "--state-file", - default="migration_state.json", - help="Path to migration state file" -) -def incremental(table, dry_run, state_file): - """Perform incremental migration since last sync.""" - setup_logger(__name__) +def incremental(table, dry_run): + """Perform incremental migration since last sync (based on consolidation keys).""" + setup_logger("") # Set up root logger so all child loggers work tables = ["RAWDATACOR", "ELABDATADISP"] if table == "all" else [table] @@ -131,7 +145,7 @@ def incremental(table, dry_run, state_file): for tbl in tables: click.echo(f"\nIncremental migration for {tbl}...") - migrated = run_incremental_migration(tbl, dry_run=dry_run, state_file=state_file) + migrated = run_incremental_migration(tbl, dry_run=dry_run) total_migrated += migrated if migrated > 0: click.echo(f"✓ {tbl}: {migrated} rows migrated") @@ -196,6 +210,7 @@ def info(): click.echo("\n[Migration Settings]") click.echo(f" Batch Size: {settings.migration.batch_size}") + click.echo(f" Consolidation Group Limit: {settings.migration.consolidation_group_limit}") click.echo(f" Log Level: {settings.migration.log_level}") click.echo(f" Dry Run: {settings.migration.dry_run}") diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py index 1ca8850..001f1ef 100644 --- a/src/connectors/mysql_connector.py +++ b/src/connectors/mysql_connector.py @@ -1,6 +1,6 @@ """MySQL database connector.""" import pymysql -from typing import List, Dict, Any, Optional, Generator +from typing import List, Dict, Any, Optional, Generator, Iterator from config import 
get_settings from src.utils.logger import get_logger @@ -10,6 +10,8 @@ logger = get_logger(__name__) class MySQLConnector: """Connector for MySQL database.""" + MAX_RETRIES = 3 # Number of retries for transient connection errors + def __init__(self): """Initialize MySQL connector with settings.""" self.settings = get_settings() @@ -38,6 +40,16 @@ class MySQLConnector: logger.error(f"Failed to connect to MySQL: {e}") raise + def _reconnect(self) -> None: + """Reconnect to MySQL database after connection loss.""" + try: + self.disconnect() + self.connect() + logger.info("Successfully reconnected to MySQL") + except Exception as e: + logger.error(f"Failed to reconnect to MySQL: {e}") + raise + def disconnect(self) -> None: """Close connection to MySQL database.""" if self.connection: @@ -62,14 +74,21 @@ class MySQLConnector: Returns: Number of rows in the table """ - try: - with self.connection.cursor() as cursor: - cursor.execute(f"SELECT COUNT(*) as count FROM `{table}`") - result = cursor.fetchone() - return result["count"] - except pymysql.Error as e: - logger.error(f"Failed to get row count for {table}: {e}") - raise + retries = 0 + while retries < self.MAX_RETRIES: + try: + with self.connection.cursor() as cursor: + cursor.execute(f"SELECT COUNT(*) as count FROM `{table}`") + result = cursor.fetchone() + return result["count"] + except pymysql.Error as e: + retries += 1 + if retries >= self.MAX_RETRIES: + logger.error(f"Failed to get row count for {table} after {self.MAX_RETRIES} retries: {e}") + raise + else: + logger.warning(f"Get row count failed (retry {retries}/{self.MAX_RETRIES}): {e}") + self._reconnect() def fetch_rows_since( self, @@ -195,19 +214,26 @@ class MySQLConnector: Returns: List of partition names """ - try: - with self.connection.cursor() as cursor: - cursor.execute(""" - SELECT PARTITION_NAME - FROM INFORMATION_SCHEMA.PARTITIONS - WHERE TABLE_NAME = %s - AND TABLE_SCHEMA = %s - ORDER BY PARTITION_NAME - """, (table, self.settings.mysql.database)) - return [row["PARTITION_NAME"] for row in cursor.fetchall()] - except pymysql.Error as e: - logger.error(f"Failed to get partitions for {table}: {e}") - raise + retries = 0 + while retries < self.MAX_RETRIES: + try: + with self.connection.cursor() as cursor: + cursor.execute(""" + SELECT PARTITION_NAME + FROM INFORMATION_SCHEMA.PARTITIONS + WHERE TABLE_NAME = %s + AND TABLE_SCHEMA = %s + ORDER BY PARTITION_NAME + """, (table, self.settings.mysql.database)) + return [row["PARTITION_NAME"] for row in cursor.fetchall()] + except pymysql.Error as e: + retries += 1 + if retries >= self.MAX_RETRIES: + logger.error(f"Failed to get partitions for {table} after {self.MAX_RETRIES} retries: {e}") + raise + else: + logger.warning(f"Get table partitions failed (retry {retries}/{self.MAX_RETRIES}): {e}") + self._reconnect() def fetch_consolidation_groups_from_partition( self, @@ -215,7 +241,8 @@ class MySQLConnector: partition: str, limit: Optional[int] = None, offset: int = 0, - start_id: Optional[int] = None + start_id: Optional[int] = None, + start_key: Optional[tuple] = None ) -> Generator[List[Dict[str, Any]], None, None]: """Fetch consolidation groups from a partition. @@ -231,6 +258,8 @@ class MySQLConnector: limit: Batch size for consolidation (uses config default if None) offset: Starting offset for pagination (unused, kept for compatibility) start_id: Resume from this ID (fetch id > start_id). If None, starts from beginning + start_key: Resume AFTER this consolidation key (unit_name, tool_name_id, event_date, event_time). 
+ If provided, skips all keys <= start_key in MySQL query (efficient resume). Yields: Lists of rows grouped by consolidation key (complete groups only) @@ -243,8 +272,9 @@ class MySQLConnector: # Determine ID column name id_column = "idElabData" if table == "ELABDATADISP" else "id" - max_retries = 3 - last_completed_key = None # Last key we fully yielded (not incomplete) + + # Initialize last_completed_key with start_key for efficient resume + last_completed_key = start_key if start_key else None # CRITICAL: These must be OUTSIDE the while loop to persist across batch iterations # If they're inside the loop, buffered incomplete groups from previous batches get lost @@ -253,7 +283,7 @@ class MySQLConnector: while True: retries = 0 - while retries < max_retries: + while retries < self.MAX_RETRIES: try: with self.connection.cursor() as cursor: # ORDER BY consolidation key @@ -330,15 +360,391 @@ class MySQLConnector: except pymysql.Error as e: retries += 1 - if retries >= max_retries: - logger.error(f"Failed to fetch consolidation groups from {table} partition {partition} after {max_retries} retries: {e}") + if retries >= self.MAX_RETRIES: + logger.error(f"Failed to fetch consolidation groups from {table} partition {partition} after {self.MAX_RETRIES} retries: {e}") raise else: - logger.warning(f"Fetch failed (retry {retries}/{max_retries}): {e}") - # Reconnect and retry - try: - self.disconnect() - self.connect() - except Exception as reconnect_error: - logger.error(f"Failed to reconnect: {reconnect_error}") - raise + logger.warning(f"Fetch consolidation groups failed (retry {retries}/{self.MAX_RETRIES}): {e}") + self._reconnect() + + def stream_partition_rows( + self, + table: str, + partition: str, + batch_size: int = 10000, + resume_from_key: Optional[Dict[str, Any]] = None + ) -> Iterator[Dict[str, Any]]: + """Stream all rows from a partition ordered by consolidation key. + + This eliminates the N+1 query pattern by fetching all data in a single + streaming query ordered by consolidation key. Rows are yielded one at a time. + + Query pattern: + SELECT * + FROM table PARTITION (partition) + WHERE (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?) 
-- if resuming + ORDER BY UnitName, ToolNameID, EventDate, EventTime, NodeNum + + Args: + table: Table name (RAWDATACOR or ELABDATADISP) + partition: Partition name (e.g., 'p2024') + batch_size: Number of rows to fetch at a time from server + resume_from_key: Last processed key to resume from (optional) + Format: {"unit_name": ..., "tool_name_id": ..., "event_date": ..., "event_time": ...} + + Yields: + MySQL row dicts, ordered by consolidation key then NodeNum + """ + if table not in ("RAWDATACOR", "ELABDATADISP"): + raise ValueError(f"Consolidation not supported for table {table}") + + retries = 0 + while retries < self.MAX_RETRIES: + try: + # Use regular DictCursor (client-side) to avoid filling /tmp on MySQL server + # SSCursor creates temp files on MySQL server which can fill /tmp + with self.connection.cursor() as cursor: + # Build WHERE clause for resume + where_clause = "" + params = [] + + if resume_from_key: + # Resume from last key using tuple comparison + where_clause = """ + WHERE (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s) + """ + params = [ + resume_from_key.get("unit_name"), + resume_from_key.get("tool_name_id"), + resume_from_key.get("event_date"), + resume_from_key.get("event_time"), + ] + + query = f""" + SELECT * + FROM `{table}` PARTITION (`{partition}`) + {where_clause} + ORDER BY UnitName, ToolNameID, EventDate, EventTime, NodeNum + """ + + logger.info( + f"Starting stream from {table} partition {partition}" + + (f" (resuming from key)" if resume_from_key else "") + ) + + cursor.execute(query, tuple(params) if params else None) + + # Fetch all rows (client-side cursor fetches to local memory) + # This avoids creating temp files on MySQL server + rows = cursor.fetchall() + + logger.info(f"Fetched {len(rows)} rows from partition {partition}") + + # Yield rows in batches for memory efficiency + for i in range(0, len(rows), batch_size): + batch = rows[i:i+batch_size] + for row in batch: + yield row + + logger.info(f"Finished streaming partition {partition}") + return # Success + + except pymysql.Error as e: + retries += 1 + if retries >= self.MAX_RETRIES: + logger.error( + f"Failed to stream from {table} partition {partition} " + f"after {self.MAX_RETRIES} retries: {e}" + ) + raise + else: + logger.warning( + f"Stream failed (retry {retries}/{self.MAX_RETRIES}): {e}" + ) + self._reconnect() + + def fetch_consolidation_keys_from_partition( + self, + table: str, + partition: str, + limit: Optional[int] = None, + offset: int = 0 + ) -> List[Dict[str, Any]]: + """Fetch distinct consolidation keys from a partition. 
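Editor's note — a hedged sketch (not part of the patch) of how a caller might consume `stream_partition_rows`. Because rows arrive ordered by the consolidation key and then NodeNum, consecutive rows with the same key form one group and can be consolidated without per-key queries; `consolidate_stream` is an invented name and `consolidate_rows` is the helper added in src/migrator/consolidator.py later in this patch.

    from itertools import groupby
    from operator import itemgetter

    from src.migrator.consolidator import consolidate_rows


    def consolidate_stream(mysql_conn, table: str, partition: str):
        """Yield one consolidated PostgreSQL row per consolidation key (illustrative)."""
        rows = mysql_conn.stream_partition_rows(table, partition)
        # groupby only needs the input sorted by key, which the ORDER BY guarantees.
        key_of = itemgetter("UnitName", "ToolNameID", "EventDate", "EventTime")
        for _key, group in groupby(rows, key=key_of):
            yield consolidate_rows(table, list(group))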
+ + Query pattern: + SELECT UnitName, ToolNameID, EventDate, EventTime + FROM table PARTITION (partition) + GROUP BY UnitName, ToolNameID, EventDate, EventTime + ORDER BY UnitName, ToolNameID, EventDate, EventTime + LIMIT X OFFSET Y + + Args: + table: Table name (RAWDATACOR or ELABDATADISP) + partition: Partition name (e.g., 'p2024') + limit: Number of keys to fetch (uses CONSOLIDATION_GROUP_LIMIT if None) + offset: Starting offset for pagination + + Returns: + List of dicts with keys: UnitName, ToolNameID, EventDate, EventTime + """ + if table not in ("RAWDATACOR", "ELABDATADISP"): + raise ValueError(f"Consolidation not supported for table {table}") + + if limit is None: + limit = self.settings.migration.consolidation_group_limit + + retries = 0 + while retries < self.MAX_RETRIES: + try: + with self.connection.cursor() as cursor: + query = f""" + SELECT UnitName, ToolNameID, EventDate, EventTime + FROM `{table}` PARTITION (`{partition}`) + GROUP BY UnitName, ToolNameID, EventDate, EventTime + ORDER BY UnitName, ToolNameID, EventDate, EventTime + LIMIT %s OFFSET %s + """ + cursor.execute(query, (limit, offset)) + keys = cursor.fetchall() + return keys + + except pymysql.Error as e: + retries += 1 + if retries >= self.MAX_RETRIES: + logger.error( + f"Failed to fetch consolidation keys from {table} " + f"partition {partition} (offset={offset}) after {self.MAX_RETRIES} retries: {e}" + ) + raise + else: + logger.warning( + f"Fetch consolidation keys failed (retry {retries}/{self.MAX_RETRIES}): {e}" + ) + self._reconnect() + + def fetch_records_for_key( + self, + table: str, + partition: str, + unit_name: Any, + tool_name_id: Any, + event_date: Any, + event_time: Any + ) -> List[Dict[str, Any]]: + """Fetch all records for a specific consolidation key. + + Query pattern: + SELECT * + FROM table PARTITION (partition) + WHERE UnitName = ? AND ToolNameID = ? + AND EventDate = ? AND EventTime = ? + + Args: + table: Table name (RAWDATACOR or ELABDATADISP) + partition: Partition name + unit_name: UnitName value + tool_name_id: ToolNameID value + event_date: EventDate value + event_time: EventTime value + + Returns: + List of all MySQL rows matching the key (different NodeNum) + """ + if table not in ("RAWDATACOR", "ELABDATADISP"): + raise ValueError(f"Consolidation not supported for table {table}") + + retries = 0 + while retries < self.MAX_RETRIES: + try: + with self.connection.cursor() as cursor: + query = f""" + SELECT * + FROM `{table}` PARTITION (`{partition}`) + WHERE UnitName = %s + AND ToolNameID = %s + AND EventDate = %s + AND EventTime = %s + ORDER BY NodeNum + """ + cursor.execute(query, (unit_name, tool_name_id, event_date, event_time)) + rows = cursor.fetchall() + return rows + + except pymysql.Error as e: + retries += 1 + if retries >= self.MAX_RETRIES: + logger.error( + f"Failed to fetch records for key " + f"({unit_name}, {tool_name_id}, {event_date}, {event_time}) " + f"from {table} partition {partition} after {self.MAX_RETRIES} retries: {e}" + ) + raise + else: + logger.warning( + f"Fetch records for key failed (retry {retries}/{self.MAX_RETRIES}): {e}" + ) + self._reconnect() + + def fetch_consolidation_keys_after( + self, + table: str, + after_key: Optional[Dict[str, Any]] = None, + min_mysql_id: int = 0, + limit: Optional[int] = None, + offset: int = 0 + ) -> List[Dict[str, Any]]: + """Fetch distinct consolidation keys after a specific key (for incremental migration). 
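Editor's note — an illustrative pairing (not part of the patch) of `fetch_consolidation_keys_from_partition` and `fetch_records_for_key`: page through distinct keys, then fetch and consolidate the rows for each key. This is the per-key access pattern these helpers expose; `stream_partition_rows` above was added to avoid this N+1 pattern for full-partition scans. The function name `migrate_partition_by_keys` is invented.

    from src.migrator.consolidator import consolidate_rows


    def migrate_partition_by_keys(mysql_conn, table: str, partition: str, page_size: int = 1000):
        """Yield consolidated rows for a partition via key paging (illustrative)."""
        offset = 0
        while True:
            keys = mysql_conn.fetch_consolidation_keys_from_partition(
                table, partition, limit=page_size, offset=offset
            )
            if not keys:
                break
            for key in keys:
                rows = mysql_conn.fetch_records_for_key(
                    table, partition,
                    key["UnitName"], key["ToolNameID"], key["EventDate"], key["EventTime"],
                )
                if rows:
                    yield consolidate_rows(table, rows)
            offset += len(keys)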
+ + Query pattern: + SELECT UnitName, ToolNameID, EventDate, EventTime + FROM table + WHERE id > min_mysql_id + AND (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?) + GROUP BY UnitName, ToolNameID, EventDate, EventTime + ORDER BY UnitName, ToolNameID, EventDate, EventTime + LIMIT X OFFSET Y + + Args: + table: Table name (RAWDATACOR or ELABDATADISP) + after_key: Start after this key (dict with unit_name, tool_name_id, event_date, event_time) + min_mysql_id: Only fetch rows with id > this value (optimization to avoid scanning already migrated data) + limit: Number of keys to fetch (uses CONSOLIDATION_GROUP_LIMIT if None) + offset: Starting offset for pagination + + Returns: + List of dicts with keys: UnitName, ToolNameID, EventDate, EventTime + """ + if table not in ("RAWDATACOR", "ELABDATADISP"): + raise ValueError(f"Consolidation not supported for table {table}") + + if limit is None: + limit = self.settings.migration.consolidation_group_limit + + # Determine ID column name based on table + id_column = "idElabData" if table == "ELABDATADISP" else "id" + + retries = 0 + while retries < self.MAX_RETRIES: + try: + with self.connection.cursor() as cursor: + if after_key: + # Incremental: fetch keys AFTER the last migrated key + # Filter by ID first for performance (uses PRIMARY KEY index) + query = f""" + SELECT UnitName, ToolNameID, EventDate, EventTime + FROM `{table}` + WHERE `{id_column}` > %s + AND (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s) + GROUP BY UnitName, ToolNameID, EventDate, EventTime + ORDER BY UnitName, ToolNameID, EventDate, EventTime + LIMIT %s OFFSET %s + """ + cursor.execute( + query, + ( + min_mysql_id, + after_key.get("unit_name"), + after_key.get("tool_name_id"), + after_key.get("event_date"), + after_key.get("event_time"), + limit, + offset + ) + ) + else: + # No after_key: fetch from beginning + # Still filter by ID if min_mysql_id is provided + if min_mysql_id > 0: + query = f""" + SELECT UnitName, ToolNameID, EventDate, EventTime + FROM `{table}` + WHERE `{id_column}` > %s + GROUP BY UnitName, ToolNameID, EventDate, EventTime + ORDER BY UnitName, ToolNameID, EventDate, EventTime + LIMIT %s OFFSET %s + """ + cursor.execute(query, (min_mysql_id, limit, offset)) + else: + query = f""" + SELECT UnitName, ToolNameID, EventDate, EventTime + FROM `{table}` + GROUP BY UnitName, ToolNameID, EventDate, EventTime + ORDER BY UnitName, ToolNameID, EventDate, EventTime + LIMIT %s OFFSET %s + """ + cursor.execute(query, (limit, offset)) + + keys = cursor.fetchall() + return keys + + except pymysql.Error as e: + retries += 1 + if retries >= self.MAX_RETRIES: + logger.error( + f"Failed to fetch consolidation keys from {table} " + f"(after_key={after_key}, offset={offset}) after {self.MAX_RETRIES} retries: {e}" + ) + raise + else: + logger.warning( + f"Fetch consolidation keys after failed (retry {retries}/{self.MAX_RETRIES}): {e}" + ) + self._reconnect() + + def fetch_records_for_key_all_partitions( + self, + table: str, + unit_name: Any, + tool_name_id: Any, + event_date: Any, + event_time: Any + ) -> List[Dict[str, Any]]: + """Fetch all records for a specific consolidation key across all partitions. + + Used for incremental migration where we don't know which partition the key is in. 
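Editor's note — a small clarifying sketch (not part of the patch). The `(UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)` filter used by `fetch_consolidation_keys_after` is MySQL's row-constructor comparison, which is lexicographic, so "keys AFTER the last migrated key" behaves like Python tuple ordering for comparable, non-NULL values.

    def key_is_after(row_key: tuple, last_key: tuple) -> bool:
        """Python-side equivalent of the SQL row-constructor comparison used for resuming."""
        return row_key > last_key


    # Expanded SQL form of the same predicate (what MySQL evaluates):
    EXPANDED_PREDICATE = """
        UnitName > %s
        OR (UnitName = %s AND ToolNameID > %s)
        OR (UnitName = %s AND ToolNameID = %s AND EventDate > %s)
        OR (UnitName = %s AND ToolNameID = %s AND EventDate = %s AND EventTime > %s)
    """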
+ + Args: + table: Table name (RAWDATACOR or ELABDATADISP) + unit_name: UnitName value + tool_name_id: ToolNameID value + event_date: EventDate value + event_time: EventTime value + + Returns: + List of all MySQL rows matching the key (different NodeNum) + """ + if table not in ("RAWDATACOR", "ELABDATADISP"): + raise ValueError(f"Consolidation not supported for table {table}") + + retries = 0 + while retries < self.MAX_RETRIES: + try: + with self.connection.cursor() as cursor: + query = f""" + SELECT * + FROM `{table}` + WHERE UnitName = %s + AND ToolNameID = %s + AND EventDate = %s + AND EventTime = %s + ORDER BY NodeNum + """ + cursor.execute(query, (unit_name, tool_name_id, event_date, event_time)) + rows = cursor.fetchall() + return rows + + except pymysql.Error as e: + retries += 1 + if retries >= self.MAX_RETRIES: + logger.error( + f"Failed to fetch records for key " + f"({unit_name}, {tool_name_id}, {event_date}, {event_time}) " + f"from {table} after {self.MAX_RETRIES} retries: {e}" + ) + raise + else: + logger.warning( + f"Fetch records for key (all partitions) failed (retry {retries}/{self.MAX_RETRIES}): {e}" + ) + self._reconnect() diff --git a/src/connectors/postgres_connector.py b/src/connectors/postgres_connector.py index 8ffc68c..5ca535b 100644 --- a/src/connectors/postgres_connector.py +++ b/src/connectors/postgres_connector.py @@ -1,7 +1,6 @@ """PostgreSQL database connector.""" import psycopg -from typing import List, Dict, Any, Optional, Iterator -from psycopg import sql +from typing import List, Dict, Any import json from config import get_settings from src.utils.logger import get_logger @@ -13,72 +12,151 @@ class PostgreSQLConnector: """Connector for PostgreSQL database.""" def __init__(self): - """Initialize PostgreSQL connector with settings.""" + """Initialize the PostgreSQL connector.""" self.settings = get_settings() self.connection = None - def connect(self) -> None: - """Establish connection to PostgreSQL database.""" + def connect(self): + """Connect to PostgreSQL.""" try: self.connection = psycopg.connect( host=self.settings.postgres.host, port=self.settings.postgres.port, + dbname=self.settings.postgres.database, user=self.settings.postgres.user, password=self.settings.postgres.password, - dbname=self.settings.postgres.database, autocommit=False, ) logger.info( - f"Connected to PostgreSQL: {self.settings.postgres.host}:" - f"{self.settings.postgres.port}/{self.settings.postgres.database}" + f"Connected to PostgreSQL: {self.settings.postgres.host}:{self.settings.postgres.port}/{self.settings.postgres.database}" ) except psycopg.Error as e: logger.error(f"Failed to connect to PostgreSQL: {e}") raise - def disconnect(self) -> None: - """Close connection to PostgreSQL database.""" + def disconnect(self): + """Disconnect from PostgreSQL.""" if self.connection: self.connection.close() logger.info("Disconnected from PostgreSQL") def __enter__(self): - """Context manager entry.""" + """Context manager entry - connect to database.""" self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit.""" - if exc_type is None: - # No exception, commit before closing - try: - self.connection.commit() - except Exception as e: - logger.warning(f"Failed to commit on exit: {e}") - else: - # Exception occurred, rollback - try: - self.connection.rollback() - except Exception as e: - logger.warning(f"Failed to rollback on exit: {e}") + """Context manager exit - disconnect from database.""" self.disconnect() + return False - def execute(self, 
query: str, params: Optional[tuple] = None) -> None: - """Execute a query without returning results with retry logic. + def execute_query( + self, query: str, params: tuple = None, fetch: bool = False + ) -> List[Dict[str, Any]]: + """Execute a query and optionally fetch results. Args: - query: SQL query + query: SQL query to execute params: Query parameters + fetch: Whether to fetch and return results + + Returns: + List of result rows as dictionaries (if fetch=True) """ + try: + with self.connection.cursor() as cursor: + cursor.execute(query, params) + if fetch: + columns = [desc[0] for desc in cursor.description] + rows = cursor.fetchall() + return [dict(zip(columns, row)) for row in rows] + self.connection.commit() + return [] + except psycopg.Error as e: + self.connection.rollback() + logger.error(f"Query execution failed: {e}") + raise + + def execute_script(self, script: str): + """Execute a SQL script (multiple statements). + + Args: + script: SQL script to execute + """ + try: + with self.connection.cursor() as cursor: + cursor.execute(script) + self.connection.commit() + logger.debug("Script executed successfully") + except psycopg.Error as e: + self.connection.rollback() + logger.error(f"Script execution failed: {e}") + raise + + def copy_from( + self, + table: str, + rows: List[Dict[str, Any]], + columns: List[str] + ) -> int: + """Insert a batch of rows using PostgreSQL COPY (10-100x faster than INSERT). + + Args: + table: Table name + rows: List of row dictionaries + columns: Column names in order + + Returns: + Number of rows inserted + """ + if not rows: + return 0 + max_retries = 3 retries = 0 while retries < max_retries: try: with self.connection.cursor() as cursor: - cursor.execute(query, params) + # Prepare data for COPY + # COPY expects tab-separated text with one row per line + from io import StringIO + + copy_data = StringIO() + for row in rows: + values = [] + for col in columns: + val = row.get(col) + # Handle None/NULL + if val is None: + values.append("\\N") + # Convert JSONB dicts to JSON strings + elif isinstance(val, (dict, list)): + # Escape special characters in JSON + json_str = json.dumps(val).replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t") + values.append(json_str) + # Convert bool to PostgreSQL format + elif isinstance(val, bool): + values.append("t" if val else "f") + # All other values as strings + else: + # Escape special characters + str_val = str(val).replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t") + values.append(str_val) + + copy_data.write("\t".join(values) + "\n") + + copy_data.seek(0) + + # Use COPY command + with cursor.copy(f"COPY {table} ({','.join(columns)}) FROM STDIN") as copy: + copy.write(copy_data.getvalue()) + self.connection.commit() - return # Success + + logger.debug(f"COPY inserted {len(rows)} rows into {table}") + return len(rows) + except psycopg.Error as e: try: self.connection.rollback() @@ -87,11 +165,129 @@ class PostgreSQLConnector: retries += 1 if retries >= max_retries: - logger.error(f"Query execution failed after {max_retries} retries: {e}\nQuery: {query}") + logger.error(f"COPY insert failed after {max_retries} retries: {e}") raise else: logger.warning( - f"Query execution failed (retry {retries}/{max_retries}): {e}. " + f"COPY insert failed (retry {retries}/{max_retries}): {e}. " + f"Reconnecting and retrying..." 
+ ) + # Reconnect before retry + try: + self.disconnect() + except Exception: + pass + try: + self.connect() + except Exception as reconnect_error: + logger.error(f"Failed to reconnect: {reconnect_error}") + if retries >= max_retries: + raise + + def copy_from_with_conflict( + self, + table: str, + rows: List[Dict[str, Any]], + columns: List[str], + conflict_columns: List[str] + ) -> int: + """Insert a batch of rows using COPY + ON CONFLICT for duplicate handling. + + This method is fast (uses COPY) but handles UNIQUE constraint violations: + 1. COPY data into a temporary table (fast bulk load) + 2. INSERT from temp table with ON CONFLICT DO NOTHING (skip duplicates) + + Args: + table: Target table name + rows: List of row dictionaries + columns: Column names in order + conflict_columns: Columns that form the UNIQUE constraint + + Returns: + Number of rows inserted (excludes skipped duplicates) + """ + if not rows: + return 0 + + max_retries = 3 + retries = 0 + + while retries < max_retries: + try: + with self.connection.cursor() as cursor: + # Create temporary table with same structure but without id column + # The id column is auto-generated and shouldn't be in the COPY data + temp_table = f"{table}_temp_{id(rows)}" + cursor.execute(f""" + CREATE TEMP TABLE {temp_table} + (LIKE {table} INCLUDING DEFAULTS EXCLUDING IDENTITY) + ON COMMIT DROP + """) + # Drop the id column from temp table since we don't provide it in COPY + cursor.execute(f"ALTER TABLE {temp_table} DROP COLUMN IF EXISTS id") + + # COPY into temp table + from io import StringIO + + copy_data = StringIO() + for row in rows: + values = [] + for col in columns: + val = row.get(col) + if val is None: + values.append("\\N") + elif isinstance(val, (dict, list)): + json_str = json.dumps(val).replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t") + values.append(json_str) + elif isinstance(val, bool): + values.append("t" if val else "f") + else: + str_val = str(val).replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t") + values.append(str_val) + + copy_data.write("\t".join(values) + "\n") + + copy_data.seek(0) + + with cursor.copy(f"COPY {temp_table} ({','.join(columns)}) FROM STDIN") as copy: + copy.write(copy_data.getvalue()) + + # INSERT from temp table with ON CONFLICT + conflict_clause = f"({','.join(conflict_columns)})" + insert_sql = f""" + INSERT INTO {table} ({','.join(columns)}) + SELECT {','.join(columns)} + FROM {temp_table} + ON CONFLICT {conflict_clause} DO NOTHING + """ + cursor.execute(insert_sql) + inserted_count = cursor.rowcount + + self.connection.commit() + + if inserted_count < len(rows): + logger.debug( + f"COPY+ON CONFLICT inserted {inserted_count}/{len(rows)} rows into {table} " + f"({len(rows) - inserted_count} duplicates skipped)" + ) + else: + logger.debug(f"COPY+ON CONFLICT inserted {inserted_count} rows into {table}") + + return inserted_count + + except psycopg.Error as e: + try: + self.connection.rollback() + except Exception: + pass + + retries += 1 + if retries >= max_retries: + logger.error(f"COPY+ON CONFLICT insert failed after {max_retries} retries: {e}") + raise + else: + logger.warning( + f"COPY+ON CONFLICT insert failed (retry {retries}/{max_retries}): {e}. " f"Reconnecting and retrying..." ) try: @@ -105,22 +301,6 @@ class PostgreSQLConnector: if retries >= max_retries: raise - def execute_script(self, script: str) -> None: - """Execute multiple SQL statements (script). 
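Editor's note — a simplified sketch (not part of the patch) of the staging pattern that `copy_from_with_conflict` implements above: bulk-load into a temporary table with COPY, then move rows into the real table with ON CONFLICT DO NOTHING so duplicate consolidation keys are skipped instead of aborting the batch. Table and column names below (`target_table`, `col_a`, `col_b`) are placeholders, not names from this repository.

    import psycopg


    def staged_copy(conn: psycopg.Connection, tsv_payload: str) -> int:
        """COPY into a temp table, then INSERT ... ON CONFLICT DO NOTHING (illustrative)."""
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TEMP TABLE staging
                (LIKE target_table INCLUDING DEFAULTS EXCLUDING IDENTITY)
                ON COMMIT DROP
            """)
            with cur.copy("COPY staging (col_a, col_b) FROM STDIN") as copy:
                copy.write(tsv_payload)
            cur.execute("""
                INSERT INTO target_table (col_a, col_b)
                SELECT col_a, col_b FROM staging
                ON CONFLICT (col_a, col_b) DO NOTHING
            """)
            inserted = cur.rowcount
        conn.commit()
        return inserted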
- - Args: - script: SQL script with multiple statements - """ - try: - with self.connection.cursor() as cursor: - cursor.execute(script) - self.connection.commit() - logger.debug("Script executed successfully") - except psycopg.Error as e: - self.connection.rollback() - logger.error(f"Script execution failed: {e}") - raise - def insert_batch( self, table: str, @@ -204,48 +384,21 @@ class PostgreSQLConnector: table: Table name Returns: - True if table exists, False otherwise + True if table exists """ try: with self.connection.cursor() as cursor: cursor.execute( - "SELECT EXISTS(" - " SELECT 1 FROM information_schema.tables " - " WHERE table_name = %s" - ")", - (table,) + "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = %s)", + (table,), ) return cursor.fetchone()[0] except psycopg.Error as e: - logger.error(f"Failed to check if table exists: {e}") - raise - - def get_max_timestamp( - self, - table: str, - timestamp_col: str = "created_at" - ) -> Optional[str]: - """Get the maximum timestamp from a table. - - Args: - table: Table name - timestamp_col: Timestamp column name - - Returns: - ISO format timestamp or None if table is empty - """ - try: - with self.connection.cursor() as cursor: - query = f"SELECT MAX({timestamp_col})::text FROM {table}" - cursor.execute(query) - result = cursor.fetchone() - return result[0] if result and result[0] else None - except psycopg.Error as e: - logger.error(f"Failed to get max timestamp: {e}") + logger.error(f"Failed to check table existence: {e}") raise def get_row_count(self, table: str) -> int: - """Get row count for a table. + """Get the number of rows in a table. Args: table: Table name diff --git a/src/migrator/__init__.py b/src/migrator/__init__.py index e69de29..e8ccbb3 100644 --- a/src/migrator/__init__.py +++ b/src/migrator/__init__.py @@ -0,0 +1,25 @@ +"""Migration modules for MySQL to PostgreSQL migration. + +New architecture: +- consolidator: Consolidate MySQL rows by NodeNum into single PostgreSQL row +- state_manager: Track migration progress in PostgreSQL migration_state table +- partition_migrator: Migrate single MySQL partition +- full_migrator: Orchestrate full migration of all partitions +- incremental_migrator: Migrate only new data since last migration +""" +from src.migrator.consolidator import Consolidator, consolidate_rows +from src.migrator.state_manager import StateManager +from src.migrator.partition_migrator import PartitionMigrator +from src.migrator.full_migrator import FullMigrator, run_full_migration +from src.migrator.incremental_migrator import IncrementalMigrator, run_incremental_migration + +__all__ = [ + "Consolidator", + "consolidate_rows", + "StateManager", + "PartitionMigrator", + "FullMigrator", + "run_full_migration", + "IncrementalMigrator", + "run_incremental_migration", +] diff --git a/src/migrator/consolidator.py b/src/migrator/consolidator.py new file mode 100644 index 0000000..2d49b16 --- /dev/null +++ b/src/migrator/consolidator.py @@ -0,0 +1,330 @@ +"""Consolidation logic for MySQL to PostgreSQL migration. + +Consolidates multiple MySQL rows (different NodeNum) with same consolidation key +into a single PostgreSQL row with measurements stored in JSONB. 
+ +Consolidation key: (UnitName, ToolNameID, EventDate, EventTime) +Multiple nodes → single JSONB structure with node_X keys +""" +from typing import List, Dict, Any, Optional +from datetime import datetime, date, time, timedelta +from decimal import Decimal + +from config import ( + TABLE_CONFIGS, + RAWDATACOR_COLUMNS, + ELABDATADISP_FIELD_MAPPING, +) +from src.utils.logger import get_logger + +logger = get_logger(__name__) + + +def _combine_datetime(event_date: Any, event_time: Any) -> Optional[datetime]: + """Combine MySQL EventDate and EventTime into a single datetime. + + Args: + event_date: MySQL EventDate (date object or string) + event_time: MySQL EventTime (time object or string) + + Returns: + Combined datetime or None if inputs are invalid + """ + if event_date is None or event_time is None: + return None + + try: + # Handle if already datetime objects + if isinstance(event_date, datetime): + event_date = event_date.date() + elif isinstance(event_date, str): + event_date = datetime.fromisoformat(event_date).date() + + if isinstance(event_time, str): + event_time = datetime.strptime(event_time, "%H:%M:%S").time() + elif isinstance(event_time, datetime): + event_time = event_time.time() + elif isinstance(event_time, timedelta): + # Convert timedelta to time (MySQL TIME type returns timedelta) + total_seconds = int(event_time.total_seconds()) + hours = total_seconds // 3600 + minutes = (total_seconds % 3600) // 60 + seconds = total_seconds % 60 + event_time = time(hours, minutes, seconds) + + # Combine date and time + return datetime.combine(event_date, event_time) + except Exception as e: + logger.warning(f"Failed to combine date {event_date} and time {event_time}: {e}") + return None + + +class Consolidator: + """Consolidate MySQL rows into PostgreSQL format with node-based JSONB.""" + + @staticmethod + def consolidate_rawdatacor(mysql_rows: List[Dict[str, Any]]) -> Dict[str, Any]: + """Consolidate RAWDATACOR rows by NodeNum into single PostgreSQL row. + + Args: + mysql_rows: List of MySQL rows with same (UnitName, ToolNameID, EventDate, EventTime) + but different NodeNum + + Returns: + Single PostgreSQL row dict with consolidated measurements + + Example measurements structure: + { + "node_1": { + "0": {"value": "123.45", "unit": "°C"}, + "1": {"value": "67.89", "unit": "bar"}, + ... + "F": {"value": "11.22", "unit": "m/s"} + }, + "node_2": { ... 
} + } + """ + if not mysql_rows: + raise ValueError("Cannot consolidate empty row list") + + # Get consolidation key from first row (all rows have same key) + first_row = mysql_rows[0] + config = TABLE_CONFIGS["RAWDATACOR"] + + # Build measurements JSONB by node + measurements = {} + mysql_ids = [] + + for row in mysql_rows: + node_num = row.get("NodeNum") + if node_num is None: + logger.warning(f"Row missing NodeNum, skipping: {row.get(config['mysql_pk'])}") + continue + + # Track MySQL IDs for max calculation + mysql_id = row.get(config["mysql_pk"]) + if mysql_id: + mysql_ids.append(int(mysql_id)) + + # Create node key (e.g., "node_1", "node_2") + node_key = f"node_{node_num}" + + # Build measurements for this node + node_measurements = {} + val_columns = RAWDATACOR_COLUMNS["val_columns"] + unit_columns = RAWDATACOR_COLUMNS["unit_columns"] + + for idx, val_col in enumerate(val_columns): + unit_col = unit_columns[idx] + val = row.get(val_col) + unit = row.get(unit_col) + + # Only include non-NULL values + if val is not None: + # Get hex key (0, 1, 2, ..., F) + hex_key = val_col.replace("Val", "") + + measurement = {} + # Convert Decimal to float for JSON serialization + if isinstance(val, Decimal): + measurement["value"] = float(val) + else: + measurement["value"] = val + + if unit is not None and unit != "": + measurement["unit"] = unit + + node_measurements[hex_key] = measurement + + measurements[node_key] = node_measurements + + # Calculate max MySQL ID + mysql_max_id = max(mysql_ids) if mysql_ids else 0 + + # Combine EventDate + EventTime into event_timestamp + event_timestamp = _combine_datetime( + first_row.get("EventDate"), + first_row.get("EventTime") + ) + + # Extract year from event_timestamp for partition key + event_year = event_timestamp.year if event_timestamp else None + + # Calculate created_at (minimum from all rows) + # Note: RAWDATACOR does not have updated_at field + created_at_values = [row.get("created_at") for row in mysql_rows if row.get("created_at") is not None] + created_at = min(created_at_values) if created_at_values else None + + # Build PostgreSQL row + pg_row = { + "mysql_max_id": mysql_max_id, + "unit_name": first_row.get("UnitName"), + "tool_name_id": first_row.get("ToolNameID"), + "event_timestamp": event_timestamp, + "event_year": event_year, + "measurements": measurements, + "bat_level": first_row.get("BatLevel"), + "temperature": first_row.get("Temperature"), + "bat_level_module": first_row.get("BatLevel_module"), + "temperature_module": first_row.get("Temperature_module"), + "rssi_module": first_row.get("RSSI_module"), + "created_at": created_at, + } + + return pg_row + + @staticmethod + def consolidate_elabdatadisp(mysql_rows: List[Dict[str, Any]]) -> Dict[str, Any]: + """Consolidate ELABDATADISP rows by NodeNum into single PostgreSQL row. + + Args: + mysql_rows: List of MySQL rows with same (UnitName, ToolNameID, EventDate, EventTime) + but different NodeNum + + Returns: + Single PostgreSQL row dict with consolidated measurements + + Example measurements structure: + { + "node_1": { + "shifts": {"x": 1.234, "y": 2.345, "z": 3.456, ...}, + "coordinates": {"x": 10.123, "y": 20.234, ...}, + "kinematics": {"speed": 1.111, ...}, + "sensors": {"t_node": 25.5, ...}, + "calculated": {"alfa_x": 0.123, ...} + }, + "node_2": { ... 
} + } + """ + if not mysql_rows: + raise ValueError("Cannot consolidate empty row list") + + # Get consolidation key from first row (all rows have same key) + first_row = mysql_rows[0] + config = TABLE_CONFIGS["ELABDATADISP"] + + # Build measurements JSONB by node + measurements = {} + mysql_ids = [] + + for row in mysql_rows: + node_num = row.get("NodeNum") + if node_num is None: + logger.warning(f"Row missing NodeNum, skipping: {row.get(config['mysql_pk'])}") + continue + + # Track MySQL IDs for max calculation + mysql_id = row.get(config["mysql_pk"]) + if mysql_id: + mysql_ids.append(int(mysql_id)) + + # Create node key (e.g., "node_1", "node_2") + node_key = f"node_{node_num}" + + # Build measurements for this node using field mapping + node_measurements = { + "shifts": {}, + "coordinates": {}, + "kinematics": {}, + "sensors": {}, + "calculated": {}, + } + + # Add state and calc_err if present + if "State" in row and row["State"] is not None: + node_measurements["state"] = row["State"] + # MySQL field is 'calcerr' (lowercase), not 'CalcErr' + if "calcerr" in row and row["calcerr"] is not None: + node_measurements["calc_err"] = row["calcerr"] + + # Map MySQL fields to JSONB structure + for mysql_field, (category, json_key) in ELABDATADISP_FIELD_MAPPING.items(): + value = row.get(mysql_field) + + # Only include non-NULL values + if value is not None: + # Convert Decimal to float for JSON serialization + if isinstance(value, Decimal): + value = float(value) + + node_measurements[category][json_key] = value + + # Remove empty categories + node_measurements = { + k: v for k, v in node_measurements.items() + if v and (not isinstance(v, dict) or len(v) > 0) + } + + measurements[node_key] = node_measurements + + # Calculate max MySQL ID + mysql_max_id = max(mysql_ids) if mysql_ids else 0 + + # Combine EventDate + EventTime into event_timestamp + event_timestamp = _combine_datetime( + first_row.get("EventDate"), + first_row.get("EventTime") + ) + + # Extract year from event_timestamp for partition key + event_year = event_timestamp.year if event_timestamp else None + + # Calculate created_at (minimum from all rows) and updated_at (maximum from all rows) + created_at_values = [row.get("created_at") for row in mysql_rows if row.get("created_at") is not None] + updated_at_values = [row.get("updated_at") for row in mysql_rows if row.get("updated_at") is not None] + + created_at = min(created_at_values) if created_at_values else None + updated_at = max(updated_at_values) if updated_at_values else None + + # Build PostgreSQL row + pg_row = { + "mysql_max_id": mysql_max_id, + "unit_name": first_row.get("UnitName"), + "tool_name_id": first_row.get("ToolNameID"), + "event_timestamp": event_timestamp, + "event_year": event_year, + "measurements": measurements, + "created_at": created_at, + "updated_at": updated_at, + } + + return pg_row + + @staticmethod + def consolidate(table: str, mysql_rows: List[Dict[str, Any]]) -> Dict[str, Any]: + """Consolidate rows for the specified table. 
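Editor's note — a usage sketch (not part of the patch) for `consolidate_rows`. The field values are made up, and the exact Val*/Unit* column names come from `RAWDATACOR_COLUMNS`, which is not shown in this hunk, so `Val0`/`Unit0` below are assumptions; the commented results follow from the consolidation code above under those assumptions.

    from datetime import date, timedelta

    from src.migrator.consolidator import consolidate_rows

    mysql_rows = [
        {"id": 101, "UnitName": "UNIT-A", "ToolNameID": 7, "NodeNum": 1,
         "EventDate": date(2024, 5, 1), "EventTime": timedelta(hours=12, minutes=30),
         "Val0": 123.45, "Unit0": "°C"},
        {"id": 102, "UnitName": "UNIT-A", "ToolNameID": 7, "NodeNum": 2,
         "EventDate": date(2024, 5, 1), "EventTime": timedelta(hours=12, minutes=30),
         "Val0": 67.89, "Unit0": "bar"},
    ]

    pg_row = consolidate_rows("RAWDATACOR", mysql_rows)
    # pg_row["mysql_max_id"]    -> 102
    # pg_row["event_timestamp"] -> datetime(2024, 5, 1, 12, 30)
    # pg_row["event_year"]      -> 2024
    # pg_row["measurements"]    -> {"node_1": {"0": {...}}, "node_2": {"0": {...}}}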
+ + Args: + table: Table name (RAWDATACOR or ELABDATADISP) + mysql_rows: List of MySQL rows to consolidate + + Returns: + Consolidated PostgreSQL row + + Raises: + ValueError: If table is unknown or rows are empty + """ + if not mysql_rows: + raise ValueError("Cannot consolidate empty row list") + + table_upper = table.upper() + + if table_upper == "RAWDATACOR": + return Consolidator.consolidate_rawdatacor(mysql_rows) + elif table_upper == "ELABDATADISP": + return Consolidator.consolidate_elabdatadisp(mysql_rows) + else: + raise ValueError(f"Unknown table: {table}") + + +def consolidate_rows(table: str, mysql_rows: List[Dict[str, Any]]) -> Dict[str, Any]: + """Convenience function to consolidate rows. + + Args: + table: Table name (RAWDATACOR or ELABDATADISP) + mysql_rows: List of MySQL rows with same consolidation key + + Returns: + Single consolidated PostgreSQL row + """ + return Consolidator.consolidate(table, mysql_rows) diff --git a/src/migrator/full_migrator.py b/src/migrator/full_migrator.py new file mode 100644 index 0000000..d6628cb --- /dev/null +++ b/src/migrator/full_migrator.py @@ -0,0 +1,240 @@ +"""Full migration orchestrator for MySQL to PostgreSQL. + +Coordinates migration of all partitions or a specific partition. +""" +from typing import Optional, List + +from config import get_settings, TABLE_CONFIGS +from src.connectors.mysql_connector import MySQLConnector +from src.connectors.postgres_connector import PostgreSQLConnector +from src.migrator.partition_migrator import PartitionMigrator +from src.migrator.state_manager import StateManager +from src.utils.logger import get_logger + +logger = get_logger(__name__) + + +class FullMigrator: + """Orchestrate full migration of all partitions.""" + + def __init__(self, table: str): + """Initialize full migrator. + + Args: + table: Table name (RAWDATACOR or ELABDATADISP) + """ + if table.upper() not in ("RAWDATACOR", "ELABDATADISP"): + raise ValueError(f"Unknown table: {table}") + + self.table = table.upper() + self.config = TABLE_CONFIGS[self.table] + self.settings = get_settings() + self.partition_migrator = PartitionMigrator(table) + + def migrate( + self, + partition: Optional[str] = None, + dry_run: bool = False, + resume: bool = False + ) -> int: + """Perform full migration. + + Args: + partition: If specified, migrate only this partition (for testing) + dry_run: If True, log what would be done without modifying data + resume: If True, resume from last checkpoint + + Returns: + Total number of PostgreSQL rows migrated + """ + mysql_table = self.config["mysql_table"] + pg_table = self.config["postgres_table"] + + logger.info(f"Starting full migration of {mysql_table} -> {pg_table}") + + try: + with MySQLConnector() as mysql_conn: + with PostgreSQLConnector() as pg_conn: + # Check PostgreSQL table exists + if not pg_conn.table_exists(pg_table): + raise ValueError( + f"PostgreSQL table {pg_table} does not exist. " + "Run 'setup --create-schema' first." 
+ ) + + # Initialize state manager (using _global for table-level state) + global_state_mgr = StateManager(pg_conn, pg_table, "_global") + + # Get partitions to migrate + if partition: + # Single partition mode + partitions = [partition] + # Check if this partition is in-progress and should be resumed + in_progress_partitions = global_state_mgr.get_in_progress_partitions() + should_resume = resume and partition in in_progress_partitions + logger.info(f"Single partition mode: {partition}" + + (f" (resuming from checkpoint)" if should_resume else "")) + else: + # Full migration: all partitions + all_partitions = mysql_conn.get_table_partitions(mysql_table) + logger.info(f"Found {len(all_partitions)} partitions: {all_partitions}") + + # Check resume - get list of completed and in-progress partitions + if resume: + completed_partitions = global_state_mgr.get_completed_partitions() + in_progress_partitions = global_state_mgr.get_in_progress_partitions() + + # Skip partitions already completed + partitions = [p for p in all_partitions if p not in completed_partitions] + + logger.info( + f"Resuming migration. Completed: {len(completed_partitions)}, " + f"In-progress: {len(in_progress_partitions)}, " + f"Remaining: {len(partitions)} partitions" + ) + + if in_progress_partitions: + logger.info(f"Will resume in-progress partitions: {in_progress_partitions}") + else: + partitions = all_partitions + in_progress_partitions = [] + + if dry_run: + logger.info( + f"[DRY RUN] Would migrate {len(partitions)} partition(s): " + f"{partitions}" + ) + return 0 + + # Migrate each partition + total_migrated = 0 + for idx, part_name in enumerate(partitions, 1): + # Determine if this partition should be resumed + # In single partition mode, use should_resume from above + # In full mode, check if partition is in-progress + if partition: + # Single partition mode - use the should_resume flag set earlier + do_resume = should_resume + else: + # Full migration mode - resume if partition is in-progress + do_resume = resume and part_name in in_progress_partitions + + resume_msg = " (resuming from checkpoint)" if do_resume else "" + logger.info( + f"[{idx}/{len(partitions)}] Migrating partition: {part_name}{resume_msg}" + ) + + try: + migrated = self.partition_migrator.migrate_partition( + mysql_conn, + pg_conn, + part_name, + dry_run=dry_run, + resume=do_resume + ) + total_migrated += migrated + logger.info( + f"✓ Partition {part_name} complete: " + f"{migrated} rows, total: {total_migrated}" + ) + + except Exception as e: + logger.error( + f"Failed to migrate partition {part_name}: {e}" + ) + raise + + # Get final row count from PostgreSQL + final_count = pg_conn.get_row_count(pg_table) + + # Only mark global migration complete if ALL partitions are completed + # Check if there are any in-progress or pending partitions remaining + if partition: + # Single partition mode - don't update global state + logger.info( + f"✓ Partition migration complete: {total_migrated:,} rows migrated in this run, " + f"{final_count:,} total rows in {pg_table}" + ) + else: + # Full migration mode - check if everything is really done + completed_partitions = global_state_mgr.get_completed_partitions() + in_progress_check = global_state_mgr.get_in_progress_partitions() + all_partitions_check = mysql_conn.get_table_partitions(mysql_table) + + if len(completed_partitions) == len(all_partitions_check) and len(in_progress_check) == 0: + # All partitions are completed - mark global as complete + # Get the maximum last_key across all partitions for 
incremental migration + max_last_key = global_state_mgr.get_max_last_key_across_partitions() + if max_last_key: + logger.info( + f"Setting global last_key: " + f"({max_last_key.get('unit_name')}, {max_last_key.get('tool_name_id')}, " + f"{max_last_key.get('event_date')}, {max_last_key.get('event_time')})" + ) + global_state_mgr.update_state(last_key=max_last_key) + + global_state_mgr.mark_completed(final_count) + logger.info( + f"✓ Full migration complete: {total_migrated:,} rows migrated in this run, " + f"{final_count:,} total rows in {pg_table}" + ) + else: + logger.info( + f"✓ Migration batch complete: {total_migrated:,} rows migrated in this run, " + f"{final_count:,} total rows in {pg_table}. " + f"{len(completed_partitions)}/{len(all_partitions_check)} partitions completed." + ) + + return total_migrated + + except Exception as e: + logger.error(f"Full migration failed: {e}") + raise + + def _get_remaining_partitions( + self, + all_partitions: List[str], + last_completed: str + ) -> List[str]: + """Get list of partitions that still need to be migrated. + + Args: + all_partitions: All partition names from MySQL + last_completed: Last partition that was completed + + Returns: + List of partition names to migrate + """ + try: + # Find index of last completed partition + idx = all_partitions.index(last_completed) + # Return partitions after it + return all_partitions[idx + 1:] + except ValueError: + # last_completed not in list, return all + logger.warning( + f"Last completed partition {last_completed} not found in partition list. " + "Starting from beginning." + ) + return all_partitions + + +def run_full_migration( + table: str, + partition: Optional[str] = None, + dry_run: bool = False, + resume: bool = False +) -> int: + """Run full migration for a table. + + Args: + table: Table name to migrate (RAWDATACOR or ELABDATADISP) + partition: If specified, migrate only this partition + dry_run: If True, show what would be done without modifying data + resume: If True, resume from last checkpoint + + Returns: + Number of rows migrated + """ + migrator = FullMigrator(table) + return migrator.migrate(partition=partition, dry_run=dry_run, resume=resume) diff --git a/src/migrator/incremental_migrator.py b/src/migrator/incremental_migrator.py new file mode 100644 index 0000000..b3085d4 --- /dev/null +++ b/src/migrator/incremental_migrator.py @@ -0,0 +1,324 @@ +"""Incremental migration from MySQL to PostgreSQL. + +Migrates only new data since last full/incremental migration based on consolidation keys. +""" +from typing import Optional + +from config import get_settings, TABLE_CONFIGS +from src.connectors.mysql_connector import MySQLConnector +from src.connectors.postgres_connector import PostgreSQLConnector +from src.migrator.consolidator import consolidate_rows +from src.migrator.state_manager import StateManager +from src.utils.logger import get_logger +from src.utils.progress import ProgressTracker + +logger = get_logger(__name__) + + +class IncrementalMigrator: + """Perform incremental migration based on consolidation keys.""" + + def __init__(self, table: str): + """Initialize incremental migrator. + + Args: + table: Table name (RAWDATACOR or ELABDATADISP) + """ + if table.upper() not in ("RAWDATACOR", "ELABDATADISP"): + raise ValueError(f"Unknown table: {table}") + + self.table = table.upper() + self.config = TABLE_CONFIGS[self.table] + self.settings = get_settings() + + def migrate(self, dry_run: bool = False) -> int: + """Perform incremental migration since last migration. 
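Editor's note — a hedged sketch (not part of the patch). The incremental path below inserts with ON CONFLICT (unit_name, tool_name_id, event_timestamp, event_year) DO NOTHING, which in PostgreSQL requires a unique index or constraint covering exactly those columns on the target table. The actual DDL is produced by schema_transformer and is not in this hunk, so the index name and statement below are assumptions about that schema.

    import psycopg

    ASSUMED_UNIQUE_INDEX = """
        CREATE UNIQUE INDEX IF NOT EXISTS uq_elabdatadisp_consolidation_key
            ON elabdatadisp (unit_name, tool_name_id, event_timestamp, event_year)
    """


    def ensure_conflict_target(conn: psycopg.Connection) -> None:
        """Create the unique index the ON CONFLICT clause relies on (illustrative)."""
        with conn.cursor() as cur:
            cur.execute(ASSUMED_UNIQUE_INDEX)
        conn.commit()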
+ + Migrates only consolidation keys that come AFTER the last migrated key. + + Args: + dry_run: If True, log what would be done without modifying data + + Returns: + Number of PostgreSQL rows migrated + """ + mysql_table = self.config["mysql_table"] + pg_table = self.config["postgres_table"] + + logger.info(f"Starting incremental migration of {mysql_table} -> {pg_table}") + + try: + with MySQLConnector() as mysql_conn: + with PostgreSQLConnector() as pg_conn: + # Check PostgreSQL table exists + if not pg_conn.table_exists(pg_table): + raise ValueError( + f"PostgreSQL table {pg_table} does not exist. " + "Run 'setup --create-schema' first." + ) + + # Initialize state manager + state_mgr = StateManager(pg_conn, pg_table) + + # Get last migrated key from migration_state + # This was saved during the last full/incremental migration + last_key = state_mgr.get_last_key() + + if last_key is None: + logger.info( + f"No previous migration found for {pg_table}. " + "Run 'migrate full' for initial migration." + ) + return 0 + + logger.info( + f"Last migrated key (from migration_state): " + f"({last_key.get('unit_name')}, {last_key.get('tool_name_id')}, " + f"{last_key.get('event_date')}, {last_key.get('event_time')})" + ) + + # Get max MySQL ID already migrated to optimize query performance + cursor = pg_conn.connection.cursor() + cursor.execute(f"SELECT MAX(mysql_max_id) FROM {pg_table}") + result = cursor.fetchone() + max_mysql_id = result[0] if result and result[0] else 0 + + logger.info(f"Max MySQL ID already migrated: {max_mysql_id}") + + if dry_run: + # In dry-run, check how many new keys exist in MySQL + logger.info("[DRY RUN] Checking for new keys in MySQL...") + + # Sample first 100 keys to check if there are new records + sample_keys = mysql_conn.fetch_consolidation_keys_after( + mysql_table, + after_key=last_key, + min_mysql_id=max_mysql_id, + limit=100, + offset=0 + ) + + if sample_keys: + # If we found 100 keys in the sample, there might be many more + # Try to get a rough count by checking larger offsets + if len(sample_keys) == 100: + # There are at least 100 keys, check if there are more + logger.info( + f"[DRY RUN] Found at least 100 new keys, checking total count..." + ) + # Sample at different offsets to estimate total + test_batch = mysql_conn.fetch_consolidation_keys_after( + mysql_table, + after_key=last_key, + min_mysql_id=max_mysql_id, + limit=1, + offset=1000 + ) + if test_batch: + logger.info(f"[DRY RUN] Estimated: More than 1000 new keys to migrate") + else: + logger.info(f"[DRY RUN] Estimated: Between 100-1000 new keys to migrate") + else: + logger.info(f"[DRY RUN] Found {len(sample_keys)} new keys to migrate") + + logger.info("[DRY RUN] First 3 keys:") + for i, key in enumerate(sample_keys[:3]): + logger.info( + f" {i+1}. 
({key.get('UnitName')}, {key.get('ToolNameID')}, " + f"{key.get('EventDate')}, {key.get('EventTime')})" + ) + logger.info( + f"[DRY RUN] Run without --dry-run to perform actual migration" + ) + # Return a positive number to indicate there's data to migrate + return len(sample_keys) + else: + logger.info("[DRY RUN] No new keys found - database is up to date") + return 0 + + # Migrate new keys + migrated_rows = 0 + offset = 0 + insert_buffer = [] + buffer_size = self.settings.migration.consolidation_group_limit // 10 + + with ProgressTracker( + total=None, # Unknown total + description=f"Incremental migration of {mysql_table}" + ) as progress: + + # Get column order for PostgreSQL insert + pg_columns = self._get_pg_columns() + + while True: + # Fetch batch of consolidation keys AFTER last_key + logger.debug(f"Fetching keys after last_key with offset={offset}") + keys = mysql_conn.fetch_consolidation_keys_after( + mysql_table, + after_key=last_key, + min_mysql_id=max_mysql_id, + limit=self.settings.migration.consolidation_group_limit, + offset=offset + ) + + if not keys: + logger.info("No more new keys to migrate") + break + + logger.info(f"Processing {len(keys)} new keys (offset={offset})") + + # Process each consolidation key + keys_processed = 0 + for key in keys: + keys_processed += 1 + # Log progress every 1000 keys + if keys_processed % 1000 == 0: + logger.info(f" Processed {keys_processed}/{len(keys)} keys in this batch...") + unit_name = key.get("UnitName") + tool_name_id = key.get("ToolNameID") + event_date = key.get("EventDate") + event_time = key.get("EventTime") + + # Fetch all MySQL rows for this key (all nodes, all partitions) + mysql_rows = mysql_conn.fetch_records_for_key_all_partitions( + mysql_table, + unit_name, + tool_name_id, + event_date, + event_time + ) + + if not mysql_rows: + logger.warning( + f"No records found for key: " + f"({unit_name}, {tool_name_id}, {event_date}, {event_time})" + ) + continue + + # Consolidate into single PostgreSQL row + try: + pg_row = consolidate_rows(self.table, mysql_rows) + except Exception as e: + logger.error( + f"Failed to consolidate key " + f"({unit_name}, {tool_name_id}, {event_date}, {event_time}): {e}" + ) + continue + + # Add to insert buffer + insert_buffer.append(pg_row) + + # Flush buffer when full + if len(insert_buffer) >= buffer_size: + # Use COPY with ON CONFLICT to handle duplicates + inserted = pg_conn.copy_from_with_conflict( + pg_table, + insert_buffer, + pg_columns, + conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"] + ) + migrated_rows += inserted + progress.update(inserted) + + # Update state with last key + last_processed_key = { + "unit_name": unit_name, + "tool_name_id": tool_name_id, + "event_date": str(event_date) if event_date else None, + "event_time": str(event_time) if event_time else None, + } + state_mgr.update_state( + last_key=last_processed_key, + total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows + ) + + logger.debug( + f"Flushed {inserted} rows, total new: {migrated_rows}" + ) + insert_buffer = [] + + # Move to next batch of keys + offset += len(keys) + + # If we got fewer keys than requested, we're done + if len(keys) < self.settings.migration.consolidation_group_limit: + break + + # Flush remaining buffer + if insert_buffer: + # Use COPY with ON CONFLICT to handle duplicates + inserted = pg_conn.copy_from_with_conflict( + pg_table, + insert_buffer, + pg_columns, + conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"] + ) 
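+                        # copy_from_with_conflict is assumed to return only the number of rows
+                        # actually written (duplicate consolidation keys are skipped on conflict),
+                        # so the running totals below count new PostgreSQL rows, not buffered rows.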
+ migrated_rows += inserted + progress.update(inserted) + logger.debug(f"Final flush: {inserted} rows") + + # Get final row count + final_count = pg_conn.get_row_count(pg_table) + logger.info(f"Total PostgreSQL rows: {final_count}") + + logger.info( + f"✓ Incremental migration complete: " + f"{migrated_rows} new rows migrated to {pg_table}" + ) + + return migrated_rows + + except Exception as e: + logger.error(f"Incremental migration failed: {e}") + raise + + def _get_pg_columns(self) -> list[str]: + """Get PostgreSQL column names in insertion order. + + Returns: + List of column names + """ + # Base columns for both tables + columns = [ + "mysql_max_id", + "unit_name", + "tool_name_id", + "event_timestamp", + "event_year", + "measurements", + ] + + # Add table-specific columns + if self.table == "RAWDATACOR": + columns.extend([ + "bat_level", + "temperature", + "bat_level_module", + "temperature_module", + "rssi_module", + "created_at", + ]) + elif self.table == "ELABDATADISP": + columns.extend([ + "created_at", + "updated_at", + ]) + + return columns + + +def run_incremental_migration( + table: str, + dry_run: bool = False +) -> int: + """Run incremental migration for a table. + + Args: + table: Table name to migrate (RAWDATACOR or ELABDATADISP) + dry_run: If True, show what would be done without modifying data + + Returns: + Number of rows migrated + """ + migrator = IncrementalMigrator(table) + return migrator.migrate(dry_run=dry_run) diff --git a/src/migrator/parallel_migrator.py b/src/migrator/parallel_migrator.py new file mode 100644 index 0000000..05af7f3 --- /dev/null +++ b/src/migrator/parallel_migrator.py @@ -0,0 +1,220 @@ +"""Parallel migration orchestrator for MySQL to PostgreSQL. + +Runs multiple partition migrations in parallel using multiprocessing. +Each process migrates a different partition independently. +""" +import multiprocessing as mp +from typing import Optional, List +import sys + +from config import get_settings, TABLE_CONFIGS +from src.connectors.mysql_connector import MySQLConnector +from src.connectors.postgres_connector import PostgreSQLConnector +from src.migrator.partition_migrator import PartitionMigrator +from src.migrator.state_manager import StateManager +from src.utils.logger import get_logger + +logger = get_logger(__name__) + + +def migrate_partition_worker(table: str, partition_name: str, dry_run: bool = False, resume: bool = False) -> tuple[str, int, bool]: + """Worker function to migrate a single partition. + + Runs in a separate process. + + Args: + table: Table name (RAWDATACOR or ELABDATADISP) + partition_name: Partition to migrate + dry_run: If True, simulate without writing + resume: If True, resume from last checkpoint if exists + + Returns: + Tuple of (partition_name, rows_migrated, success) + """ + # Configure logging for this worker process (multiprocessing requires per-process setup) + # Set up root logger so all child loggers (PartitionMigrator, ProgressTracker, etc.) 
work + from src.utils.logger import setup_logger + setup_logger("") # Configure root logger for this process + + # Now get a logger for this worker + from src.utils.logger import get_logger + worker_logger = get_logger(f"worker.{partition_name}") + + try: + resume_msg = " (resuming)" if resume else "" + worker_logger.info(f"Worker starting: {partition_name}{resume_msg}") + + with MySQLConnector() as mysql_conn: + with PostgreSQLConnector() as pg_conn: + migrator = PartitionMigrator(table) + rows = migrator.migrate_partition( + mysql_conn, + pg_conn, + partition_name, + dry_run=dry_run, + resume=resume + ) + + worker_logger.info(f"Worker completed: {partition_name} ({rows} rows)") + return (partition_name, rows, True) + + except Exception as e: + worker_logger.error(f"Worker failed for {partition_name}: {e}") + return (partition_name, 0, False) + + +class ParallelMigrator: + """Orchestrate parallel migration of multiple partitions.""" + + def __init__(self, table: str): + """Initialize parallel migrator. + + Args: + table: Table name (RAWDATACOR or ELABDATADISP) + """ + if table.upper() not in ("RAWDATACOR", "ELABDATADISP"): + raise ValueError(f"Unknown table: {table}") + + self.table = table.upper() + self.config = TABLE_CONFIGS[self.table] + self.settings = get_settings() + + def migrate( + self, + num_workers: int = 5, + dry_run: bool = False, + resume: bool = False + ) -> int: + """Perform parallel migration of all partitions. + + Args: + num_workers: Number of parallel workers + dry_run: If True, log what would be done without modifying data + resume: If True, skip already completed partitions + + Returns: + Total number of PostgreSQL rows migrated + """ + mysql_table = self.config["mysql_table"] + pg_table = self.config["postgres_table"] + + logger.info( + f"Starting parallel migration of {mysql_table} -> {pg_table} " + f"with {num_workers} workers" + ) + + try: + with MySQLConnector() as mysql_conn: + with PostgreSQLConnector() as pg_conn: + # Check PostgreSQL table exists + if not pg_conn.table_exists(pg_table): + raise ValueError( + f"PostgreSQL table {pg_table} does not exist. " + "Run 'setup --create-schema' first." 
+ ) + + # Get all partitions + all_partitions = mysql_conn.get_table_partitions(mysql_table) + logger.info(f"Found {len(all_partitions)} partitions: {all_partitions}") + + # Filter out completed partitions if resuming + partitions_to_migrate = all_partitions + if resume: + state_mgr = StateManager(pg_conn, pg_table, "_global") + completed = state_mgr.get_completed_partitions() + if completed: + partitions_to_migrate = [p for p in all_partitions if p not in completed] + logger.info( + f"Resuming: {len(completed)} partitions already completed, " + f"{len(partitions_to_migrate)} remaining" + ) + + if not partitions_to_migrate: + logger.info("All partitions already migrated") + return 0 + + if dry_run: + logger.info( + f"[DRY RUN] Would migrate {len(partitions_to_migrate)} partitions " + f"in parallel with {num_workers} workers" + ) + return 0 + + # Run migrations in parallel using multiprocessing pool + logger.info(f"Launching {num_workers} worker processes...") + + with mp.Pool(processes=num_workers) as pool: + # Create tasks for each partition + tasks = [ + (self.table, partition_name, dry_run, resume) + for partition_name in partitions_to_migrate + ] + + # Run migrations in parallel + results = pool.starmap(migrate_partition_worker, tasks) + + # Collect results + total_migrated = 0 + failed_partitions = [] + + for partition_name, rows, success in results: + if success: + total_migrated += rows + logger.info(f"✓ {partition_name}: {rows} rows") + else: + failed_partitions.append(partition_name) + logger.error(f"✗ {partition_name}: FAILED") + + if failed_partitions: + raise Exception( + f"Migration failed for {len(failed_partitions)} partitions: " + f"{', '.join(failed_partitions)}" + ) + + # Only mark global migration complete if ALL partitions are completed + with PostgreSQLConnector() as pg_conn: + final_count = pg_conn.get_row_count(pg_table) + global_state_mgr = StateManager(pg_conn, pg_table, "_global") + + # Verify all partitions are actually completed + completed = global_state_mgr.get_completed_partitions() + in_progress = global_state_mgr.get_in_progress_partitions() + + if len(in_progress) == 0: + # All partitions completed successfully + global_state_mgr.mark_completed(final_count) + logger.info( + f"✓ Parallel migration complete: {final_count} total rows in {pg_table}" + ) + else: + logger.warning( + f"Migration finished but {len(in_progress)} partitions still in-progress: {in_progress}. " + f"Not marking global as completed." + ) + + return final_count + + except Exception as e: + logger.error(f"Parallel migration failed: {e}") + raise + + +def run_parallel_migration( + table: str, + num_workers: int = 5, + dry_run: bool = False, + resume: bool = False +) -> int: + """Run parallel migration for a table. + + Args: + table: Table name to migrate (RAWDATACOR or ELABDATADISP) + num_workers: Number of parallel workers + dry_run: If True, show what would be done without modifying data + resume: If True, skip already completed partitions + + Returns: + Number of rows migrated + """ + migrator = ParallelMigrator(table) + return migrator.migrate(num_workers=num_workers, dry_run=dry_run, resume=resume) diff --git a/src/migrator/partition_migrator.py b/src/migrator/partition_migrator.py new file mode 100644 index 0000000..e5894c9 --- /dev/null +++ b/src/migrator/partition_migrator.py @@ -0,0 +1,355 @@ +"""Partition-based migration for MySQL to PostgreSQL. + +Streaming Strategy: +1. Stream all rows from partition ordered by consolidation key (single query) +2. 
Group rows by consolidation key as they arrive +3. Validate and consolidate each group +4. Batch insert using PostgreSQL COPY (10-100x faster than INSERT) +5. Log invalid keys to error file +6. Update migration state periodically with resume support +""" +from typing import Optional, List, Dict, Any +from datetime import datetime + +from config import get_settings, TABLE_CONFIGS +from src.connectors.mysql_connector import MySQLConnector +from src.connectors.postgres_connector import PostgreSQLConnector +from src.migrator.consolidator import consolidate_rows +from src.migrator.state_manager import StateManager +from src.utils.logger import get_logger +from src.utils.progress import ProgressTracker +from src.utils.validation import validate_consolidation_key, ErrorLogger + +logger = get_logger(__name__) + + +class PartitionMigrator: + """Migrate a single MySQL partition to PostgreSQL with consolidation.""" + + def __init__(self, table: str): + """Initialize partition migrator. + + Args: + table: Table name (RAWDATACOR or ELABDATADISP) + """ + if table.upper() not in ("RAWDATACOR", "ELABDATADISP"): + raise ValueError(f"Unknown table: {table}") + + self.table = table.upper() + self.config = TABLE_CONFIGS[self.table] + self.settings = get_settings() + + def migrate_partition_streaming( + self, + mysql_conn: MySQLConnector, + pg_conn: PostgreSQLConnector, + partition_name: str, + dry_run: bool = False, + resume: bool = False + ) -> int: + """Migrate a partition using streaming (OPTIMIZED - eliminates N+1 queries). + + This method streams all rows from the partition in a single query, + groups them by consolidation key, and uses PostgreSQL COPY for fast inserts. + + Performance improvement: 10-100x faster than old N+1 query approach. + + Args: + mysql_conn: MySQL connector (already connected) + pg_conn: PostgreSQL connector (already connected) + partition_name: MySQL partition name (e.g., 'p2024') + dry_run: If True, log what would be done without modifying data + resume: If True, resume from last checkpoint + + Returns: + Number of PostgreSQL rows inserted (consolidated) + """ + mysql_table = self.config["mysql_table"] + pg_table = self.config["postgres_table"] + + logger.info(f"Starting STREAMING migration of partition {partition_name} from {mysql_table}") + + # Initialize state manager for this partition + state_mgr = StateManager(pg_conn, pg_table, partition_name) + + # Initialize error logger + error_logger = ErrorLogger(pg_table, partition_name) + + # Check resume state + resume_from_key = None + start_key_tuple = None + if resume: + last_key = state_mgr.get_last_key() + if last_key: + resume_from_key = last_key + # Convert to tuple for MySQL query + start_key_tuple = ( + last_key.get("unit_name"), + last_key.get("tool_name_id"), + last_key.get("event_date"), + last_key.get("event_time") + ) + logger.info( + f"Resuming AFTER last key: " + f"({start_key_tuple[0]}, {start_key_tuple[1]}, " + f"{start_key_tuple[2]}, {start_key_tuple[3]})" + ) + + if dry_run: + logger.info(f"[DRY RUN] Would stream partition {partition_name}") + return 0 + + # Track migration + state_mgr.mark_in_progress() + migrated_rows = 0 + insert_buffer = [] + buffer_size = 10000 # Larger buffer for COPY performance + + # Get column order for PostgreSQL insert + pg_columns = self._get_pg_columns() + + # Group rows by consolidation key + current_group: List[Dict[str, Any]] = [] + current_key = None + rows_processed = 0 + + migration_completed = False + try: + with ProgressTracker( + total=None, # Unknown total + 
description=f"Streaming {mysql_table} partition {partition_name}" + ) as progress: + + # Use fetch_consolidation_groups_from_partition with start_key for efficient resume + # MySQL will skip all keys <= start_key using WHERE clause (no unnecessary data transfer) + for group in mysql_conn.fetch_consolidation_groups_from_partition( + mysql_table, + partition_name, + limit=self.settings.migration.consolidation_group_limit, + offset=0, + start_key=start_key_tuple # Resume AFTER this key + ): + rows_processed += len(group) + + # Extract consolidation key from first row + if not group: + continue + + first_row = group[0] + key = ( + first_row.get("UnitName"), + first_row.get("ToolNameID"), + first_row.get("EventDate"), + first_row.get("EventTime") + ) + + # No need to skip here anymore - MySQL query already handles resume + # (keys are filtered in the database with WHERE clause) + + # Validate and process group + self._process_group( + group, + key, + insert_buffer, + buffer_size, + pg_conn, + pg_table, + pg_columns, + state_mgr, + error_logger, + progress + ) + + # Update migrated count + if len(insert_buffer) == 0: # Just flushed + migrated_rows = state_mgr.get_state().get("total_rows_migrated", migrated_rows) + + # Flush remaining buffer + if insert_buffer: + inserted = pg_conn.copy_from_with_conflict( + pg_table, + insert_buffer, + pg_columns, + conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"] + ) + migrated_rows += inserted + progress.update(inserted) + logger.debug(f"Final flush: {inserted} rows") + + # If we reach here, migration completed successfully + migration_completed = True + + finally: + # Close error logger + error_logger.close() + + # Only mark as completed if migration actually finished + if migration_completed: + state_mgr.update_state( + total_rows_migrated=migrated_rows, + status="completed", + mark_completed=True + ) + else: + # Migration was interrupted - save progress but keep status as in_progress + logger.warning(f"Migration of partition {partition_name} was interrupted") + state_mgr.update_state( + total_rows_migrated=migrated_rows, + status="in_progress" + ) + + logger.info( + f"✓ Partition {partition_name} STREAMING migration complete: " + f"{migrated_rows} PostgreSQL rows inserted, " + f"{rows_processed} MySQL rows processed, " + f"{error_logger.get_error_count()} invalid keys skipped" + ) + + return migrated_rows + + def _process_group( + self, + mysql_rows: List[Dict[str, Any]], + key: tuple, + insert_buffer: List[Dict[str, Any]], + buffer_size: int, + pg_conn: PostgreSQLConnector, + pg_table: str, + pg_columns: List[str], + state_mgr: StateManager, + error_logger: ErrorLogger, + progress: ProgressTracker + ) -> None: + """Process a consolidation group: validate, consolidate, and buffer. 
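+        Invalid keys are written to the error log and skipped; each buffer flush also checkpoints the last processed key in migration_state.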
+ + Args: + mysql_rows: List of MySQL rows with same consolidation key + key: Consolidation key tuple (unit_name, tool_name_id, event_date, event_time) + insert_buffer: Buffer to add consolidated row to + buffer_size: Max buffer size before flush + pg_conn: PostgreSQL connector + pg_table: PostgreSQL table name + pg_columns: Column names for insert + state_mgr: State manager + error_logger: Error logger + progress: Progress tracker + """ + unit_name, tool_name_id, event_date, event_time = key + + # Validate consolidation key + is_valid, error_reason = validate_consolidation_key( + unit_name, tool_name_id, event_date, event_time + ) + + if not is_valid: + # Log invalid key and skip + error_logger.log_invalid_key( + unit_name, tool_name_id, event_date, event_time, error_reason + ) + return + + # Consolidate into single PostgreSQL row + try: + pg_row = consolidate_rows(self.table, mysql_rows) + except Exception as e: + logger.error( + f"Failed to consolidate key " + f"({unit_name}, {tool_name_id}, {event_date}, {event_time}): {e}" + ) + error_logger.log_invalid_key( + unit_name, tool_name_id, event_date, event_time, + f"Consolidation failed: {e}" + ) + return + + # Add to insert buffer + insert_buffer.append(pg_row) + + # Flush buffer when full + if len(insert_buffer) >= buffer_size: + # Use COPY with ON CONFLICT to handle UNIQUE constraint + # (unit_name, tool_name_id, event_timestamp, event_year) + inserted = pg_conn.copy_from_with_conflict( + pg_table, + insert_buffer, + pg_columns, + conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"] + ) + progress.update(inserted) + + # Update state with last key + last_processed_key = { + "unit_name": unit_name, + "tool_name_id": tool_name_id, + "event_date": str(event_date) if event_date else None, + "event_time": str(event_time) if event_time else None, + } + current_total = state_mgr.get_state().get("total_rows_migrated", 0) + state_mgr.update_state( + last_key=last_processed_key, + total_rows_migrated=current_total + inserted + ) + + logger.debug( + f"Flushed {inserted} rows using COPY, " + f"total migrated: {current_total + inserted}" + ) + insert_buffer.clear() + + def migrate_partition( + self, + mysql_conn: MySQLConnector, + pg_conn: PostgreSQLConnector, + partition_name: str, + dry_run: bool = False, + resume: bool = False + ) -> int: + """Migrate a single partition from MySQL to PostgreSQL using streaming. + + Args: + mysql_conn: MySQL connector (already connected) + pg_conn: PostgreSQL connector (already connected) + partition_name: MySQL partition name (e.g., 'p2024') + dry_run: If True, log what would be done without modifying data + resume: If True, resume from last checkpoint + + Returns: + Number of PostgreSQL rows inserted (consolidated) + """ + return self.migrate_partition_streaming( + mysql_conn, pg_conn, partition_name, dry_run, resume + ) + + def _get_pg_columns(self) -> list[str]: + """Get PostgreSQL column names in insertion order. 
+ + Returns: + List of column names + """ + # Base columns for both tables + columns = [ + "mysql_max_id", + "unit_name", + "tool_name_id", + "event_timestamp", + "event_year", + "measurements", + ] + + # Add table-specific columns + if self.table == "RAWDATACOR": + columns.extend([ + "bat_level", + "temperature", + "bat_level_module", + "temperature_module", + "rssi_module", + "created_at", + ]) + elif self.table == "ELABDATADISP": + columns.extend([ + "created_at", + "updated_at", + ]) + + return columns diff --git a/src/migrator/state_manager.py b/src/migrator/state_manager.py new file mode 100644 index 0000000..90fd4d6 --- /dev/null +++ b/src/migrator/state_manager.py @@ -0,0 +1,347 @@ +"""Migration state management using PostgreSQL migration_state table. + +Tracks migration progress with: +- last_key: Last consolidation key migrated (UnitName, ToolNameID, EventDate, EventTime) +- last_completed_partition: Last partition that was fully migrated +- total_rows_migrated: Count of PostgreSQL rows (consolidated) +- status: pending, in_progress, completed +""" +from typing import Optional, Dict, Any +from datetime import datetime +import json + +from src.connectors.postgres_connector import PostgreSQLConnector +from src.utils.logger import get_logger + +logger = get_logger(__name__) + + +class StateManager: + """Manage migration state in PostgreSQL migration_state table. + + Supports per-partition state tracking for parallel migration. + """ + + def __init__(self, pg_conn: PostgreSQLConnector, table_name: str, partition_name: str = "_global"): + """Initialize state manager. + + Args: + pg_conn: PostgreSQL connector + table_name: PostgreSQL table name (rawdatacor or elabdatadisp) + partition_name: MySQL partition name (e.g., 'd0', 'part0') or '_global' for global state + """ + self.pg_conn = pg_conn + self.table_name = table_name + self.partition_name = partition_name + + def get_last_key(self) -> Optional[Dict[str, Any]]: + """Get last consolidation key that was migrated for this partition. + + Returns: + Dict with keys: unit_name, tool_name_id, event_date, event_time + Or None if no previous migration + """ + try: + with self.pg_conn.connection.cursor() as cursor: + cursor.execute( + "SELECT last_key FROM migration_state WHERE table_name = %s AND partition_name = %s", + (self.table_name, self.partition_name) + ) + result = cursor.fetchone() + if result and result[0]: + # last_key is stored as JSONB, psycopg returns it as dict + return result[0] + return None + except Exception as e: + logger.debug(f"Could not get last_key from migration_state: {e}") + return None + + def get_total_rows_migrated(self) -> int: + """Get total rows migrated for this partition. + + Returns: + Count of PostgreSQL rows (consolidated) + """ + try: + with self.pg_conn.connection.cursor() as cursor: + cursor.execute( + "SELECT total_rows_migrated FROM migration_state WHERE table_name = %s AND partition_name = %s", + (self.table_name, self.partition_name) + ) + result = cursor.fetchone() + return result[0] if result and result[0] else 0 + except Exception as e: + logger.debug(f"Could not get total_rows_migrated from migration_state: {e}") + return 0 + + def get_status(self) -> str: + """Get migration status for this partition. 
+ + Returns: + Status string: pending, in_progress, or completed + """ + try: + with self.pg_conn.connection.cursor() as cursor: + cursor.execute( + "SELECT status FROM migration_state WHERE table_name = %s AND partition_name = %s", + (self.table_name, self.partition_name) + ) + result = cursor.fetchone() + return result[0] if result and result[0] else "pending" + except Exception as e: + logger.debug(f"Could not get status from migration_state: {e}") + return "pending" + + def get_state(self) -> Dict[str, Any]: + """Get complete migration state for this partition. + + Returns: + Dict with keys: last_key, total_rows_migrated, status, migration_completed_at + """ + try: + with self.pg_conn.connection.cursor() as cursor: + cursor.execute( + "SELECT last_key, total_rows_migrated, status, migration_completed_at " + "FROM migration_state WHERE table_name = %s AND partition_name = %s", + (self.table_name, self.partition_name) + ) + result = cursor.fetchone() + if result: + return { + "last_key": result[0], + "total_rows_migrated": result[1] if result[1] else 0, + "status": result[2] if result[2] else "pending", + "migration_completed_at": result[3] + } + return { + "last_key": None, + "total_rows_migrated": 0, + "status": "pending", + "migration_completed_at": None + } + except Exception as e: + logger.debug(f"Could not get state from migration_state: {e}") + return { + "last_key": None, + "total_rows_migrated": 0, + "status": "pending", + "migration_completed_at": None + } + + def get_completed_partitions(self) -> list[str]: + """Get list of all completed partitions for this table. + + Returns: + List of partition names that have been completed + """ + try: + with self.pg_conn.connection.cursor() as cursor: + cursor.execute( + "SELECT partition_name FROM migration_state WHERE table_name = %s AND status = 'completed'", + (self.table_name,) + ) + results = cursor.fetchall() + return [row[0] for row in results if row[0] != "_global"] + except Exception as e: + logger.debug(f"Could not get completed partitions from migration_state: {e}") + return [] + + def get_in_progress_partitions(self) -> list[str]: + """Get list of all in-progress partitions for this table. + + Returns: + List of partition names that are currently in progress + """ + try: + with self.pg_conn.connection.cursor() as cursor: + cursor.execute( + "SELECT partition_name FROM migration_state WHERE table_name = %s AND status = 'in_progress'", + (self.table_name,) + ) + results = cursor.fetchall() + return [row[0] for row in results if row[0] != "_global"] + except Exception as e: + logger.debug(f"Could not get in-progress partitions from migration_state: {e}") + return [] + + def update_state( + self, + last_key: Optional[Dict[str, Any]] = None, + total_rows_migrated: Optional[int] = None, + status: Optional[str] = None, + mark_completed: bool = False + ) -> None: + """Update migration state for this partition. 
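+        Performs an upsert keyed on (table_name, partition_name), so the first update for a partition creates its state row.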
+ + Args: + last_key: Last consolidation key migrated + total_rows_migrated: Total PostgreSQL rows migrated for this partition + status: Migration status (pending, in_progress, completed) + mark_completed: If True, mark migration as completed with timestamp + """ + try: + with self.pg_conn.connection.cursor() as cursor: + # Build dynamic UPDATE based on provided parameters + updates = [] + params = [] + + if last_key is not None: + updates.append("last_key = %s::jsonb") + params.append(json.dumps(last_key)) + + if total_rows_migrated is not None: + updates.append("total_rows_migrated = %s") + params.append(total_rows_migrated) + + if status is not None: + updates.append("status = %s") + params.append(status) + + if mark_completed: + updates.append("migration_completed_at = %s") + params.append(datetime.utcnow()) + if status is None: + updates.append("status = 'completed'") + + if not updates: + logger.warning("update_state called with no parameters to update") + return + + # Upsert: INSERT or UPDATE + query = f""" + INSERT INTO migration_state (table_name, partition_name, {', '.join([u.split('=')[0].strip() for u in updates])}) + VALUES (%s, %s, {', '.join(['%s'] * len(updates))}) + ON CONFLICT (table_name, partition_name) DO UPDATE SET + {', '.join(updates)} + """ + + all_params = [self.table_name, self.partition_name] + params + params + cursor.execute(query, tuple(all_params)) + self.pg_conn.connection.commit() + + logger.debug( + f"Migration state updated for {self.table_name}/{self.partition_name}: " + f"last_key={last_key}, total={total_rows_migrated}, status={status}" + ) + + except Exception as e: + logger.error(f"Failed to update migration state: {e}") + self.pg_conn.connection.rollback() + raise + + def reset_state(self) -> None: + """Reset migration state for this partition. + + Deletes the state record for this specific partition. + """ + try: + with self.pg_conn.connection.cursor() as cursor: + cursor.execute( + "DELETE FROM migration_state WHERE table_name = %s AND partition_name = %s", + (self.table_name, self.partition_name) + ) + self.pg_conn.connection.commit() + logger.info(f"Migration state reset for {self.table_name}/{self.partition_name}") + except Exception as e: + logger.error(f"Failed to reset migration state: {e}") + self.pg_conn.connection.rollback() + raise + + def mark_in_progress(self) -> None: + """Mark migration as in progress.""" + self.update_state(status="in_progress") + + def mark_completed(self, total_rows: int) -> None: + """Mark migration as completed. + + Args: + total_rows: Final count of migrated rows + """ + self.update_state( + total_rows_migrated=total_rows, + status="completed", + mark_completed=True + ) + + def get_max_last_key_across_partitions(self) -> Optional[Dict[str, Any]]: + """Get the maximum (latest) last_key across all partitions for this table. + + Used when completing full migration to set the global last_key for incremental migration. + + The consolidation key ordering is: (UnitName, ToolNameID, EventDate, EventTime) + To find the maximum, we order by these fields in DESC order. 
+ + Returns: + The latest last_key across all partitions, or None if no partitions have keys + """ + try: + with self.pg_conn.connection.cursor() as cursor: + # Order by the full consolidation key (UnitName, ToolNameID, EventDate, EventTime) DESC + # This matches the ordering used in MySQL fetch_consolidation_keys_after + cursor.execute( + """ + SELECT last_key + FROM migration_state + WHERE table_name = %s AND partition_name != '_global' AND last_key IS NOT NULL + ORDER BY + last_key->>'unit_name' DESC, + last_key->>'tool_name_id' DESC, + (last_key->>'event_date')::date DESC, + (last_key->>'event_time')::time DESC + LIMIT 1 + """, + (self.table_name,) + ) + result = cursor.fetchone() + if result and result[0]: + return result[0] + return None + except Exception as e: + logger.debug(f"Could not get max last_key across partitions: {e}") + return None + + def get_max_last_key_from_data(self) -> Optional[Dict[str, Any]]: + """Get the consolidation key of the most recent event in PostgreSQL. + + This queries the target table directly (not migration_state) to find the row with + the maximum event_timestamp (most recent event), then returns its consolidation key. + + We order by event_timestamp only (not the full consolidation key) because: + 1. Ordering by unit_name can pick up corrupted data (Java error strings) + 2. event_timestamp represents the actual chronological order of events + 3. This avoids re-processing old data that happens to sort later alphabetically + + Corrupted data (like '[Ljava.lang.String;@...' in unit_name) is explicitly excluded. + + Returns: + The consolidation key of the most recent event, or None if table is empty + """ + try: + with self.pg_conn.connection.cursor() as cursor: + # Find the row with maximum event_timestamp (most recent event) + # We use ONLY event_timestamp because ordering by unit_name can pick up + # corrupted data (like Java error strings) which sort incorrectly + cursor.execute( + f""" + SELECT unit_name, tool_name_id, + DATE(event_timestamp)::text as event_date, + event_timestamp::time::text as event_time, + event_timestamp + FROM {self.table_name} + WHERE unit_name NOT LIKE '[L%' -- Exclude corrupted Java strings + ORDER BY event_timestamp DESC + LIMIT 1 + """ + ) + result = cursor.fetchone() + if result: + return { + "unit_name": result[0], + "tool_name_id": result[1], + "event_date": result[2], + "event_time": result[3] + } + return None + except Exception as e: + logger.error(f"Could not get max last_key from data table {self.table_name}: {e}") + return None diff --git a/src/transformers/schema_transformer.py b/src/transformers/schema_transformer.py index 1608c8b..58a9da8 100644 --- a/src/transformers/schema_transformer.py +++ b/src/transformers/schema_transformer.py @@ -1,4 +1,12 @@ -"""PostgreSQL schema creation from MySQL structure.""" +"""PostgreSQL schema creation for MySQL to PostgreSQL migration. + +New design: +- id: BIGSERIAL PRIMARY KEY (auto-increment) +- mysql_max_id: max(idElabData) from consolidated MySQL records +- event_timestamp: TIMESTAMP created from MySQL EventDate + EventTime +- Consolidation key MySQL: (UnitName, ToolNameID, EventDate, EventTime) +- NodeNum consolidated in measurements JSONB +""" from config import PARTITION_YEARS from src.utils.logger import get_logger @@ -8,31 +16,37 @@ logger = get_logger(__name__) def create_rawdatacor_schema() -> str: """Create PostgreSQL schema for RAWDATACOR table. 
+ Schema design: + - id: Auto-increment primary key (PostgreSQL sequence) + - mysql_max_id: Max ID from MySQL records that were consolidated + - Consolidation: Multiple MySQL rows (different NodeNum) → 1 PostgreSQL row + - measurements JSONB structure: + { + "node_1": {"0": {"value": "123.45", "unit": "°C"}, ...}, + "node_2": {"0": {"value": "67.89", "unit": "bar"}, ...}, + ... + } + Returns: SQL script to create the table with partitions """ sql = """ --- Create sequence for id auto-increment -CREATE SEQUENCE IF NOT EXISTS rawdatacor_id_seq; - --- Create RAWDATACOR table with partitioning --- Note: node_num is stored in measurements JSONB, not as a separate column +-- Create RAWDATACOR table with partitioning by year CREATE TABLE IF NOT EXISTS rawdatacor ( - id BIGINT NOT NULL DEFAULT nextval('rawdatacor_id_seq'), - unit_name VARCHAR(32), - tool_name_id VARCHAR(32) NOT NULL, + id bigint GENERATED BY DEFAULT AS IDENTITY, + mysql_max_id INTEGER, + unit_name VARCHAR(50) NOT NULL, + tool_name_id VARCHAR(50) NOT NULL, event_timestamp TIMESTAMP NOT NULL, - bat_level NUMERIC(4,2) NOT NULL, - temperature NUMERIC(5,2) NOT NULL, - measurements JSONB, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + event_year smallint NOT NULL, + measurements JSONB NOT NULL, + bat_level NUMERIC(4,2), + temperature NUMERIC(5,2), bat_level_module NUMERIC(4,2), temperature_module NUMERIC(5,2), - rssi_module INTEGER -) PARTITION BY RANGE (EXTRACT(YEAR FROM event_timestamp)); - --- Note: PostgreSQL doesn't support PRIMARY KEY or UNIQUE constraints --- with RANGE partitioning on expressions. Using sequence for id auto-increment. + rssi_module INTEGER, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +) PARTITION BY RANGE (event_year); -- Create partitions for each year """ @@ -54,21 +68,21 @@ CREATE TABLE IF NOT EXISTS rawdatacor_default # Add indexes sql += """ --- Create indexes -CREATE INDEX IF NOT EXISTS idx_unit_tool_datetime_raw - ON rawdatacor(unit_name, tool_name_id, event_timestamp); +-- Create indexes for efficient queries +-- UNIQUE constraint on (id, event_year) for partitioned table primary key +CREATE UNIQUE INDEX IF NOT EXISTS rawdatacor_pkey + ON rawdatacor(id, event_year); -CREATE INDEX IF NOT EXISTS idx_unit_tool_raw +-- UNIQUE constraint on consolidation key to prevent duplicates +-- This is the key used to consolidate MySQL rows (UnitName, ToolNameID, EventDate, EventTime) +CREATE UNIQUE INDEX IF NOT EXISTS rawdatacor_consolidation_key_unique + ON rawdatacor(unit_name, tool_name_id, event_timestamp, event_year); + +CREATE INDEX IF NOT EXISTS idx_rawdatacor_unit_tool ON rawdatacor(unit_name, tool_name_id); -CREATE INDEX IF NOT EXISTS idx_measurements_gin_raw +CREATE INDEX IF NOT EXISTS idx_rawdatacor_measurements_gin ON rawdatacor USING GIN (measurements); - -CREATE INDEX IF NOT EXISTS idx_event_timestamp_raw - ON rawdatacor(event_timestamp); - -CREATE INDEX IF NOT EXISTS idx_id_raw - ON rawdatacor(id); """ return sql @@ -77,27 +91,39 @@ CREATE INDEX IF NOT EXISTS idx_id_raw def create_elabdatadisp_schema() -> str: """Create PostgreSQL schema for ELABDATADISP table. 
+ Schema design: + - id: Auto-increment primary key (PostgreSQL sequence) + - mysql_max_id: Max idElabData from MySQL records that were consolidated + - Consolidation: Multiple MySQL rows (different NodeNum) → 1 PostgreSQL row + - measurements JSONB structure: + { + "node_1": { + "shifts": {"x": 1.234, "y": 2.345, ...}, + "coordinates": {"x": 10.123, "y": 20.234, ...}, + "kinematics": {...}, + "sensors": {...}, + "calculated": {...} + }, + "node_2": { ... }, + ... + } + Returns: SQL script to create the table with partitions """ sql = """ --- Create sequence for id_elab_data auto-increment -CREATE SEQUENCE IF NOT EXISTS elabdatadisp_id_seq; - --- Create ELABDATADISP table with partitioning --- Note: node_num, state, and calc_err are stored in measurements JSONB, not as separate columns +-- Create ELABDATADISP table with partitioning by year CREATE TABLE IF NOT EXISTS elabdatadisp ( - id_elab_data BIGINT NOT NULL DEFAULT nextval('elabdatadisp_id_seq'), - unit_name VARCHAR(32), - tool_name_id VARCHAR(32) NOT NULL, + id bigint GENERATED BY DEFAULT AS IDENTITY, + mysql_max_id INTEGER NOT NULL, + unit_name VARCHAR(50), + tool_name_id VARCHAR(50), event_timestamp TIMESTAMP NOT NULL, - measurements JSONB, + event_year smallint NOT NULL, + measurements JSONB NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -) PARTITION BY RANGE (EXTRACT(YEAR FROM event_timestamp)); - --- Note: PostgreSQL doesn't support PRIMARY KEY or UNIQUE constraints --- with RANGE partitioning on expressions. Using sequence for id_elab_data auto-increment. +) PARTITION BY RANGE (event_year); -- Create partitions for each year """ @@ -119,21 +145,21 @@ CREATE TABLE IF NOT EXISTS elabdatadisp_default # Add indexes sql += """ --- Create indexes -CREATE INDEX IF NOT EXISTS idx_unit_tool_datetime_elab - ON elabdatadisp(unit_name, tool_name_id, event_timestamp); +-- Create indexes for efficient queries +-- UNIQUE constraint on (id, event_year) for partitioned table primary key +CREATE UNIQUE INDEX IF NOT EXISTS elabdatadisp_pkey + ON elabdatadisp(id, event_year); -CREATE INDEX IF NOT EXISTS idx_unit_tool_elab +-- UNIQUE constraint on consolidation key to prevent duplicates +-- This is the key used to consolidate MySQL rows (UnitName, ToolNameID, EventDate, EventTime) +CREATE UNIQUE INDEX IF NOT EXISTS elabdatadisp_consolidation_key_unique + ON elabdatadisp(unit_name, tool_name_id, event_timestamp, event_year); + +CREATE INDEX IF NOT EXISTS idx_elabdatadisp_unit_tool ON elabdatadisp(unit_name, tool_name_id); -CREATE INDEX IF NOT EXISTS idx_measurements_gin_elab +CREATE INDEX IF NOT EXISTS idx_elabdatadisp_measurements_gin ON elabdatadisp USING GIN (measurements); - -CREATE INDEX IF NOT EXISTS idx_event_timestamp_elab - ON elabdatadisp(event_timestamp); - -CREATE INDEX IF NOT EXISTS idx_id_elab_data - ON elabdatadisp(id_elab_data); """ return sql @@ -142,21 +168,42 @@ CREATE INDEX IF NOT EXISTS idx_id_elab_data def create_migration_state_table() -> str: """Create table to track migration state. 
+ Tracks migration progress per partition for parallel processing: + - table_name + partition_name: Composite primary key for per-partition tracking + - last_key: Last consolidated key migrated (unit_name, tool_name_id, event_date, event_time) + - total_rows_migrated: Count of PostgreSQL rows (consolidated) for this partition + - status: pending, in_progress, completed + Returns: SQL to create migration_state table """ sql = """ --- Create table to track migration state +-- Create table to track migration state (supports parallel partition migration) CREATE TABLE IF NOT EXISTS migration_state ( - table_name VARCHAR(255) PRIMARY KEY, - last_migrated_timestamp TIMESTAMP, - last_migrated_id BIGINT, + table_name VARCHAR(255) NOT NULL, + partition_name VARCHAR(255) NOT NULL, + last_key JSONB, migration_started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, migration_completed_at TIMESTAMP, total_rows_migrated BIGINT DEFAULT 0, status VARCHAR(32) DEFAULT 'pending', - last_completed_partition VARCHAR(255) + PRIMARY KEY (table_name, partition_name), + CONSTRAINT last_key_structure CHECK ( + last_key IS NULL OR ( + last_key ? 'unit_name' AND + last_key ? 'tool_name_id' AND + last_key ? 'event_date' AND + last_key ? 'event_time' + ) + ) ); + +-- Index for querying migration state +CREATE INDEX IF NOT EXISTS idx_migration_state_status + ON migration_state(table_name, status); + +CREATE INDEX IF NOT EXISTS idx_migration_state_partition + ON migration_state(partition_name); """ return sql diff --git a/src/utils/progress.py b/src/utils/progress.py index 99f632a..f4ae36b 100644 --- a/src/utils/progress.py +++ b/src/utils/progress.py @@ -1,73 +1,109 @@ -"""Progress tracking utility.""" -from rich.progress import ( - Progress, - SpinnerColumn, - BarColumn, - TaskProgressColumn, - TimeRemainingColumn, - TimeElapsedColumn, - TransferSpeedColumn, -) -from rich.console import Console +"""Progress tracking utility - lightweight logging version.""" import time +from typing import Optional +from src.utils.logger import get_logger + +logger = get_logger(__name__) class ProgressTracker: - """Track migration progress with Rich progress bar.""" + """Track migration progress with periodic text logging (lightweight).""" - def __init__(self, total: int, description: str = "Migrating"): + def __init__(self, total: Optional[int] = None, description: str = "Migrating", log_interval: Optional[int] = None): """Initialize progress tracker. 
Args: - total: Total number of items to process + total: Total number of items to process (None if unknown) description: Description of the task + log_interval: Log progress every N items (if None, reads from config) """ self.total = total self.description = description - self.progress = Progress( - SpinnerColumn(), - BarColumn(), - TaskProgressColumn(), - TimeElapsedColumn(), - TimeRemainingColumn(), - TransferSpeedColumn(), - console=Console(), - ) - self.task_id = None + + # Read log_interval from config if not provided + if log_interval is None: + from config import get_settings + self.log_interval = get_settings().migration.progress_log_interval + else: + self.log_interval = log_interval + self.start_time = None self.processed = 0 + self.last_log_count = 0 def __enter__(self): """Context manager entry.""" - self.progress.start() - self.task_id = self.progress.add_task( - self.description, total=self.total - ) self.start_time = time.time() + logger.info(f"Starting: {self.description}") return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" - self.progress.stop() if exc_type is None: elapsed = time.time() - self.start_time rate = self.processed / elapsed if elapsed > 0 else 0 - self.progress.console.print( - f"[green]✓ Completed: {self.processed}/{self.total} items " - f"in {elapsed:.2f}s ({rate:.0f} items/sec)[/green]" - ) + + # Format elapsed time + if elapsed < 60: + elapsed_str = f"{elapsed:.1f}s" + elif elapsed < 3600: + elapsed_str = f"{elapsed/60:.1f}m" + else: + elapsed_str = f"{elapsed/3600:.1f}h" + + if self.total is not None: + logger.info( + f"✓ Completed: {self.processed:,}/{self.total:,} items " + f"in {elapsed_str} ({rate:.0f} items/sec)" + ) + else: + logger.info( + f"✓ Completed: {self.processed:,} items " + f"in {elapsed_str} ({rate:.0f} items/sec)" + ) def update(self, advance: int = 1): - """Update progress. + """Update progress and log periodically. 
Args: advance: Number of items processed """ - if self.task_id is not None: - self.progress.update(self.task_id, advance=advance) - self.processed += advance + self.processed += advance + + # Log every log_interval items + if self.processed - self.last_log_count >= self.log_interval: + self._log_progress() + self.last_log_count = self.processed + + def _log_progress(self): + """Log current progress.""" + elapsed = time.time() - self.start_time + rate = self.processed / elapsed if elapsed > 0 else 0 + + # Format elapsed time + if elapsed < 60: + elapsed_str = f"{elapsed:.0f}s" + elif elapsed < 3600: + mins = int(elapsed / 60) + secs = int(elapsed % 60) + elapsed_str = f"{mins}m {secs}s" + else: + hours = int(elapsed / 3600) + mins = int((elapsed % 3600) / 60) + elapsed_str = f"{hours}h {mins}m" + + if self.total is not None: + progress_pct = (self.processed / self.total) * 100 + logger.info( + f"{self.description}: {self.processed:,}/{self.total:,} items " + f"({progress_pct:.1f}%) - elapsed: {elapsed_str} - rate: {rate:.0f} items/sec" + ) + else: + logger.info( + f"{self.description}: {self.processed:,} items " + f"- elapsed: {elapsed_str} - rate: {rate:.0f} items/sec" + ) def print_status(self, message: str): - """Print a status message without interrupting progress bar.""" - if self.task_id is not None: - self.progress.print(message) + """Print a status message.""" + logger.info(message) diff --git a/src/utils/validation.py b/src/utils/validation.py new file mode 100644 index 0000000..2b82c82 --- /dev/null +++ b/src/utils/validation.py @@ -0,0 +1,157 @@ +"""Data validation utilities for migration.""" +from typing import Dict, Any, Optional, Tuple +from datetime import datetime, date +import os +from src.utils.logger import get_logger + +logger = get_logger(__name__) + + +class ErrorLogger: + """Log invalid migration keys to a file.""" + + def __init__(self, table: str, partition: str): + """Initialize error logger. + + Args: + table: Table name + partition: Partition name + """ + self.table = table + self.partition = partition + self.error_file = f"migration_errors_{table}_{partition}.log" + self.error_count = 0 + + # Create error file with header + with open(self.error_file, "w") as f: + f.write(f"# Migration errors for {table} partition {partition}\n") + f.write("# Format: UnitName|ToolNameID|EventDate|EventTime|Reason\n\n") + + logger.info(f"Error log file created: {self.error_file}") + + def log_invalid_key( + self, + unit_name: Any, + tool_name_id: Any, + event_date: Any, + event_time: Any, + reason: str + ) -> None: + """Log an invalid consolidation key. + + Args: + unit_name: UnitName value + tool_name_id: ToolNameID value + event_date: EventDate value + event_time: EventTime value + reason: Reason for rejection + """ + with open(self.error_file, "a") as f: + f.write(f"{unit_name}|{tool_name_id}|{event_date}|{event_time}|{reason}\n") + + self.error_count += 1 + + if self.error_count % 100 == 0: + logger.warning(f"Logged {self.error_count} invalid keys to {self.error_file}") + + def get_error_count(self) -> int: + """Get total number of errors logged. 
+ + Returns: + Number of errors logged + """ + return self.error_count + + def close(self) -> None: + """Close error logger and log summary.""" + if self.error_count > 0: + logger.warning( + f"Total invalid keys for {self.table} partition {self.partition}: " + f"{self.error_count} (see {self.error_file})" + ) + else: + logger.info(f"No invalid keys found for {self.table} partition {self.partition}") + # Remove empty error file + if os.path.exists(self.error_file): + os.remove(self.error_file) + + +def validate_consolidation_key( + unit_name: Any, + tool_name_id: Any, + event_date: Any, + event_time: Any +) -> Tuple[bool, Optional[str]]: + """Validate a consolidation key. + + Args: + unit_name: UnitName value + tool_name_id: ToolNameID value + event_date: EventDate value + event_time: EventTime value + + Returns: + Tuple of (is_valid, error_reason) + If valid: (True, None) + If invalid: (False, "reason description") + """ + # Check for NULL unit_name or tool_name_id + if unit_name is None or unit_name == "": + return False, "UnitName is NULL or empty" + + if tool_name_id is None or tool_name_id == "": + return False, "ToolNameID is NULL or empty" + + # Check for NULL or invalid dates + if event_date is None: + return False, "EventDate is NULL" + + # Check for invalid date like '0000-00-00' + try: + if isinstance(event_date, str): + if event_date.startswith("0000-00-00"): + return False, f"EventDate is invalid: {event_date}" + # Try to parse + parsed_date = datetime.strptime(event_date, "%Y-%m-%d").date() + elif isinstance(event_date, (date, datetime)): + parsed_date = event_date if isinstance(event_date, date) else event_date.date() + # Check for zero date + if parsed_date.year == 0: + return False, f"EventDate year is 0: {event_date}" + else: + return False, f"EventDate has invalid type: {type(event_date)}" + except (ValueError, AttributeError) as e: + return False, f"EventDate parsing failed: {event_date} ({e})" + + # Check for NULL event_time + if event_time is None: + return False, "EventTime is NULL" + + return True, None + + +def validate_mysql_row(row: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """Validate a complete MySQL row for migration. + + Args: + row: MySQL row dictionary + + Returns: + Tuple of (is_valid, error_reason) + """ + # Validate consolidation key + is_valid, reason = validate_consolidation_key( + row.get("UnitName"), + row.get("ToolNameID"), + row.get("EventDate"), + row.get("EventTime") + ) + + if not is_valid: + return False, reason + + # Check for NodeNum + if row.get("NodeNum") is None: + return False, "NodeNum is NULL" + + return True, None
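
Usage note (illustrative, not part of the patch): the sketch below shows one way the entry points added here could be driven end to end. Module paths follow the new files above, the table names come from TABLE_CONFIGS, and the key values passed to validate_consolidation_key are made-up examples.

# Illustrative driver sketch: full load, then incremental catch-up.
from src.migrator.full_migrator import run_full_migration
from src.migrator.incremental_migrator import run_incremental_migration
from src.migrator.parallel_migrator import run_parallel_migration
from src.utils.validation import validate_consolidation_key


def bootstrap_table(table: str) -> None:
    # 1. Dry run of the full migration to preview the work without writing anything.
    run_full_migration(table, dry_run=True)

    # 2. Initial load: migrate all MySQL partitions in parallel, resuming any
    #    partition a previous run left in_progress.
    run_parallel_migration(table, num_workers=5, resume=True)

    # 3. Catch-up runs migrate only keys after the global last_key recorded
    #    in migration_state by the full/parallel migration.
    new_rows = run_incremental_migration(table)
    print(f"{table}: {new_rows} new consolidated rows")


if __name__ == "__main__":
    # Hypothetical pre-check of a suspicious key; '0000-00-00' dates are rejected.
    ok, reason = validate_consolidation_key("UNIT01", "TOOL7", "0000-00-00", "12:00:00")
    assert not ok and reason.startswith("EventDate is invalid")

    for tbl in ("RAWDATACOR", "ELABDATADISP"):
        bootstrap_table(tbl)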