fix incremental

This commit is contained in:
2025-12-30 15:16:54 +01:00
parent 79cd4f4559
commit 5c9df3d06f
14 changed files with 2901 additions and 233 deletions

View File

@@ -52,6 +52,7 @@ class MigrationSettings(BaseSettings):
    consolidation_group_limit: int = 10000
    log_level: str = "INFO"
    dry_run: bool = False
+    progress_log_interval: int = 50000
class BenchmarkSettings(BaseSettings):
@@ -152,20 +153,32 @@ ELABDATADISP_FIELD_MAPPING = {
# PostgreSQL Partition years (from both tables)
PARTITION_YEARS = list(range(2014, 2032))  # 2014-2031
+# Consolidation key definition (same for both tables)
+# Multiple MySQL rows with same key but different NodeNum → 1 PostgreSQL row
+# MySQL source fields
+CONSOLIDATION_KEY_FIELDS = ["UnitName", "ToolNameID", "EventDate", "EventTime"]
+# Keys for tracking in migration_state.last_key (NOT actual PostgreSQL target columns)
+# Note: In PostgreSQL target, EventDate+EventTime become event_timestamp
+CONSOLIDATION_KEY_PG_FIELDS = ["unit_name", "tool_name_id", "event_date", "event_time"]
# Table configurations - support both uppercase and lowercase keys
_rawdatacor_config = {
    "mysql_table": "RAWDATACOR",
    "postgres_table": "rawdatacor",
-    "primary_key": "id",
-    "postgres_pk": "id",  # Primary key column name in PostgreSQL
-    "partition_key": "event_timestamp",
+    "mysql_pk": "id",  # MySQL primary key
+    "postgres_pk": "id",  # PostgreSQL auto-increment primary key
+    "mysql_max_id_field": "id",  # Field to track max ID from MySQL
+    "consolidation_key": CONSOLIDATION_KEY_FIELDS,
+    "consolidation_key_pg": CONSOLIDATION_KEY_PG_FIELDS,
}
_elabdatadisp_config = {
    "mysql_table": "ELABDATADISP",
    "postgres_table": "elabdatadisp",
-    "primary_key": "idElabData",
-    "postgres_pk": "id_elab_data",  # Primary key column name in PostgreSQL
-    "partition_key": "event_timestamp",
+    "mysql_pk": "idElabData",  # MySQL primary key
+    "postgres_pk": "id",  # PostgreSQL auto-increment primary key
+    "mysql_max_id_field": "idElabData",  # Field to track max ID from MySQL
+    "consolidation_key": CONSOLIDATION_KEY_FIELDS,
+    "consolidation_key_pg": CONSOLIDATION_KEY_PG_FIELDS,
}
TABLE_CONFIGS = {
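A reading aid, not part of the commit: the consolidation key above is what collapses N per-node MySQL rows into one PostgreSQL row. A minimal sketch of that grouping, using the same field names and hypothetical sample rows:

from itertools import groupby
from operator import itemgetter

CONSOLIDATION_KEY_FIELDS = ["UnitName", "ToolNameID", "EventDate", "EventTime"]

def group_by_consolidation_key(rows):
    """Yield (key, rows) for MySQL rows sharing one consolidation key."""
    key_fn = itemgetter(*CONSOLIDATION_KEY_FIELDS)
    for key, grp in groupby(sorted(rows, key=key_fn), key=key_fn):
        yield key, list(grp)

if __name__ == "__main__":
    sample = [  # hypothetical rows: same key, different NodeNum
        {"UnitName": "U1", "ToolNameID": 7, "EventDate": "2024-05-01", "EventTime": "10:00:00", "NodeNum": 1},
        {"UnitName": "U1", "ToolNameID": 7, "EventDate": "2024-05-01", "EventTime": "10:00:00", "NodeNum": 2},
    ]
    for key, grp in group_by_consolidation_key(sample):
        print(key, "->", len(grp), "MySQL rows -> 1 PostgreSQL row")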

main.py
View File

@@ -6,8 +6,9 @@ from pathlib import Path
from config import get_settings
from src.utils.logger import setup_logger, get_logger
from src.transformers.schema_transformer import get_full_schema_script
-from src.migrator.full_migration import run_full_migration
-from src.migrator.incremental_migration import run_incremental_migration
+from src.migrator.full_migrator import run_full_migration
+from src.migrator.incremental_migrator import run_incremental_migration
+from src.migrator.parallel_migrator import run_parallel_migration
from src.benchmark.performance_test import run_benchmark
from src.connectors.postgres_connector import PostgreSQLConnector
@@ -80,18 +81,36 @@ def migrate():
    default=None,
    help="Only migrate this partition (for testing/debugging)"
)
-def full(table, dry_run, resume, partition):
+@click.option(
+    "--parallel",
+    type=int,
+    default=None,
+    help="Number of parallel workers (e.g., --parallel 5 for 5 workers)"
+)
+def full(table, dry_run, resume, partition, parallel):
    """Perform full migration of all data."""
    setup_logger(__name__)
    tables = ["RAWDATACOR", "ELABDATADISP"] if table == "all" else [table]
+    # Validate options
+    if parallel and partition:
+        click.echo("✗ Cannot use --parallel with --partition", err=True)
+        sys.exit(1)
    try:
        total_migrated = 0
        for tbl in tables:
-            click.echo(f"\nMigrating {tbl}" + (f" (partition {partition})" if partition else "") + "...")
-            migrated = run_full_migration(tbl, dry_run=dry_run, resume=resume, partition=partition)
+            if parallel:
+                # Parallel migration mode
+                click.echo(f"\nMigrating {tbl} with {parallel} parallel workers...")
+                migrated = run_parallel_migration(tbl, num_workers=parallel, dry_run=dry_run, resume=resume)
+            else:
+                # Sequential migration mode
+                click.echo(f"\nMigrating {tbl}" + (f" (partition {partition})" if partition else "") + "...")
+                migrated = run_full_migration(tbl, dry_run=dry_run, resume=resume, partition=partition)
            total_migrated += migrated
            click.echo(f"{tbl}: {migrated} rows migrated")
@@ -115,14 +134,9 @@ def full(table, dry_run, resume, partition):
    is_flag=True,
    help="Show what would be done without modifying data"
)
-@click.option(
-    "--state-file",
-    default="migration_state.json",
-    help="Path to migration state file"
-)
-def incremental(table, dry_run, state_file):
-    """Perform incremental migration since last sync."""
-    setup_logger(__name__)
+def incremental(table, dry_run):
+    """Perform incremental migration since last sync (based on consolidation keys)."""
+    setup_logger("")  # Set up root logger so all child loggers work
    tables = ["RAWDATACOR", "ELABDATADISP"] if table == "all" else [table]
@@ -131,7 +145,7 @@ def incremental(table, dry_run, state_file):
        for tbl in tables:
            click.echo(f"\nIncremental migration for {tbl}...")
-            migrated = run_incremental_migration(tbl, dry_run=dry_run, state_file=state_file)
+            migrated = run_incremental_migration(tbl, dry_run=dry_run)
            total_migrated += migrated
            if migrated > 0:
                click.echo(f"{tbl}: {migrated} rows migrated")
@@ -196,6 +210,7 @@ def info():
click.echo("\n[Migration Settings]") click.echo("\n[Migration Settings]")
click.echo(f" Batch Size: {settings.migration.batch_size}") click.echo(f" Batch Size: {settings.migration.batch_size}")
click.echo(f" Consolidation Group Limit: {settings.migration.consolidation_group_limit}")
click.echo(f" Log Level: {settings.migration.log_level}") click.echo(f" Log Level: {settings.migration.log_level}")
click.echo(f" Dry Run: {settings.migration.dry_run}") click.echo(f" Dry Run: {settings.migration.dry_run}")

View File

@@ -1,6 +1,6 @@
"""MySQL database connector.""" """MySQL database connector."""
import pymysql import pymysql
from typing import List, Dict, Any, Optional, Generator from typing import List, Dict, Any, Optional, Generator, Iterator
from config import get_settings from config import get_settings
from src.utils.logger import get_logger from src.utils.logger import get_logger
@@ -10,6 +10,8 @@ logger = get_logger(__name__)
class MySQLConnector: class MySQLConnector:
"""Connector for MySQL database.""" """Connector for MySQL database."""
MAX_RETRIES = 3 # Number of retries for transient connection errors
def __init__(self): def __init__(self):
"""Initialize MySQL connector with settings.""" """Initialize MySQL connector with settings."""
self.settings = get_settings() self.settings = get_settings()
@@ -38,6 +40,16 @@ class MySQLConnector:
logger.error(f"Failed to connect to MySQL: {e}") logger.error(f"Failed to connect to MySQL: {e}")
raise raise
def _reconnect(self) -> None:
"""Reconnect to MySQL database after connection loss."""
try:
self.disconnect()
self.connect()
logger.info("Successfully reconnected to MySQL")
except Exception as e:
logger.error(f"Failed to reconnect to MySQL: {e}")
raise
    def disconnect(self) -> None:
        """Close connection to MySQL database."""
        if self.connection:
@@ -62,14 +74,21 @@
        Returns:
            Number of rows in the table
        """
-        try:
-            with self.connection.cursor() as cursor:
-                cursor.execute(f"SELECT COUNT(*) as count FROM `{table}`")
-                result = cursor.fetchone()
-                return result["count"]
-        except pymysql.Error as e:
-            logger.error(f"Failed to get row count for {table}: {e}")
-            raise
+        retries = 0
+        while retries < self.MAX_RETRIES:
+            try:
+                with self.connection.cursor() as cursor:
+                    cursor.execute(f"SELECT COUNT(*) as count FROM `{table}`")
+                    result = cursor.fetchone()
+                    return result["count"]
+            except pymysql.Error as e:
+                retries += 1
+                if retries >= self.MAX_RETRIES:
+                    logger.error(f"Failed to get row count for {table} after {self.MAX_RETRIES} retries: {e}")
+                    raise
+                else:
+                    logger.warning(f"Get row count failed (retry {retries}/{self.MAX_RETRIES}): {e}")
+                    self._reconnect()
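The retry-and-reconnect pattern above is repeated in several connector methods; it could equally be written once as a decorator. A minimal sketch, assuming only the MAX_RETRIES attribute and _reconnect() helper introduced in this commit (the decorator name with_reconnect is hypothetical):

import functools
import pymysql

def with_reconnect(method):
    """Retry a connector method on pymysql errors, reconnecting between attempts."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        retries = 0
        while True:
            try:
                return method(self, *args, **kwargs)
            except pymysql.Error:
                retries += 1
                if retries >= self.MAX_RETRIES:
                    raise
                self._reconnect()  # drop and re-establish the connection, then retry
    return wrapper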
    def fetch_rows_since(
        self,
@@ -195,19 +214,26 @@
        Returns:
            List of partition names
        """
-        try:
-            with self.connection.cursor() as cursor:
-                cursor.execute("""
-                    SELECT PARTITION_NAME
-                    FROM INFORMATION_SCHEMA.PARTITIONS
-                    WHERE TABLE_NAME = %s
-                    AND TABLE_SCHEMA = %s
-                    ORDER BY PARTITION_NAME
-                """, (table, self.settings.mysql.database))
-                return [row["PARTITION_NAME"] for row in cursor.fetchall()]
-        except pymysql.Error as e:
-            logger.error(f"Failed to get partitions for {table}: {e}")
-            raise
+        retries = 0
+        while retries < self.MAX_RETRIES:
+            try:
+                with self.connection.cursor() as cursor:
+                    cursor.execute("""
+                        SELECT PARTITION_NAME
+                        FROM INFORMATION_SCHEMA.PARTITIONS
+                        WHERE TABLE_NAME = %s
+                        AND TABLE_SCHEMA = %s
+                        ORDER BY PARTITION_NAME
+                    """, (table, self.settings.mysql.database))
+                    return [row["PARTITION_NAME"] for row in cursor.fetchall()]
+            except pymysql.Error as e:
+                retries += 1
+                if retries >= self.MAX_RETRIES:
+                    logger.error(f"Failed to get partitions for {table} after {self.MAX_RETRIES} retries: {e}")
+                    raise
+                else:
+                    logger.warning(f"Get table partitions failed (retry {retries}/{self.MAX_RETRIES}): {e}")
+                    self._reconnect()
    def fetch_consolidation_groups_from_partition(
        self,
@@ -215,7 +241,8 @@ class MySQLConnector:
        partition: str,
        limit: Optional[int] = None,
        offset: int = 0,
-        start_id: Optional[int] = None
+        start_id: Optional[int] = None,
+        start_key: Optional[tuple] = None
    ) -> Generator[List[Dict[str, Any]], None, None]:
        """Fetch consolidation groups from a partition.
@@ -231,6 +258,8 @@
            limit: Batch size for consolidation (uses config default if None)
            offset: Starting offset for pagination (unused, kept for compatibility)
            start_id: Resume from this ID (fetch id > start_id). If None, starts from beginning
+            start_key: Resume AFTER this consolidation key (unit_name, tool_name_id, event_date, event_time).
+                If provided, skips all keys <= start_key in MySQL query (efficient resume).
        Yields:
            Lists of rows grouped by consolidation key (complete groups only)
@@ -243,8 +272,9 @@
        # Determine ID column name
        id_column = "idElabData" if table == "ELABDATADISP" else "id"
-        max_retries = 3
-        last_completed_key = None  # Last key we fully yielded (not incomplete)
+        # Initialize last_completed_key with start_key for efficient resume
+        last_completed_key = start_key if start_key else None
        # CRITICAL: These must be OUTSIDE the while loop to persist across batch iterations
        # If they're inside the loop, buffered incomplete groups from previous batches get lost
@@ -253,7 +283,7 @@
        while True:
            retries = 0
-            while retries < max_retries:
+            while retries < self.MAX_RETRIES:
                try:
                    with self.connection.cursor() as cursor:
                        # ORDER BY consolidation key
@@ -330,15 +360,391 @@ class MySQLConnector:
                except pymysql.Error as e:
                    retries += 1
-                    if retries >= max_retries:
-                        logger.error(f"Failed to fetch consolidation groups from {table} partition {partition} after {max_retries} retries: {e}")
+                    if retries >= self.MAX_RETRIES:
+                        logger.error(f"Failed to fetch consolidation groups from {table} partition {partition} after {self.MAX_RETRIES} retries: {e}")
                        raise
                    else:
-                        logger.warning(f"Fetch failed (retry {retries}/{max_retries}): {e}")
-                        # Reconnect and retry
-                        try:
-                            self.disconnect()
-                            self.connect()
-                        except Exception as reconnect_error:
-                            logger.error(f"Failed to reconnect: {reconnect_error}")
-                            raise
+                        logger.warning(f"Fetch consolidation groups failed (retry {retries}/{self.MAX_RETRIES}): {e}")
+                        self._reconnect()

    def stream_partition_rows(
        self,
        table: str,
        partition: str,
        batch_size: int = 10000,
        resume_from_key: Optional[Dict[str, Any]] = None
    ) -> Iterator[Dict[str, Any]]:
"""Stream all rows from a partition ordered by consolidation key.
This eliminates the N+1 query pattern by fetching all data in a single
streaming query ordered by consolidation key. Rows are yielded one at a time.
Query pattern:
SELECT *
FROM table PARTITION (partition)
WHERE (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?) -- if resuming
ORDER BY UnitName, ToolNameID, EventDate, EventTime, NodeNum
Args:
table: Table name (RAWDATACOR or ELABDATADISP)
partition: Partition name (e.g., 'p2024')
batch_size: Number of rows to fetch at a time from server
resume_from_key: Last processed key to resume from (optional)
Format: {"unit_name": ..., "tool_name_id": ..., "event_date": ..., "event_time": ...}
Yields:
MySQL row dicts, ordered by consolidation key then NodeNum
"""
if table not in ("RAWDATACOR", "ELABDATADISP"):
raise ValueError(f"Consolidation not supported for table {table}")
retries = 0
while retries < self.MAX_RETRIES:
try:
# Use regular DictCursor (client-side) to avoid filling /tmp on MySQL server
# SSCursor creates temp files on MySQL server which can fill /tmp
with self.connection.cursor() as cursor:
# Build WHERE clause for resume
where_clause = ""
params = []
if resume_from_key:
# Resume from last key using tuple comparison
where_clause = """
WHERE (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)
"""
params = [
resume_from_key.get("unit_name"),
resume_from_key.get("tool_name_id"),
resume_from_key.get("event_date"),
resume_from_key.get("event_time"),
]
query = f"""
SELECT *
FROM `{table}` PARTITION (`{partition}`)
{where_clause}
ORDER BY UnitName, ToolNameID, EventDate, EventTime, NodeNum
"""
logger.info(
f"Starting stream from {table} partition {partition}"
+ (f" (resuming from key)" if resume_from_key else "")
)
cursor.execute(query, tuple(params) if params else None)
# Fetch all rows (client-side cursor fetches to local memory)
# This avoids creating temp files on MySQL server
rows = cursor.fetchall()
logger.info(f"Fetched {len(rows)} rows from partition {partition}")
# Yield rows in batches for memory efficiency
for i in range(0, len(rows), batch_size):
batch = rows[i:i+batch_size]
for row in batch:
yield row
logger.info(f"Finished streaming partition {partition}")
return # Success
except pymysql.Error as e:
retries += 1
if retries >= self.MAX_RETRIES:
logger.error(
f"Failed to stream from {table} partition {partition} "
f"after {self.MAX_RETRIES} retries: {e}"
)
raise
else:
logger.warning(
f"Stream failed (retry {retries}/{self.MAX_RETRIES}): {e}"
)
self._reconnect()
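Because stream_partition_rows yields rows ordered by the consolidation key (and then NodeNum), a consumer can cut complete groups with itertools.groupby and no extra per-key queries. Illustrative sketch only, not part of the commit:

from itertools import groupby

def iter_consolidation_groups(row_stream):
    """Yield lists of MySQL rows sharing one consolidation key, in stream order."""
    key_fn = lambda r: (r["UnitName"], r["ToolNameID"], r["EventDate"], r["EventTime"])
    for _, grp in groupby(row_stream, key=key_fn):
        yield list(grp)

# Hypothetical wiring with the connector above and the consolidator added later in this commit:
# with MySQLConnector() as mysql_conn:
#     for group in iter_consolidation_groups(mysql_conn.stream_partition_rows("RAWDATACOR", "p2024")):
#         pg_row = consolidate_rows("RAWDATACOR", group)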
def fetch_consolidation_keys_from_partition(
self,
table: str,
partition: str,
limit: Optional[int] = None,
offset: int = 0
) -> List[Dict[str, Any]]:
"""Fetch distinct consolidation keys from a partition.
Query pattern:
SELECT UnitName, ToolNameID, EventDate, EventTime
FROM table PARTITION (partition)
GROUP BY UnitName, ToolNameID, EventDate, EventTime
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT X OFFSET Y
Args:
table: Table name (RAWDATACOR or ELABDATADISP)
partition: Partition name (e.g., 'p2024')
limit: Number of keys to fetch (uses CONSOLIDATION_GROUP_LIMIT if None)
offset: Starting offset for pagination
Returns:
List of dicts with keys: UnitName, ToolNameID, EventDate, EventTime
"""
if table not in ("RAWDATACOR", "ELABDATADISP"):
raise ValueError(f"Consolidation not supported for table {table}")
if limit is None:
limit = self.settings.migration.consolidation_group_limit
retries = 0
while retries < self.MAX_RETRIES:
try:
with self.connection.cursor() as cursor:
query = f"""
SELECT UnitName, ToolNameID, EventDate, EventTime
FROM `{table}` PARTITION (`{partition}`)
GROUP BY UnitName, ToolNameID, EventDate, EventTime
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT %s OFFSET %s
"""
cursor.execute(query, (limit, offset))
keys = cursor.fetchall()
return keys
except pymysql.Error as e:
retries += 1
if retries >= self.MAX_RETRIES:
logger.error(
f"Failed to fetch consolidation keys from {table} "
f"partition {partition} (offset={offset}) after {self.MAX_RETRIES} retries: {e}"
)
raise
else:
logger.warning(
f"Fetch consolidation keys failed (retry {retries}/{self.MAX_RETRIES}): {e}"
)
self._reconnect()
def fetch_records_for_key(
self,
table: str,
partition: str,
unit_name: Any,
tool_name_id: Any,
event_date: Any,
event_time: Any
) -> List[Dict[str, Any]]:
"""Fetch all records for a specific consolidation key.
Query pattern:
SELECT *
FROM table PARTITION (partition)
WHERE UnitName = ? AND ToolNameID = ?
AND EventDate = ? AND EventTime = ?
Args:
table: Table name (RAWDATACOR or ELABDATADISP)
partition: Partition name
unit_name: UnitName value
tool_name_id: ToolNameID value
event_date: EventDate value
event_time: EventTime value
Returns:
List of all MySQL rows matching the key (different NodeNum)
"""
if table not in ("RAWDATACOR", "ELABDATADISP"):
raise ValueError(f"Consolidation not supported for table {table}")
retries = 0
while retries < self.MAX_RETRIES:
try:
with self.connection.cursor() as cursor:
query = f"""
SELECT *
FROM `{table}` PARTITION (`{partition}`)
WHERE UnitName = %s
AND ToolNameID = %s
AND EventDate = %s
AND EventTime = %s
ORDER BY NodeNum
"""
cursor.execute(query, (unit_name, tool_name_id, event_date, event_time))
rows = cursor.fetchall()
return rows
except pymysql.Error as e:
retries += 1
if retries >= self.MAX_RETRIES:
logger.error(
f"Failed to fetch records for key "
f"({unit_name}, {tool_name_id}, {event_date}, {event_time}) "
f"from {table} partition {partition} after {self.MAX_RETRIES} retries: {e}"
)
raise
else:
logger.warning(
f"Fetch records for key failed (retry {retries}/{self.MAX_RETRIES}): {e}"
)
self._reconnect()
def fetch_consolidation_keys_after(
self,
table: str,
after_key: Optional[Dict[str, Any]] = None,
min_mysql_id: int = 0,
limit: Optional[int] = None,
offset: int = 0
) -> List[Dict[str, Any]]:
"""Fetch distinct consolidation keys after a specific key (for incremental migration).
Query pattern:
SELECT UnitName, ToolNameID, EventDate, EventTime
FROM table
WHERE id > min_mysql_id
AND (UnitName, ToolNameID, EventDate, EventTime) > (?, ?, ?, ?)
GROUP BY UnitName, ToolNameID, EventDate, EventTime
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT X OFFSET Y
Args:
table: Table name (RAWDATACOR or ELABDATADISP)
after_key: Start after this key (dict with unit_name, tool_name_id, event_date, event_time)
min_mysql_id: Only fetch rows with id > this value (optimization to avoid scanning already migrated data)
limit: Number of keys to fetch (uses CONSOLIDATION_GROUP_LIMIT if None)
offset: Starting offset for pagination
Returns:
List of dicts with keys: UnitName, ToolNameID, EventDate, EventTime
"""
if table not in ("RAWDATACOR", "ELABDATADISP"):
raise ValueError(f"Consolidation not supported for table {table}")
if limit is None:
limit = self.settings.migration.consolidation_group_limit
# Determine ID column name based on table
id_column = "idElabData" if table == "ELABDATADISP" else "id"
retries = 0
while retries < self.MAX_RETRIES:
try:
with self.connection.cursor() as cursor:
if after_key:
# Incremental: fetch keys AFTER the last migrated key
# Filter by ID first for performance (uses PRIMARY KEY index)
query = f"""
SELECT UnitName, ToolNameID, EventDate, EventTime
FROM `{table}`
WHERE `{id_column}` > %s
AND (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s)
GROUP BY UnitName, ToolNameID, EventDate, EventTime
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT %s OFFSET %s
"""
cursor.execute(
query,
(
min_mysql_id,
after_key.get("unit_name"),
after_key.get("tool_name_id"),
after_key.get("event_date"),
after_key.get("event_time"),
limit,
offset
)
)
else:
# No after_key: fetch from beginning
# Still filter by ID if min_mysql_id is provided
if min_mysql_id > 0:
query = f"""
SELECT UnitName, ToolNameID, EventDate, EventTime
FROM `{table}`
WHERE `{id_column}` > %s
GROUP BY UnitName, ToolNameID, EventDate, EventTime
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT %s OFFSET %s
"""
cursor.execute(query, (min_mysql_id, limit, offset))
else:
query = f"""
SELECT UnitName, ToolNameID, EventDate, EventTime
FROM `{table}`
GROUP BY UnitName, ToolNameID, EventDate, EventTime
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT %s OFFSET %s
"""
cursor.execute(query, (limit, offset))
keys = cursor.fetchall()
return keys
except pymysql.Error as e:
retries += 1
if retries >= self.MAX_RETRIES:
logger.error(
f"Failed to fetch consolidation keys from {table} "
f"(after_key={after_key}, offset={offset}) after {self.MAX_RETRIES} retries: {e}"
)
raise
else:
logger.warning(
f"Fetch consolidation keys after failed (retry {retries}/{self.MAX_RETRIES}): {e}"
)
self._reconnect()
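The WHERE (UnitName, ToolNameID, EventDate, EventTime) > (%s, %s, %s, %s) predicate above relies on MySQL row-value comparison, which is lexicographic; Python tuples compare the same way, so a checkpoint can be sanity-checked client-side. The values below are hypothetical:

# Illustrative only: MySQL's row comparison matches Python tuple comparison.
last_key = ("UNIT_A", 7, "2024-05-01", "10:00:00")
candidate = ("UNIT_A", 7, "2024-05-01", "10:15:00")

def is_after(key, checkpoint):
    """True if key sorts strictly after checkpoint (same ordering the SQL uses)."""
    return key > checkpoint

assert is_after(candidate, last_key)
assert not is_after(last_key, last_key)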
def fetch_records_for_key_all_partitions(
self,
table: str,
unit_name: Any,
tool_name_id: Any,
event_date: Any,
event_time: Any
) -> List[Dict[str, Any]]:
"""Fetch all records for a specific consolidation key across all partitions.
Used for incremental migration where we don't know which partition the key is in.
Args:
table: Table name (RAWDATACOR or ELABDATADISP)
unit_name: UnitName value
tool_name_id: ToolNameID value
event_date: EventDate value
event_time: EventTime value
Returns:
List of all MySQL rows matching the key (different NodeNum)
"""
if table not in ("RAWDATACOR", "ELABDATADISP"):
raise ValueError(f"Consolidation not supported for table {table}")
retries = 0
while retries < self.MAX_RETRIES:
try:
with self.connection.cursor() as cursor:
query = f"""
SELECT *
FROM `{table}`
WHERE UnitName = %s
AND ToolNameID = %s
AND EventDate = %s
AND EventTime = %s
ORDER BY NodeNum
"""
cursor.execute(query, (unit_name, tool_name_id, event_date, event_time))
rows = cursor.fetchall()
return rows
except pymysql.Error as e:
retries += 1
if retries >= self.MAX_RETRIES:
logger.error(
f"Failed to fetch records for key "
f"({unit_name}, {tool_name_id}, {event_date}, {event_time}) "
f"from {table} after {self.MAX_RETRIES} retries: {e}"
)
raise
else:
logger.warning(
f"Fetch records for key (all partitions) failed (retry {retries}/{self.MAX_RETRIES}): {e}"
)
self._reconnect()

View File

@@ -1,7 +1,6 @@
"""PostgreSQL database connector.""" """PostgreSQL database connector."""
import psycopg import psycopg
from typing import List, Dict, Any, Optional, Iterator from typing import List, Dict, Any
from psycopg import sql
import json import json
from config import get_settings from config import get_settings
from src.utils.logger import get_logger from src.utils.logger import get_logger
@@ -13,72 +12,151 @@ class PostgreSQLConnector:
"""Connector for PostgreSQL database.""" """Connector for PostgreSQL database."""
def __init__(self): def __init__(self):
"""Initialize PostgreSQL connector with settings.""" """Initialize the PostgreSQL connector."""
self.settings = get_settings() self.settings = get_settings()
self.connection = None self.connection = None
def connect(self) -> None: def connect(self):
"""Establish connection to PostgreSQL database.""" """Connect to PostgreSQL."""
try: try:
self.connection = psycopg.connect( self.connection = psycopg.connect(
host=self.settings.postgres.host, host=self.settings.postgres.host,
port=self.settings.postgres.port, port=self.settings.postgres.port,
dbname=self.settings.postgres.database,
user=self.settings.postgres.user, user=self.settings.postgres.user,
password=self.settings.postgres.password, password=self.settings.postgres.password,
dbname=self.settings.postgres.database,
autocommit=False, autocommit=False,
) )
logger.info( logger.info(
f"Connected to PostgreSQL: {self.settings.postgres.host}:" f"Connected to PostgreSQL: {self.settings.postgres.host}:{self.settings.postgres.port}/{self.settings.postgres.database}"
f"{self.settings.postgres.port}/{self.settings.postgres.database}"
) )
except psycopg.Error as e: except psycopg.Error as e:
logger.error(f"Failed to connect to PostgreSQL: {e}") logger.error(f"Failed to connect to PostgreSQL: {e}")
raise raise
def disconnect(self) -> None: def disconnect(self):
"""Close connection to PostgreSQL database.""" """Disconnect from PostgreSQL."""
if self.connection: if self.connection:
self.connection.close() self.connection.close()
logger.info("Disconnected from PostgreSQL") logger.info("Disconnected from PostgreSQL")
    def __enter__(self):
-        """Context manager entry."""
+        """Context manager entry - connect to database."""
        self.connect()
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
-        """Context manager exit."""
+        """Context manager exit - disconnect from database."""
+        if exc_type is None:
+            # No exception, commit before closing
+            try:
+                self.connection.commit()
+            except Exception as e:
+                logger.warning(f"Failed to commit on exit: {e}")
+        else:
+            # Exception occurred, rollback
+            try:
+                self.connection.rollback()
+            except Exception as e:
+                logger.warning(f"Failed to rollback on exit: {e}")
        self.disconnect()
+        return False
-    def execute(self, query: str, params: Optional[tuple] = None) -> None:
-        """Execute a query without returning results with retry logic.
+    def execute_query(
+        self, query: str, params: tuple = None, fetch: bool = False
+    ) -> List[Dict[str, Any]]:
+        """Execute a query and optionally fetch results.
        Args:
-            query: SQL query
+            query: SQL query to execute
            params: Query parameters
+            fetch: Whether to fetch and return results
+        Returns:
+            List of result rows as dictionaries (if fetch=True)
        """
try:
with self.connection.cursor() as cursor:
cursor.execute(query, params)
if fetch:
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
return [dict(zip(columns, row)) for row in rows]
self.connection.commit()
return []
except psycopg.Error as e:
self.connection.rollback()
logger.error(f"Query execution failed: {e}")
raise
def execute_script(self, script: str):
"""Execute a SQL script (multiple statements).
Args:
script: SQL script to execute
"""
try:
with self.connection.cursor() as cursor:
cursor.execute(script)
self.connection.commit()
logger.debug("Script executed successfully")
except psycopg.Error as e:
self.connection.rollback()
logger.error(f"Script execution failed: {e}")
raise
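Hedged usage note (not part of the commit): with fetch=True, execute_query returns rows as plain dicts keyed by column name. The query below is hypothetical; only the connector API defined above is assumed:

from src.connectors.postgres_connector import PostgreSQLConnector

def count_rows_per_unit(limit: int = 10):
    """Example read-only query returning a list of dicts."""
    with PostgreSQLConnector() as pg:
        return pg.execute_query(
            "SELECT unit_name, COUNT(*) AS n FROM rawdatacor GROUP BY unit_name LIMIT %s",
            params=(limit,),
            fetch=True,
        )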
def copy_from(
self,
table: str,
rows: List[Dict[str, Any]],
columns: List[str]
) -> int:
"""Insert a batch of rows using PostgreSQL COPY (10-100x faster than INSERT).
Args:
table: Table name
rows: List of row dictionaries
columns: Column names in order
Returns:
Number of rows inserted
"""
if not rows:
return 0
        max_retries = 3
        retries = 0
        while retries < max_retries:
            try:
                with self.connection.cursor() as cursor:
-                    cursor.execute(query, params)
+                    # Prepare data for COPY
# COPY expects tab-separated text with one row per line
from io import StringIO
copy_data = StringIO()
for row in rows:
values = []
for col in columns:
val = row.get(col)
# Handle None/NULL
if val is None:
values.append("\\N")
# Convert JSONB dicts to JSON strings
elif isinstance(val, (dict, list)):
# Escape special characters in JSON
json_str = json.dumps(val).replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
values.append(json_str)
# Convert bool to PostgreSQL format
elif isinstance(val, bool):
values.append("t" if val else "f")
# All other values as strings
else:
# Escape special characters
str_val = str(val).replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
values.append(str_val)
copy_data.write("\t".join(values) + "\n")
copy_data.seek(0)
# Use COPY command
with cursor.copy(f"COPY {table} ({','.join(columns)}) FROM STDIN") as copy:
copy.write(copy_data.getvalue())
                    self.connection.commit()
-                    return  # Success
+                    logger.debug(f"COPY inserted {len(rows)} rows into {table}")
+                    return len(rows)
            except psycopg.Error as e:
                try:
                    self.connection.rollback()
@@ -87,11 +165,129 @@ class PostgreSQLConnector:
                retries += 1
                if retries >= max_retries:
-                    logger.error(f"Query execution failed after {max_retries} retries: {e}\nQuery: {query}")
+                    logger.error(f"COPY insert failed after {max_retries} retries: {e}")
                    raise
                else:
                    logger.warning(
-                        f"Query execution failed (retry {retries}/{max_retries}): {e}. "
+                        f"COPY insert failed (retry {retries}/{max_retries}): {e}. "
f"Reconnecting and retrying..."
)
# Reconnect before retry
try:
self.disconnect()
except Exception:
pass
try:
self.connect()
except Exception as reconnect_error:
logger.error(f"Failed to reconnect: {reconnect_error}")
if retries >= max_retries:
raise
def copy_from_with_conflict(
self,
table: str,
rows: List[Dict[str, Any]],
columns: List[str],
conflict_columns: List[str]
) -> int:
"""Insert a batch of rows using COPY + ON CONFLICT for duplicate handling.
This method is fast (uses COPY) but handles UNIQUE constraint violations:
1. COPY data into a temporary table (fast bulk load)
2. INSERT from temp table with ON CONFLICT DO NOTHING (skip duplicates)
Args:
table: Target table name
rows: List of row dictionaries
columns: Column names in order
conflict_columns: Columns that form the UNIQUE constraint
Returns:
Number of rows inserted (excludes skipped duplicates)
"""
if not rows:
return 0
max_retries = 3
retries = 0
while retries < max_retries:
try:
with self.connection.cursor() as cursor:
# Create temporary table with same structure but without id column
# The id column is auto-generated and shouldn't be in the COPY data
temp_table = f"{table}_temp_{id(rows)}"
cursor.execute(f"""
CREATE TEMP TABLE {temp_table}
(LIKE {table} INCLUDING DEFAULTS EXCLUDING IDENTITY)
ON COMMIT DROP
""")
# Drop the id column from temp table since we don't provide it in COPY
cursor.execute(f"ALTER TABLE {temp_table} DROP COLUMN IF EXISTS id")
# COPY into temp table
from io import StringIO
copy_data = StringIO()
for row in rows:
values = []
for col in columns:
val = row.get(col)
if val is None:
values.append("\\N")
elif isinstance(val, (dict, list)):
json_str = json.dumps(val).replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
values.append(json_str)
elif isinstance(val, bool):
values.append("t" if val else "f")
else:
str_val = str(val).replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
values.append(str_val)
copy_data.write("\t".join(values) + "\n")
copy_data.seek(0)
with cursor.copy(f"COPY {temp_table} ({','.join(columns)}) FROM STDIN") as copy:
copy.write(copy_data.getvalue())
# INSERT from temp table with ON CONFLICT
conflict_clause = f"({','.join(conflict_columns)})"
insert_sql = f"""
INSERT INTO {table} ({','.join(columns)})
SELECT {','.join(columns)}
FROM {temp_table}
ON CONFLICT {conflict_clause} DO NOTHING
"""
cursor.execute(insert_sql)
inserted_count = cursor.rowcount
self.connection.commit()
if inserted_count < len(rows):
logger.debug(
f"COPY+ON CONFLICT inserted {inserted_count}/{len(rows)} rows into {table} "
f"({len(rows) - inserted_count} duplicates skipped)"
)
else:
logger.debug(f"COPY+ON CONFLICT inserted {inserted_count} rows into {table}")
return inserted_count
except psycopg.Error as e:
try:
self.connection.rollback()
except Exception:
pass
retries += 1
if retries >= max_retries:
logger.error(f"COPY+ON CONFLICT insert failed after {max_retries} retries: {e}")
raise
else:
logger.warning(
f"COPY+ON CONFLICT insert failed (retry {retries}/{max_retries}): {e}. "
f"Reconnecting and retrying..." f"Reconnecting and retrying..."
) )
try: try:
@@ -105,22 +301,6 @@ class PostgreSQLConnector:
                        if retries >= max_retries:
                            raise
-    def execute_script(self, script: str) -> None:
-        """Execute multiple SQL statements (script).
-        Args:
-            script: SQL script with multiple statements
-        """
-        try:
-            with self.connection.cursor() as cursor:
-                cursor.execute(script)
-                self.connection.commit()
-                logger.debug("Script executed successfully")
-        except psycopg.Error as e:
-            self.connection.rollback()
-            logger.error(f"Script execution failed: {e}")
-            raise
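copy_from and copy_from_with_conflict serialize values to the COPY text format with identical rules (NULL as \N, booleans as t/f, dict/list payloads dumped to JSON, then backslash/newline/tab escaping). A possible shared helper, sketched for illustration only (the helper names are hypothetical):

import json

def to_copy_text(val):
    """Serialize one value for PostgreSQL COPY ... FROM STDIN (text format)."""
    if val is None:
        return "\\N"  # COPY text representation of NULL
    if isinstance(val, (dict, list)):
        s = json.dumps(val)  # JSONB payloads go in as JSON text
    elif isinstance(val, bool):
        return "t" if val else "f"  # PostgreSQL boolean literals
    else:
        s = str(val)
    # Escape backslashes first, then delimiter/newline characters
    return s.replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")

def to_copy_line(row, columns):
    """Build one tab-separated COPY line from a row dict."""
    return "\t".join(to_copy_text(row.get(col)) for col in columns) + "\n"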
    def insert_batch(
        self,
        table: str,
@@ -204,48 +384,21 @@ class PostgreSQLConnector:
            table: Table name
        Returns:
-            True if table exists, False otherwise
+            True if table exists
        """
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(
-                    "SELECT EXISTS("
-                    " SELECT 1 FROM information_schema.tables "
-                    " WHERE table_name = %s"
-                    ")",
-                    (table,)
+                    "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = %s)",
+                    (table,),
                )
                return cursor.fetchone()[0]
        except psycopg.Error as e:
-            logger.error(f"Failed to check if table exists: {e}")
+            logger.error(f"Failed to check table existence: {e}")
            raise
-    def get_max_timestamp(
-        self,
-        table: str,
-        timestamp_col: str = "created_at"
-    ) -> Optional[str]:
-        """Get the maximum timestamp from a table.
-        Args:
-            table: Table name
-            timestamp_col: Timestamp column name
-        Returns:
-            ISO format timestamp or None if table is empty
-        """
-        try:
-            with self.connection.cursor() as cursor:
-                query = f"SELECT MAX({timestamp_col})::text FROM {table}"
-                cursor.execute(query)
-                result = cursor.fetchone()
-                return result[0] if result and result[0] else None
-        except psycopg.Error as e:
-            logger.error(f"Failed to get max timestamp: {e}")
-            raise
    def get_row_count(self, table: str) -> int:
-        """Get row count for a table.
+        """Get the number of rows in a table.
        Args:
            table: Table name

View File

@@ -0,0 +1,25 @@
"""Migration modules for MySQL to PostgreSQL migration.
New architecture:
- consolidator: Consolidate MySQL rows by NodeNum into single PostgreSQL row
- state_manager: Track migration progress in PostgreSQL migration_state table
- partition_migrator: Migrate single MySQL partition
- full_migrator: Orchestrate full migration of all partitions
- incremental_migrator: Migrate only new data since last migration
"""
from src.migrator.consolidator import Consolidator, consolidate_rows
from src.migrator.state_manager import StateManager
from src.migrator.partition_migrator import PartitionMigrator
from src.migrator.full_migrator import FullMigrator, run_full_migration
from src.migrator.incremental_migrator import IncrementalMigrator, run_incremental_migration
__all__ = [
"Consolidator",
"consolidate_rows",
"StateManager",
"PartitionMigrator",
"FullMigrator",
"run_full_migration",
"IncrementalMigrator",
"run_incremental_migration",
]
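How these pieces are meant to compose, as a minimal sketch using only the entry points exported here (the wiring itself is illustrative, not project code):

from src.migrator import run_full_migration, run_incremental_migration

def sync_table(table: str, first_time: bool) -> int:
    """Full load on the first run, key-based incremental refresh afterwards."""
    if first_time:
        return run_full_migration(table, dry_run=False, resume=False)
    return run_incremental_migration(table, dry_run=False)

if __name__ == "__main__":
    print(sync_table("RAWDATACOR", first_time=False), "rows migrated")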

View File

@@ -0,0 +1,330 @@
"""Consolidation logic for MySQL to PostgreSQL migration.
Consolidates multiple MySQL rows (different NodeNum) with same consolidation key
into a single PostgreSQL row with measurements stored in JSONB.
Consolidation key: (UnitName, ToolNameID, EventDate, EventTime)
Multiple nodes → single JSONB structure with node_X keys
"""
from typing import List, Dict, Any, Optional
from datetime import datetime, date, time, timedelta
from decimal import Decimal
from config import (
TABLE_CONFIGS,
RAWDATACOR_COLUMNS,
ELABDATADISP_FIELD_MAPPING,
)
from src.utils.logger import get_logger
logger = get_logger(__name__)
def _combine_datetime(event_date: Any, event_time: Any) -> Optional[datetime]:
"""Combine MySQL EventDate and EventTime into a single datetime.
Args:
event_date: MySQL EventDate (date object or string)
event_time: MySQL EventTime (time object or string)
Returns:
Combined datetime or None if inputs are invalid
"""
if event_date is None or event_time is None:
return None
try:
# Handle if already datetime objects
if isinstance(event_date, datetime):
event_date = event_date.date()
elif isinstance(event_date, str):
event_date = datetime.fromisoformat(event_date).date()
if isinstance(event_time, str):
event_time = datetime.strptime(event_time, "%H:%M:%S").time()
elif isinstance(event_time, datetime):
event_time = event_time.time()
elif isinstance(event_time, timedelta):
# Convert timedelta to time (MySQL TIME type returns timedelta)
total_seconds = int(event_time.total_seconds())
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60
event_time = time(hours, minutes, seconds)
# Combine date and time
return datetime.combine(event_date, event_time)
except Exception as e:
logger.warning(f"Failed to combine date {event_date} and time {event_time}: {e}")
return None
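A quick illustration of the timedelta branch above (PyMySQL returns MySQL TIME columns as datetime.timedelta); assumes this module is importable as src.migrator.consolidator:

from datetime import date, datetime, timedelta
from src.migrator.consolidator import _combine_datetime

# MySQL TIME '10:30:15' arrives as timedelta(hours=10, minutes=30, seconds=15)
assert _combine_datetime(date(2024, 5, 1), timedelta(hours=10, minutes=30, seconds=15)) == datetime(2024, 5, 1, 10, 30, 15)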
class Consolidator:
"""Consolidate MySQL rows into PostgreSQL format with node-based JSONB."""
@staticmethod
def consolidate_rawdatacor(mysql_rows: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Consolidate RAWDATACOR rows by NodeNum into single PostgreSQL row.
Args:
mysql_rows: List of MySQL rows with same (UnitName, ToolNameID, EventDate, EventTime)
but different NodeNum
Returns:
Single PostgreSQL row dict with consolidated measurements
Example measurements structure:
{
"node_1": {
"0": {"value": "123.45", "unit": "°C"},
"1": {"value": "67.89", "unit": "bar"},
...
"F": {"value": "11.22", "unit": "m/s"}
},
"node_2": { ... }
}
"""
if not mysql_rows:
raise ValueError("Cannot consolidate empty row list")
# Get consolidation key from first row (all rows have same key)
first_row = mysql_rows[0]
config = TABLE_CONFIGS["RAWDATACOR"]
# Build measurements JSONB by node
measurements = {}
mysql_ids = []
for row in mysql_rows:
node_num = row.get("NodeNum")
if node_num is None:
logger.warning(f"Row missing NodeNum, skipping: {row.get(config['mysql_pk'])}")
continue
# Track MySQL IDs for max calculation
mysql_id = row.get(config["mysql_pk"])
if mysql_id:
mysql_ids.append(int(mysql_id))
# Create node key (e.g., "node_1", "node_2")
node_key = f"node_{node_num}"
# Build measurements for this node
node_measurements = {}
val_columns = RAWDATACOR_COLUMNS["val_columns"]
unit_columns = RAWDATACOR_COLUMNS["unit_columns"]
for idx, val_col in enumerate(val_columns):
unit_col = unit_columns[idx]
val = row.get(val_col)
unit = row.get(unit_col)
# Only include non-NULL values
if val is not None:
# Get hex key (0, 1, 2, ..., F)
hex_key = val_col.replace("Val", "")
measurement = {}
# Convert Decimal to float for JSON serialization
if isinstance(val, Decimal):
measurement["value"] = float(val)
else:
measurement["value"] = val
if unit is not None and unit != "":
measurement["unit"] = unit
node_measurements[hex_key] = measurement
measurements[node_key] = node_measurements
# Calculate max MySQL ID
mysql_max_id = max(mysql_ids) if mysql_ids else 0
# Combine EventDate + EventTime into event_timestamp
event_timestamp = _combine_datetime(
first_row.get("EventDate"),
first_row.get("EventTime")
)
# Extract year from event_timestamp for partition key
event_year = event_timestamp.year if event_timestamp else None
# Calculate created_at (minimum from all rows)
# Note: RAWDATACOR does not have updated_at field
created_at_values = [row.get("created_at") for row in mysql_rows if row.get("created_at") is not None]
created_at = min(created_at_values) if created_at_values else None
# Build PostgreSQL row
pg_row = {
"mysql_max_id": mysql_max_id,
"unit_name": first_row.get("UnitName"),
"tool_name_id": first_row.get("ToolNameID"),
"event_timestamp": event_timestamp,
"event_year": event_year,
"measurements": measurements,
"bat_level": first_row.get("BatLevel"),
"temperature": first_row.get("Temperature"),
"bat_level_module": first_row.get("BatLevel_module"),
"temperature_module": first_row.get("Temperature_module"),
"rssi_module": first_row.get("RSSI_module"),
"created_at": created_at,
}
return pg_row
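Illustrative input/output shape for consolidate_rawdatacor, built from hypothetical rows; this assumes the RAWDATACOR value/unit columns are named Val0/Unit0 ... ValF/UnitF, matching the hex keys in the docstring above:

from datetime import date, time
from src.migrator.consolidator import Consolidator

mysql_rows = [  # two nodes sharing one consolidation key (hypothetical data)
    {"id": 101, "UnitName": "U1", "ToolNameID": 7, "EventDate": date(2024, 5, 1),
     "EventTime": time(10, 0, 0), "NodeNum": 1, "Val0": "123.45", "Unit0": "°C"},
    {"id": 102, "UnitName": "U1", "ToolNameID": 7, "EventDate": date(2024, 5, 1),
     "EventTime": time(10, 0, 0), "NodeNum": 2, "Val0": "67.89", "Unit0": "bar"},
]
pg_row = Consolidator.consolidate_rawdatacor(mysql_rows)
# Expected shape (abridged):
# {"mysql_max_id": 102, "unit_name": "U1", "tool_name_id": 7,
#  "event_timestamp": datetime(2024, 5, 1, 10, 0), "event_year": 2024,
#  "measurements": {"node_1": {"0": {"value": "123.45", "unit": "°C"}},
#                   "node_2": {"0": {"value": "67.89", "unit": "bar"}}}, ...}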
@staticmethod
def consolidate_elabdatadisp(mysql_rows: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Consolidate ELABDATADISP rows by NodeNum into single PostgreSQL row.
Args:
mysql_rows: List of MySQL rows with same (UnitName, ToolNameID, EventDate, EventTime)
but different NodeNum
Returns:
Single PostgreSQL row dict with consolidated measurements
Example measurements structure:
{
"node_1": {
"shifts": {"x": 1.234, "y": 2.345, "z": 3.456, ...},
"coordinates": {"x": 10.123, "y": 20.234, ...},
"kinematics": {"speed": 1.111, ...},
"sensors": {"t_node": 25.5, ...},
"calculated": {"alfa_x": 0.123, ...}
},
"node_2": { ... }
}
"""
if not mysql_rows:
raise ValueError("Cannot consolidate empty row list")
# Get consolidation key from first row (all rows have same key)
first_row = mysql_rows[0]
config = TABLE_CONFIGS["ELABDATADISP"]
# Build measurements JSONB by node
measurements = {}
mysql_ids = []
for row in mysql_rows:
node_num = row.get("NodeNum")
if node_num is None:
logger.warning(f"Row missing NodeNum, skipping: {row.get(config['mysql_pk'])}")
continue
# Track MySQL IDs for max calculation
mysql_id = row.get(config["mysql_pk"])
if mysql_id:
mysql_ids.append(int(mysql_id))
# Create node key (e.g., "node_1", "node_2")
node_key = f"node_{node_num}"
# Build measurements for this node using field mapping
node_measurements = {
"shifts": {},
"coordinates": {},
"kinematics": {},
"sensors": {},
"calculated": {},
}
# Add state and calc_err if present
if "State" in row and row["State"] is not None:
node_measurements["state"] = row["State"]
# MySQL field is 'calcerr' (lowercase), not 'CalcErr'
if "calcerr" in row and row["calcerr"] is not None:
node_measurements["calc_err"] = row["calcerr"]
# Map MySQL fields to JSONB structure
for mysql_field, (category, json_key) in ELABDATADISP_FIELD_MAPPING.items():
value = row.get(mysql_field)
# Only include non-NULL values
if value is not None:
# Convert Decimal to float for JSON serialization
if isinstance(value, Decimal):
value = float(value)
node_measurements[category][json_key] = value
# Remove empty categories
node_measurements = {
k: v for k, v in node_measurements.items()
if v and (not isinstance(v, dict) or len(v) > 0)
}
measurements[node_key] = node_measurements
# Calculate max MySQL ID
mysql_max_id = max(mysql_ids) if mysql_ids else 0
# Combine EventDate + EventTime into event_timestamp
event_timestamp = _combine_datetime(
first_row.get("EventDate"),
first_row.get("EventTime")
)
# Extract year from event_timestamp for partition key
event_year = event_timestamp.year if event_timestamp else None
# Calculate created_at (minimum from all rows) and updated_at (maximum from all rows)
created_at_values = [row.get("created_at") for row in mysql_rows if row.get("created_at") is not None]
updated_at_values = [row.get("updated_at") for row in mysql_rows if row.get("updated_at") is not None]
created_at = min(created_at_values) if created_at_values else None
updated_at = max(updated_at_values) if updated_at_values else None
# Build PostgreSQL row
pg_row = {
"mysql_max_id": mysql_max_id,
"unit_name": first_row.get("UnitName"),
"tool_name_id": first_row.get("ToolNameID"),
"event_timestamp": event_timestamp,
"event_year": event_year,
"measurements": measurements,
"created_at": created_at,
"updated_at": updated_at,
}
return pg_row
@staticmethod
def consolidate(table: str, mysql_rows: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Consolidate rows for the specified table.
Args:
table: Table name (RAWDATACOR or ELABDATADISP)
mysql_rows: List of MySQL rows to consolidate
Returns:
Consolidated PostgreSQL row
Raises:
ValueError: If table is unknown or rows are empty
"""
if not mysql_rows:
raise ValueError("Cannot consolidate empty row list")
table_upper = table.upper()
if table_upper == "RAWDATACOR":
return Consolidator.consolidate_rawdatacor(mysql_rows)
elif table_upper == "ELABDATADISP":
return Consolidator.consolidate_elabdatadisp(mysql_rows)
else:
raise ValueError(f"Unknown table: {table}")
def consolidate_rows(table: str, mysql_rows: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Convenience function to consolidate rows.
Args:
table: Table name (RAWDATACOR or ELABDATADISP)
mysql_rows: List of MySQL rows with same consolidation key
Returns:
Single consolidated PostgreSQL row
"""
return Consolidator.consolidate(table, mysql_rows)

View File

@@ -0,0 +1,240 @@
"""Full migration orchestrator for MySQL to PostgreSQL.
Coordinates migration of all partitions or a specific partition.
"""
from typing import Optional, List
from config import get_settings, TABLE_CONFIGS
from src.connectors.mysql_connector import MySQLConnector
from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.partition_migrator import PartitionMigrator
from src.migrator.state_manager import StateManager
from src.utils.logger import get_logger
logger = get_logger(__name__)
class FullMigrator:
"""Orchestrate full migration of all partitions."""
def __init__(self, table: str):
"""Initialize full migrator.
Args:
table: Table name (RAWDATACOR or ELABDATADISP)
"""
if table.upper() not in ("RAWDATACOR", "ELABDATADISP"):
raise ValueError(f"Unknown table: {table}")
self.table = table.upper()
self.config = TABLE_CONFIGS[self.table]
self.settings = get_settings()
self.partition_migrator = PartitionMigrator(table)
def migrate(
self,
partition: Optional[str] = None,
dry_run: bool = False,
resume: bool = False
) -> int:
"""Perform full migration.
Args:
partition: If specified, migrate only this partition (for testing)
dry_run: If True, log what would be done without modifying data
resume: If True, resume from last checkpoint
Returns:
Total number of PostgreSQL rows migrated
"""
mysql_table = self.config["mysql_table"]
pg_table = self.config["postgres_table"]
logger.info(f"Starting full migration of {mysql_table} -> {pg_table}")
try:
with MySQLConnector() as mysql_conn:
with PostgreSQLConnector() as pg_conn:
# Check PostgreSQL table exists
if not pg_conn.table_exists(pg_table):
raise ValueError(
f"PostgreSQL table {pg_table} does not exist. "
"Run 'setup --create-schema' first."
)
# Initialize state manager (using _global for table-level state)
global_state_mgr = StateManager(pg_conn, pg_table, "_global")
# Get partitions to migrate
if partition:
# Single partition mode
partitions = [partition]
# Check if this partition is in-progress and should be resumed
in_progress_partitions = global_state_mgr.get_in_progress_partitions()
should_resume = resume and partition in in_progress_partitions
logger.info(f"Single partition mode: {partition}" +
(f" (resuming from checkpoint)" if should_resume else ""))
else:
# Full migration: all partitions
all_partitions = mysql_conn.get_table_partitions(mysql_table)
logger.info(f"Found {len(all_partitions)} partitions: {all_partitions}")
# Check resume - get list of completed and in-progress partitions
if resume:
completed_partitions = global_state_mgr.get_completed_partitions()
in_progress_partitions = global_state_mgr.get_in_progress_partitions()
# Skip partitions already completed
partitions = [p for p in all_partitions if p not in completed_partitions]
logger.info(
f"Resuming migration. Completed: {len(completed_partitions)}, "
f"In-progress: {len(in_progress_partitions)}, "
f"Remaining: {len(partitions)} partitions"
)
if in_progress_partitions:
logger.info(f"Will resume in-progress partitions: {in_progress_partitions}")
else:
partitions = all_partitions
in_progress_partitions = []
if dry_run:
logger.info(
f"[DRY RUN] Would migrate {len(partitions)} partition(s): "
f"{partitions}"
)
return 0
# Migrate each partition
total_migrated = 0
for idx, part_name in enumerate(partitions, 1):
# Determine if this partition should be resumed
# In single partition mode, use should_resume from above
# In full mode, check if partition is in-progress
if partition:
# Single partition mode - use the should_resume flag set earlier
do_resume = should_resume
else:
# Full migration mode - resume if partition is in-progress
do_resume = resume and part_name in in_progress_partitions
resume_msg = " (resuming from checkpoint)" if do_resume else ""
logger.info(
f"[{idx}/{len(partitions)}] Migrating partition: {part_name}{resume_msg}"
)
try:
migrated = self.partition_migrator.migrate_partition(
mysql_conn,
pg_conn,
part_name,
dry_run=dry_run,
resume=do_resume
)
total_migrated += migrated
logger.info(
f"✓ Partition {part_name} complete: "
f"{migrated} rows, total: {total_migrated}"
)
except Exception as e:
logger.error(
f"Failed to migrate partition {part_name}: {e}"
)
raise
# Get final row count from PostgreSQL
final_count = pg_conn.get_row_count(pg_table)
# Only mark global migration complete if ALL partitions are completed
# Check if there are any in-progress or pending partitions remaining
if partition:
# Single partition mode - don't update global state
logger.info(
f"✓ Partition migration complete: {total_migrated:,} rows migrated in this run, "
f"{final_count:,} total rows in {pg_table}"
)
else:
# Full migration mode - check if everything is really done
completed_partitions = global_state_mgr.get_completed_partitions()
in_progress_check = global_state_mgr.get_in_progress_partitions()
all_partitions_check = mysql_conn.get_table_partitions(mysql_table)
if len(completed_partitions) == len(all_partitions_check) and len(in_progress_check) == 0:
# All partitions are completed - mark global as complete
# Get the maximum last_key across all partitions for incremental migration
max_last_key = global_state_mgr.get_max_last_key_across_partitions()
if max_last_key:
logger.info(
f"Setting global last_key: "
f"({max_last_key.get('unit_name')}, {max_last_key.get('tool_name_id')}, "
f"{max_last_key.get('event_date')}, {max_last_key.get('event_time')})"
)
global_state_mgr.update_state(last_key=max_last_key)
global_state_mgr.mark_completed(final_count)
logger.info(
f"✓ Full migration complete: {total_migrated:,} rows migrated in this run, "
f"{final_count:,} total rows in {pg_table}"
)
else:
logger.info(
f"✓ Migration batch complete: {total_migrated:,} rows migrated in this run, "
f"{final_count:,} total rows in {pg_table}. "
f"{len(completed_partitions)}/{len(all_partitions_check)} partitions completed."
)
return total_migrated
except Exception as e:
logger.error(f"Full migration failed: {e}")
raise
def _get_remaining_partitions(
self,
all_partitions: List[str],
last_completed: str
) -> List[str]:
"""Get list of partitions that still need to be migrated.
Args:
all_partitions: All partition names from MySQL
last_completed: Last partition that was completed
Returns:
List of partition names to migrate
"""
try:
# Find index of last completed partition
idx = all_partitions.index(last_completed)
# Return partitions after it
return all_partitions[idx + 1:]
except ValueError:
# last_completed not in list, return all
logger.warning(
f"Last completed partition {last_completed} not found in partition list. "
"Starting from beginning."
)
return all_partitions
def run_full_migration(
table: str,
partition: Optional[str] = None,
dry_run: bool = False,
resume: bool = False
) -> int:
"""Run full migration for a table.
Args:
table: Table name to migrate (RAWDATACOR or ELABDATADISP)
partition: If specified, migrate only this partition
dry_run: If True, show what would be done without modifying data
resume: If True, resume from last checkpoint
Returns:
Number of rows migrated
"""
migrator = FullMigrator(table)
return migrator.migrate(partition=partition, dry_run=dry_run, resume=resume)
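The resume bookkeeping above (skip completed partitions, resume in-progress ones) can be summarized in a few lines. Minimal sketch; the partition names and state sets are hypothetical stand-ins for StateManager results:

def plan_partitions(all_partitions, completed, in_progress, resume):
    """Return (partitions to run, partitions to resume from checkpoint)."""
    if not resume:
        return list(all_partitions), []
    to_run = [p for p in all_partitions if p not in completed]
    to_resume = [p for p in to_run if p in in_progress]
    return to_run, to_resume

if __name__ == "__main__":
    run, from_ckpt = plan_partitions(["p2014", "p2015", "p2016"], {"p2014"}, {"p2015"}, resume=True)
    assert run == ["p2015", "p2016"] and from_ckpt == ["p2015"]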

View File

@@ -0,0 +1,324 @@
"""Incremental migration from MySQL to PostgreSQL.
Migrates only new data since last full/incremental migration based on consolidation keys.
"""
from typing import Optional
from config import get_settings, TABLE_CONFIGS
from src.connectors.mysql_connector import MySQLConnector
from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.consolidator import consolidate_rows
from src.migrator.state_manager import StateManager
from src.utils.logger import get_logger
from src.utils.progress import ProgressTracker
logger = get_logger(__name__)
class IncrementalMigrator:
"""Perform incremental migration based on consolidation keys."""
def __init__(self, table: str):
"""Initialize incremental migrator.
Args:
table: Table name (RAWDATACOR or ELABDATADISP)
"""
if table.upper() not in ("RAWDATACOR", "ELABDATADISP"):
raise ValueError(f"Unknown table: {table}")
self.table = table.upper()
self.config = TABLE_CONFIGS[self.table]
self.settings = get_settings()
def migrate(self, dry_run: bool = False) -> int:
"""Perform incremental migration since last migration.
Migrates only consolidation keys that come AFTER the last migrated key.
Args:
dry_run: If True, log what would be done without modifying data
Returns:
Number of PostgreSQL rows migrated
"""
mysql_table = self.config["mysql_table"]
pg_table = self.config["postgres_table"]
logger.info(f"Starting incremental migration of {mysql_table} -> {pg_table}")
try:
with MySQLConnector() as mysql_conn:
with PostgreSQLConnector() as pg_conn:
# Check PostgreSQL table exists
if not pg_conn.table_exists(pg_table):
raise ValueError(
f"PostgreSQL table {pg_table} does not exist. "
"Run 'setup --create-schema' first."
)
# Initialize state manager
state_mgr = StateManager(pg_conn, pg_table)
# Get last migrated key from migration_state
# This was saved during the last full/incremental migration
last_key = state_mgr.get_last_key()
if last_key is None:
logger.info(
f"No previous migration found for {pg_table}. "
"Run 'migrate full' for initial migration."
)
return 0
logger.info(
f"Last migrated key (from migration_state): "
f"({last_key.get('unit_name')}, {last_key.get('tool_name_id')}, "
f"{last_key.get('event_date')}, {last_key.get('event_time')})"
)
# Get max MySQL ID already migrated to optimize query performance
cursor = pg_conn.connection.cursor()
cursor.execute(f"SELECT MAX(mysql_max_id) FROM {pg_table}")
result = cursor.fetchone()
max_mysql_id = result[0] if result and result[0] else 0
logger.info(f"Max MySQL ID already migrated: {max_mysql_id}")
if dry_run:
# In dry-run, check how many new keys exist in MySQL
logger.info("[DRY RUN] Checking for new keys in MySQL...")
# Sample first 100 keys to check if there are new records
sample_keys = mysql_conn.fetch_consolidation_keys_after(
mysql_table,
after_key=last_key,
min_mysql_id=max_mysql_id,
limit=100,
offset=0
)
if sample_keys:
# If we found 100 keys in the sample, there might be many more
# Try to get a rough count by checking larger offsets
if len(sample_keys) == 100:
# There are at least 100 keys, check if there are more
logger.info(
f"[DRY RUN] Found at least 100 new keys, checking total count..."
)
# Sample at different offsets to estimate total
test_batch = mysql_conn.fetch_consolidation_keys_after(
mysql_table,
after_key=last_key,
min_mysql_id=max_mysql_id,
limit=1,
offset=1000
)
if test_batch:
logger.info(f"[DRY RUN] Estimated: More than 1000 new keys to migrate")
else:
logger.info(f"[DRY RUN] Estimated: Between 100-1000 new keys to migrate")
else:
logger.info(f"[DRY RUN] Found {len(sample_keys)} new keys to migrate")
logger.info("[DRY RUN] First 3 keys:")
for i, key in enumerate(sample_keys[:3]):
logger.info(
f" {i+1}. ({key.get('UnitName')}, {key.get('ToolNameID')}, "
f"{key.get('EventDate')}, {key.get('EventTime')})"
)
logger.info(
f"[DRY RUN] Run without --dry-run to perform actual migration"
)
# Return a positive number to indicate there's data to migrate
return len(sample_keys)
else:
logger.info("[DRY RUN] No new keys found - database is up to date")
return 0
# Migrate new keys
migrated_rows = 0
offset = 0
insert_buffer = []
buffer_size = self.settings.migration.consolidation_group_limit // 10
with ProgressTracker(
total=None, # Unknown total
description=f"Incremental migration of {mysql_table}"
) as progress:
# Get column order for PostgreSQL insert
pg_columns = self._get_pg_columns()
while True:
# Fetch batch of consolidation keys AFTER last_key
logger.debug(f"Fetching keys after last_key with offset={offset}")
keys = mysql_conn.fetch_consolidation_keys_after(
mysql_table,
after_key=last_key,
min_mysql_id=max_mysql_id,
limit=self.settings.migration.consolidation_group_limit,
offset=offset
)
if not keys:
logger.info("No more new keys to migrate")
break
logger.info(f"Processing {len(keys)} new keys (offset={offset})")
# Process each consolidation key
keys_processed = 0
for key in keys:
keys_processed += 1
# Log progress every 1000 keys
if keys_processed % 1000 == 0:
logger.info(f" Processed {keys_processed}/{len(keys)} keys in this batch...")
unit_name = key.get("UnitName")
tool_name_id = key.get("ToolNameID")
event_date = key.get("EventDate")
event_time = key.get("EventTime")
# Fetch all MySQL rows for this key (all nodes, all partitions)
mysql_rows = mysql_conn.fetch_records_for_key_all_partitions(
mysql_table,
unit_name,
tool_name_id,
event_date,
event_time
)
if not mysql_rows:
logger.warning(
f"No records found for key: "
f"({unit_name}, {tool_name_id}, {event_date}, {event_time})"
)
continue
# Consolidate into single PostgreSQL row
try:
pg_row = consolidate_rows(self.table, mysql_rows)
except Exception as e:
logger.error(
f"Failed to consolidate key "
f"({unit_name}, {tool_name_id}, {event_date}, {event_time}): {e}"
)
continue
# Add to insert buffer
insert_buffer.append(pg_row)
# Flush buffer when full
if len(insert_buffer) >= buffer_size:
# Use COPY with ON CONFLICT to handle duplicates
inserted = pg_conn.copy_from_with_conflict(
pg_table,
insert_buffer,
pg_columns,
conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"]
)
migrated_rows += inserted
progress.update(inserted)
# Update state with last key
last_processed_key = {
"unit_name": unit_name,
"tool_name_id": tool_name_id,
"event_date": str(event_date) if event_date else None,
"event_time": str(event_time) if event_time else None,
}
state_mgr.update_state(
last_key=last_processed_key,
# Add only the rows from this flush; migrated_rows is cumulative for the whole run
total_rows_migrated=state_mgr.get_total_rows_migrated() + inserted
)
logger.debug(
f"Flushed {inserted} rows, total new: {migrated_rows}"
)
insert_buffer = []
# Move to next batch of keys
offset += len(keys)
# If we got fewer keys than requested, we're done
if len(keys) < self.settings.migration.consolidation_group_limit:
break
# Flush remaining buffer
if insert_buffer:
# Use COPY with ON CONFLICT to handle duplicates
inserted = pg_conn.copy_from_with_conflict(
pg_table,
insert_buffer,
pg_columns,
conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"]
)
migrated_rows += inserted
progress.update(inserted)
logger.debug(f"Final flush: {inserted} rows")
# Get final row count
final_count = pg_conn.get_row_count(pg_table)
logger.info(f"Total PostgreSQL rows: {final_count}")
logger.info(
f"✓ Incremental migration complete: "
f"{migrated_rows} new rows migrated to {pg_table}"
)
return migrated_rows
except Exception as e:
logger.error(f"Incremental migration failed: {e}")
raise
def _get_pg_columns(self) -> list[str]:
"""Get PostgreSQL column names in insertion order.
Returns:
List of column names
"""
# Base columns for both tables
columns = [
"mysql_max_id",
"unit_name",
"tool_name_id",
"event_timestamp",
"event_year",
"measurements",
]
# Add table-specific columns
if self.table == "RAWDATACOR":
columns.extend([
"bat_level",
"temperature",
"bat_level_module",
"temperature_module",
"rssi_module",
"created_at",
])
elif self.table == "ELABDATADISP":
columns.extend([
"created_at",
"updated_at",
])
return columns
def run_incremental_migration(
table: str,
dry_run: bool = False
) -> int:
"""Run incremental migration for a table.
Args:
table: Table name to migrate (RAWDATACOR or ELABDATADISP)
dry_run: If True, show what would be done without modifying data
Returns:
Number of rows migrated
"""
migrator = IncrementalMigrator(table)
return migrator.migrate(dry_run=dry_run)
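A minimal invocation sketch (not part of the commit): it assumes the connector credentials are resolved from the environment/config as elsewhere in this changeset, and the argument handling is illustrative only.

import sys

from src.utils.logger import setup_logger
from src.migrator.incremental_migrator import run_incremental_migration

if __name__ == "__main__":
    setup_logger("")  # configure the root logger, as the parallel worker does
    table = sys.argv[1] if len(sys.argv) > 1 else "RAWDATACOR"  # assumed default
    migrated = run_incremental_migration(table, dry_run="--dry-run" in sys.argv)
    print(f"New consolidated rows migrated: {migrated}")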

View File

@@ -0,0 +1,220 @@
"""Parallel migration orchestrator for MySQL to PostgreSQL.
Runs multiple partition migrations in parallel using multiprocessing.
Each process migrates a different partition independently.
"""
import multiprocessing as mp
from typing import Optional, List
import sys
from config import get_settings, TABLE_CONFIGS
from src.connectors.mysql_connector import MySQLConnector
from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.partition_migrator import PartitionMigrator
from src.migrator.state_manager import StateManager
from src.utils.logger import get_logger
logger = get_logger(__name__)
def migrate_partition_worker(table: str, partition_name: str, dry_run: bool = False, resume: bool = False) -> tuple[str, int, bool]:
"""Worker function to migrate a single partition.
Runs in a separate process.
Args:
table: Table name (RAWDATACOR or ELABDATADISP)
partition_name: Partition to migrate
dry_run: If True, simulate without writing
resume: If True, resume from last checkpoint if exists
Returns:
Tuple of (partition_name, rows_migrated, success)
"""
# Configure logging for this worker process (multiprocessing requires per-process setup)
# Set up root logger so all child loggers (PartitionMigrator, ProgressTracker, etc.) work
from src.utils.logger import setup_logger
setup_logger("") # Configure root logger for this process
# Get a logger for this worker (get_logger is already imported at module level)
worker_logger = get_logger(f"worker.{partition_name}")
try:
resume_msg = " (resuming)" if resume else ""
worker_logger.info(f"Worker starting: {partition_name}{resume_msg}")
with MySQLConnector() as mysql_conn:
with PostgreSQLConnector() as pg_conn:
migrator = PartitionMigrator(table)
rows = migrator.migrate_partition(
mysql_conn,
pg_conn,
partition_name,
dry_run=dry_run,
resume=resume
)
worker_logger.info(f"Worker completed: {partition_name} ({rows} rows)")
return (partition_name, rows, True)
except Exception as e:
worker_logger.error(f"Worker failed for {partition_name}: {e}")
return (partition_name, 0, False)
class ParallelMigrator:
"""Orchestrate parallel migration of multiple partitions."""
def __init__(self, table: str):
"""Initialize parallel migrator.
Args:
table: Table name (RAWDATACOR or ELABDATADISP)
"""
if table.upper() not in ("RAWDATACOR", "ELABDATADISP"):
raise ValueError(f"Unknown table: {table}")
self.table = table.upper()
self.config = TABLE_CONFIGS[self.table]
self.settings = get_settings()
def migrate(
self,
num_workers: int = 5,
dry_run: bool = False,
resume: bool = False
) -> int:
"""Perform parallel migration of all partitions.
Args:
num_workers: Number of parallel workers
dry_run: If True, log what would be done without modifying data
resume: If True, skip already completed partitions
Returns:
Total number of rows in the PostgreSQL target table after migration
"""
mysql_table = self.config["mysql_table"]
pg_table = self.config["postgres_table"]
logger.info(
f"Starting parallel migration of {mysql_table} -> {pg_table} "
f"with {num_workers} workers"
)
try:
with MySQLConnector() as mysql_conn:
with PostgreSQLConnector() as pg_conn:
# Check PostgreSQL table exists
if not pg_conn.table_exists(pg_table):
raise ValueError(
f"PostgreSQL table {pg_table} does not exist. "
"Run 'setup --create-schema' first."
)
# Get all partitions
all_partitions = mysql_conn.get_table_partitions(mysql_table)
logger.info(f"Found {len(all_partitions)} partitions: {all_partitions}")
# Filter out completed partitions if resuming
partitions_to_migrate = all_partitions
if resume:
state_mgr = StateManager(pg_conn, pg_table, "_global")
completed = state_mgr.get_completed_partitions()
if completed:
partitions_to_migrate = [p for p in all_partitions if p not in completed]
logger.info(
f"Resuming: {len(completed)} partitions already completed, "
f"{len(partitions_to_migrate)} remaining"
)
if not partitions_to_migrate:
logger.info("All partitions already migrated")
return 0
if dry_run:
logger.info(
f"[DRY RUN] Would migrate {len(partitions_to_migrate)} partitions "
f"in parallel with {num_workers} workers"
)
return 0
# Run migrations in parallel using multiprocessing pool
logger.info(f"Launching {num_workers} worker processes...")
with mp.Pool(processes=num_workers) as pool:
# Create tasks for each partition
tasks = [
(self.table, partition_name, dry_run, resume)
for partition_name in partitions_to_migrate
]
# Run migrations in parallel
results = pool.starmap(migrate_partition_worker, tasks)
# Collect results
total_migrated = 0
failed_partitions = []
for partition_name, rows, success in results:
if success:
total_migrated += rows
logger.info(f"{partition_name}: {rows} rows")
else:
failed_partitions.append(partition_name)
logger.error(f"{partition_name}: FAILED")
if failed_partitions:
raise Exception(
f"Migration failed for {len(failed_partitions)} partitions: "
f"{', '.join(failed_partitions)}"
)
# Only mark global migration complete if ALL partitions are completed
with PostgreSQLConnector() as pg_conn:
final_count = pg_conn.get_row_count(pg_table)
global_state_mgr = StateManager(pg_conn, pg_table, "_global")
# Verify all partitions are actually completed
completed = global_state_mgr.get_completed_partitions()
in_progress = global_state_mgr.get_in_progress_partitions()
if len(in_progress) == 0:
# All partitions completed successfully
global_state_mgr.mark_completed(final_count)
logger.info(
f"✓ Parallel migration complete: {final_count} total rows in {pg_table}"
)
else:
logger.warning(
f"Migration finished but {len(in_progress)} partitions still in-progress: {in_progress}. "
f"Not marking global as completed."
)
return final_count
except Exception as e:
logger.error(f"Parallel migration failed: {e}")
raise
def run_parallel_migration(
table: str,
num_workers: int = 5,
dry_run: bool = False,
resume: bool = False
) -> int:
"""Run parallel migration for a table.
Args:
table: Table name to migrate (RAWDATACOR or ELABDATADISP)
num_workers: Number of parallel workers
dry_run: If True, show what would be done without modifying data
resume: If True, skip already completed partitions
Returns:
Total row count of the PostgreSQL target table after migration
"""
migrator = ParallelMigrator(table)
return migrator.migrate(num_workers=num_workers, dry_run=dry_run, resume=resume)
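A usage sketch only (the table and worker count are arbitrary examples): the orchestrator should be launched behind a __main__ guard so multiprocessing can spawn workers safely.

from src.migrator.parallel_migrator import run_parallel_migration

if __name__ == "__main__":
    # resume=True skips partitions already marked 'completed' in migration_state
    total = run_parallel_migration("ELABDATADISP", num_workers=4, dry_run=False, resume=True)
    print(f"Rows now present in PostgreSQL: {total}")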

View File

@@ -0,0 +1,355 @@
"""Partition-based migration for MySQL to PostgreSQL.
Streaming Strategy:
1. Stream all rows from partition ordered by consolidation key (single query)
2. Group rows by consolidation key as they arrive
3. Validate and consolidate each group
4. Batch insert using PostgreSQL COPY (10-100x faster than INSERT)
5. Log invalid keys to error file
6. Update migration state periodically with resume support
"""
from typing import Optional, List, Dict, Any
from datetime import datetime
from config import get_settings, TABLE_CONFIGS
from src.connectors.mysql_connector import MySQLConnector
from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.consolidator import consolidate_rows
from src.migrator.state_manager import StateManager
from src.utils.logger import get_logger
from src.utils.progress import ProgressTracker
from src.utils.validation import validate_consolidation_key, ErrorLogger
logger = get_logger(__name__)
class PartitionMigrator:
"""Migrate a single MySQL partition to PostgreSQL with consolidation."""
def __init__(self, table: str):
"""Initialize partition migrator.
Args:
table: Table name (RAWDATACOR or ELABDATADISP)
"""
if table.upper() not in ("RAWDATACOR", "ELABDATADISP"):
raise ValueError(f"Unknown table: {table}")
self.table = table.upper()
self.config = TABLE_CONFIGS[self.table]
self.settings = get_settings()
def migrate_partition_streaming(
self,
mysql_conn: MySQLConnector,
pg_conn: PostgreSQLConnector,
partition_name: str,
dry_run: bool = False,
resume: bool = False
) -> int:
"""Migrate a partition using streaming (OPTIMIZED - eliminates N+1 queries).
This method streams all rows from the partition in a single query,
groups them by consolidation key, and uses PostgreSQL COPY for fast inserts.
Performance improvement: 10-100x faster than old N+1 query approach.
Args:
mysql_conn: MySQL connector (already connected)
pg_conn: PostgreSQL connector (already connected)
partition_name: MySQL partition name (e.g., 'p2024')
dry_run: If True, log what would be done without modifying data
resume: If True, resume from last checkpoint
Returns:
Number of PostgreSQL rows inserted (consolidated)
"""
mysql_table = self.config["mysql_table"]
pg_table = self.config["postgres_table"]
logger.info(f"Starting STREAMING migration of partition {partition_name} from {mysql_table}")
# Initialize state manager for this partition
state_mgr = StateManager(pg_conn, pg_table, partition_name)
# Initialize error logger
error_logger = ErrorLogger(pg_table, partition_name)
# Check resume state
resume_from_key = None
start_key_tuple = None
if resume:
last_key = state_mgr.get_last_key()
if last_key:
resume_from_key = last_key
# Convert to tuple for MySQL query
start_key_tuple = (
last_key.get("unit_name"),
last_key.get("tool_name_id"),
last_key.get("event_date"),
last_key.get("event_time")
)
logger.info(
f"Resuming AFTER last key: "
f"({start_key_tuple[0]}, {start_key_tuple[1]}, "
f"{start_key_tuple[2]}, {start_key_tuple[3]})"
)
if dry_run:
logger.info(f"[DRY RUN] Would stream partition {partition_name}")
return 0
# Track migration
state_mgr.mark_in_progress()
migrated_rows = 0
insert_buffer = []
buffer_size = 10000 # Larger buffer for COPY performance
# Get column order for PostgreSQL insert
pg_columns = self._get_pg_columns()
# Group rows by consolidation key
current_group: List[Dict[str, Any]] = []
current_key = None
rows_processed = 0
migration_completed = False
try:
with ProgressTracker(
total=None, # Unknown total
description=f"Streaming {mysql_table} partition {partition_name}"
) as progress:
# Use fetch_consolidation_groups_from_partition with start_key for efficient resume
# MySQL will skip all keys <= start_key using WHERE clause (no unnecessary data transfer)
for group in mysql_conn.fetch_consolidation_groups_from_partition(
mysql_table,
partition_name,
limit=self.settings.migration.consolidation_group_limit,
offset=0,
start_key=start_key_tuple # Resume AFTER this key
):
rows_processed += len(group)
# Extract consolidation key from first row
if not group:
continue
first_row = group[0]
key = (
first_row.get("UnitName"),
first_row.get("ToolNameID"),
first_row.get("EventDate"),
first_row.get("EventTime")
)
# No need to skip here anymore - MySQL query already handles resume
# (keys are filtered in the database with WHERE clause)
# Validate and process group
self._process_group(
group,
key,
insert_buffer,
buffer_size,
pg_conn,
pg_table,
pg_columns,
state_mgr,
error_logger,
progress
)
# Refresh the running total from migration_state after a flush
# (the buffer is also empty when a group was skipped as invalid; the read is then a no-op)
if len(insert_buffer) == 0:
migrated_rows = state_mgr.get_state().get("total_rows_migrated", migrated_rows)
# Flush remaining buffer
if insert_buffer:
inserted = pg_conn.copy_from_with_conflict(
pg_table,
insert_buffer,
pg_columns,
conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"]
)
migrated_rows += inserted
progress.update(inserted)
logger.debug(f"Final flush: {inserted} rows")
# If we reach here, migration completed successfully
migration_completed = True
finally:
# Close error logger
error_logger.close()
# Only mark as completed if migration actually finished
if migration_completed:
state_mgr.update_state(
total_rows_migrated=migrated_rows,
status="completed",
mark_completed=True
)
else:
# Migration was interrupted - save progress but keep status as in_progress
logger.warning(f"Migration of partition {partition_name} was interrupted")
state_mgr.update_state(
total_rows_migrated=migrated_rows,
status="in_progress"
)
logger.info(
f"✓ Partition {partition_name} STREAMING migration complete: "
f"{migrated_rows} PostgreSQL rows inserted, "
f"{rows_processed} MySQL rows processed, "
f"{error_logger.get_error_count()} invalid keys skipped"
)
return migrated_rows
def _process_group(
self,
mysql_rows: List[Dict[str, Any]],
key: tuple,
insert_buffer: List[Dict[str, Any]],
buffer_size: int,
pg_conn: PostgreSQLConnector,
pg_table: str,
pg_columns: List[str],
state_mgr: StateManager,
error_logger: ErrorLogger,
progress: ProgressTracker
) -> None:
"""Process a consolidation group: validate, consolidate, and buffer.
Args:
mysql_rows: List of MySQL rows with same consolidation key
key: Consolidation key tuple (unit_name, tool_name_id, event_date, event_time)
insert_buffer: Buffer to add consolidated row to
buffer_size: Max buffer size before flush
pg_conn: PostgreSQL connector
pg_table: PostgreSQL table name
pg_columns: Column names for insert
state_mgr: State manager
error_logger: Error logger
progress: Progress tracker
"""
unit_name, tool_name_id, event_date, event_time = key
# Validate consolidation key
is_valid, error_reason = validate_consolidation_key(
unit_name, tool_name_id, event_date, event_time
)
if not is_valid:
# Log invalid key and skip
error_logger.log_invalid_key(
unit_name, tool_name_id, event_date, event_time, error_reason
)
return
# Consolidate into single PostgreSQL row
try:
pg_row = consolidate_rows(self.table, mysql_rows)
except Exception as e:
logger.error(
f"Failed to consolidate key "
f"({unit_name}, {tool_name_id}, {event_date}, {event_time}): {e}"
)
error_logger.log_invalid_key(
unit_name, tool_name_id, event_date, event_time,
f"Consolidation failed: {e}"
)
return
# Add to insert buffer
insert_buffer.append(pg_row)
# Flush buffer when full
if len(insert_buffer) >= buffer_size:
# Use COPY with ON CONFLICT to handle UNIQUE constraint
# (unit_name, tool_name_id, event_timestamp, event_year)
inserted = pg_conn.copy_from_with_conflict(
pg_table,
insert_buffer,
pg_columns,
conflict_columns=["unit_name", "tool_name_id", "event_timestamp", "event_year"]
)
progress.update(inserted)
# Update state with last key
last_processed_key = {
"unit_name": unit_name,
"tool_name_id": tool_name_id,
"event_date": str(event_date) if event_date else None,
"event_time": str(event_time) if event_time else None,
}
current_total = state_mgr.get_state().get("total_rows_migrated", 0)
state_mgr.update_state(
last_key=last_processed_key,
total_rows_migrated=current_total + inserted
)
logger.debug(
f"Flushed {inserted} rows using COPY, "
f"total migrated: {current_total + inserted}"
)
insert_buffer.clear()
def migrate_partition(
self,
mysql_conn: MySQLConnector,
pg_conn: PostgreSQLConnector,
partition_name: str,
dry_run: bool = False,
resume: bool = False
) -> int:
"""Migrate a single partition from MySQL to PostgreSQL using streaming.
Args:
mysql_conn: MySQL connector (already connected)
pg_conn: PostgreSQL connector (already connected)
partition_name: MySQL partition name (e.g., 'p2024')
dry_run: If True, log what would be done without modifying data
resume: If True, resume from last checkpoint
Returns:
Number of PostgreSQL rows inserted (consolidated)
"""
return self.migrate_partition_streaming(
mysql_conn, pg_conn, partition_name, dry_run, resume
)
def _get_pg_columns(self) -> list[str]:
"""Get PostgreSQL column names in insertion order.
Returns:
List of column names
"""
# Base columns for both tables
columns = [
"mysql_max_id",
"unit_name",
"tool_name_id",
"event_timestamp",
"event_year",
"measurements",
]
# Add table-specific columns
if self.table == "RAWDATACOR":
columns.extend([
"bat_level",
"temperature",
"bat_level_module",
"temperature_module",
"rssi_module",
"created_at",
])
elif self.table == "ELABDATADISP":
columns.extend([
"created_at",
"updated_at",
])
return columns
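fetch_consolidation_groups_from_partition lives in the MySQL connector and is not part of this diff; a rough sketch of the grouping it is assumed to perform over a key-ordered stream is shown below (the real method also handles paging and the start_key resume filter on the MySQL side).

from itertools import groupby
from typing import Any, Dict, Iterable, Iterator, List

# Assumed helper, for illustration only.
CONSOLIDATION_KEY = ("UnitName", "ToolNameID", "EventDate", "EventTime")

def group_by_consolidation_key(rows: Iterable[Dict[str, Any]]) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of rows that share the consolidation key.

    Requires `rows` to already be ordered by the key, matching the single
    ORDER BY query described in the streaming strategy above.
    """
    for _, group in groupby(rows, key=lambda r: tuple(r.get(f) for f in CONSOLIDATION_KEY)):
        yield list(group)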

View File

@@ -0,0 +1,347 @@
"""Migration state management using PostgreSQL migration_state table.
Tracks migration progress with:
- last_key: Last consolidation key migrated (UnitName, ToolNameID, EventDate, EventTime)
- last_completed_partition: Last partition that was fully migrated
- total_rows_migrated: Count of PostgreSQL rows (consolidated)
- status: pending, in_progress, completed
"""
from typing import Optional, Dict, Any
from datetime import datetime
import json
from src.connectors.postgres_connector import PostgreSQLConnector
from src.utils.logger import get_logger
logger = get_logger(__name__)
class StateManager:
"""Manage migration state in PostgreSQL migration_state table.
Supports per-partition state tracking for parallel migration.
"""
def __init__(self, pg_conn: PostgreSQLConnector, table_name: str, partition_name: str = "_global"):
"""Initialize state manager.
Args:
pg_conn: PostgreSQL connector
table_name: PostgreSQL table name (rawdatacor or elabdatadisp)
partition_name: MySQL partition name (e.g., 'd0', 'part0') or '_global' for global state
"""
self.pg_conn = pg_conn
self.table_name = table_name
self.partition_name = partition_name
def get_last_key(self) -> Optional[Dict[str, Any]]:
"""Get last consolidation key that was migrated for this partition.
Returns:
Dict with keys: unit_name, tool_name_id, event_date, event_time
Or None if no previous migration
"""
try:
with self.pg_conn.connection.cursor() as cursor:
cursor.execute(
"SELECT last_key FROM migration_state WHERE table_name = %s AND partition_name = %s",
(self.table_name, self.partition_name)
)
result = cursor.fetchone()
if result and result[0]:
# last_key is stored as JSONB, psycopg returns it as dict
return result[0]
return None
except Exception as e:
logger.debug(f"Could not get last_key from migration_state: {e}")
return None
def get_total_rows_migrated(self) -> int:
"""Get total rows migrated for this partition.
Returns:
Count of PostgreSQL rows (consolidated)
"""
try:
with self.pg_conn.connection.cursor() as cursor:
cursor.execute(
"SELECT total_rows_migrated FROM migration_state WHERE table_name = %s AND partition_name = %s",
(self.table_name, self.partition_name)
)
result = cursor.fetchone()
return result[0] if result and result[0] else 0
except Exception as e:
logger.debug(f"Could not get total_rows_migrated from migration_state: {e}")
return 0
def get_status(self) -> str:
"""Get migration status for this partition.
Returns:
Status string: pending, in_progress, or completed
"""
try:
with self.pg_conn.connection.cursor() as cursor:
cursor.execute(
"SELECT status FROM migration_state WHERE table_name = %s AND partition_name = %s",
(self.table_name, self.partition_name)
)
result = cursor.fetchone()
return result[0] if result and result[0] else "pending"
except Exception as e:
logger.debug(f"Could not get status from migration_state: {e}")
return "pending"
def get_state(self) -> Dict[str, Any]:
"""Get complete migration state for this partition.
Returns:
Dict with keys: last_key, total_rows_migrated, status, migration_completed_at
"""
try:
with self.pg_conn.connection.cursor() as cursor:
cursor.execute(
"SELECT last_key, total_rows_migrated, status, migration_completed_at "
"FROM migration_state WHERE table_name = %s AND partition_name = %s",
(self.table_name, self.partition_name)
)
result = cursor.fetchone()
if result:
return {
"last_key": result[0],
"total_rows_migrated": result[1] if result[1] else 0,
"status": result[2] if result[2] else "pending",
"migration_completed_at": result[3]
}
return {
"last_key": None,
"total_rows_migrated": 0,
"status": "pending",
"migration_completed_at": None
}
except Exception as e:
logger.debug(f"Could not get state from migration_state: {e}")
return {
"last_key": None,
"total_rows_migrated": 0,
"status": "pending",
"migration_completed_at": None
}
def get_completed_partitions(self) -> list[str]:
"""Get list of all completed partitions for this table.
Returns:
List of partition names that have been completed
"""
try:
with self.pg_conn.connection.cursor() as cursor:
cursor.execute(
"SELECT partition_name FROM migration_state WHERE table_name = %s AND status = 'completed'",
(self.table_name,)
)
results = cursor.fetchall()
return [row[0] for row in results if row[0] != "_global"]
except Exception as e:
logger.debug(f"Could not get completed partitions from migration_state: {e}")
return []
def get_in_progress_partitions(self) -> list[str]:
"""Get list of all in-progress partitions for this table.
Returns:
List of partition names that are currently in progress
"""
try:
with self.pg_conn.connection.cursor() as cursor:
cursor.execute(
"SELECT partition_name FROM migration_state WHERE table_name = %s AND status = 'in_progress'",
(self.table_name,)
)
results = cursor.fetchall()
return [row[0] for row in results if row[0] != "_global"]
except Exception as e:
logger.debug(f"Could not get in-progress partitions from migration_state: {e}")
return []
def update_state(
self,
last_key: Optional[Dict[str, Any]] = None,
total_rows_migrated: Optional[int] = None,
status: Optional[str] = None,
mark_completed: bool = False
) -> None:
"""Update migration state for this partition.
Args:
last_key: Last consolidation key migrated
total_rows_migrated: Total PostgreSQL rows migrated for this partition
status: Migration status (pending, in_progress, completed)
mark_completed: If True, mark migration as completed with timestamp
"""
try:
with self.pg_conn.connection.cursor() as cursor:
# Build dynamic UPDATE based on provided parameters
updates = []
params = []
if last_key is not None:
updates.append("last_key = %s::jsonb")
params.append(json.dumps(last_key))
if total_rows_migrated is not None:
updates.append("total_rows_migrated = %s")
params.append(total_rows_migrated)
if status is not None:
updates.append("status = %s")
params.append(status)
if mark_completed:
updates.append("migration_completed_at = %s")
params.append(datetime.utcnow())
if status is None:
# Parameterize the status so the VALUES placeholder count stays in sync with params
updates.append("status = %s")
params.append("completed")
if not updates:
logger.warning("update_state called with no parameters to update")
return
# Upsert: INSERT or UPDATE
query = f"""
INSERT INTO migration_state (table_name, partition_name, {', '.join([u.split('=')[0].strip() for u in updates])})
VALUES (%s, %s, {', '.join(['%s'] * len(updates))})
ON CONFLICT (table_name, partition_name) DO UPDATE SET
{', '.join(updates)}
"""
all_params = [self.table_name, self.partition_name] + params + params
cursor.execute(query, tuple(all_params))
self.pg_conn.connection.commit()
logger.debug(
f"Migration state updated for {self.table_name}/{self.partition_name}: "
f"last_key={last_key}, total={total_rows_migrated}, status={status}"
)
except Exception as e:
logger.error(f"Failed to update migration state: {e}")
self.pg_conn.connection.rollback()
raise
def reset_state(self) -> None:
"""Reset migration state for this partition.
Deletes the state record for this specific partition.
"""
try:
with self.pg_conn.connection.cursor() as cursor:
cursor.execute(
"DELETE FROM migration_state WHERE table_name = %s AND partition_name = %s",
(self.table_name, self.partition_name)
)
self.pg_conn.connection.commit()
logger.info(f"Migration state reset for {self.table_name}/{self.partition_name}")
except Exception as e:
logger.error(f"Failed to reset migration state: {e}")
self.pg_conn.connection.rollback()
raise
def mark_in_progress(self) -> None:
"""Mark migration as in progress."""
self.update_state(status="in_progress")
def mark_completed(self, total_rows: int) -> None:
"""Mark migration as completed.
Args:
total_rows: Final count of migrated rows
"""
self.update_state(
total_rows_migrated=total_rows,
status="completed",
mark_completed=True
)
def get_max_last_key_across_partitions(self) -> Optional[Dict[str, Any]]:
"""Get the maximum (latest) last_key across all partitions for this table.
Used when completing full migration to set the global last_key for incremental migration.
The consolidation key ordering is: (UnitName, ToolNameID, EventDate, EventTime)
To find the maximum, we order by these fields in DESC order.
Returns:
The latest last_key across all partitions, or None if no partitions have keys
"""
try:
with self.pg_conn.connection.cursor() as cursor:
# Order by the full consolidation key (UnitName, ToolNameID, EventDate, EventTime) DESC
# This matches the ordering used in MySQL fetch_consolidation_keys_after
cursor.execute(
"""
SELECT last_key
FROM migration_state
WHERE table_name = %s AND partition_name != '_global' AND last_key IS NOT NULL
ORDER BY
last_key->>'unit_name' DESC,
last_key->>'tool_name_id' DESC,
(last_key->>'event_date')::date DESC,
(last_key->>'event_time')::time DESC
LIMIT 1
""",
(self.table_name,)
)
result = cursor.fetchone()
if result and result[0]:
return result[0]
return None
except Exception as e:
logger.debug(f"Could not get max last_key across partitions: {e}")
return None
def get_max_last_key_from_data(self) -> Optional[Dict[str, Any]]:
"""Get the consolidation key of the most recent event in PostgreSQL.
This queries the target table directly (not migration_state) to find the row with
the maximum event_timestamp (most recent event), then returns its consolidation key.
We order by event_timestamp only (not the full consolidation key) because:
1. Ordering by unit_name can pick up corrupted data (Java error strings)
2. event_timestamp represents the actual chronological order of events
3. This avoids re-processing old data that happens to sort later alphabetically
Corrupted data (like '[Ljava.lang.String;@...' in unit_name) is explicitly excluded.
Returns:
The consolidation key of the most recent event, or None if table is empty
"""
try:
with self.pg_conn.connection.cursor() as cursor:
# Find the row with maximum event_timestamp (most recent event)
# We use ONLY event_timestamp because ordering by unit_name can pick up
# corrupted data (like Java error strings) which sort incorrectly
cursor.execute(
f"""
SELECT unit_name, tool_name_id,
DATE(event_timestamp)::text as event_date,
event_timestamp::time::text as event_time,
event_timestamp
FROM {self.table_name}
WHERE unit_name NOT LIKE '[L%' -- Exclude corrupted Java strings
ORDER BY event_timestamp DESC
LIMIT 1
"""
)
result = cursor.fetchone()
if result:
return {
"unit_name": result[0],
"tool_name_id": result[1],
"event_date": result[2],
"event_time": result[3]
}
return None
except Exception as e:
logger.error(f"Could not get max last_key from data table {self.table_name}: {e}")
return None
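A short resume-flow sketch using the state manager; the partition name 'p2024' is an assumed example and the actual migration work is elided.

from src.connectors.postgres_connector import PostgreSQLConnector
from src.migrator.state_manager import StateManager

with PostgreSQLConnector() as pg:
    state = StateManager(pg, "rawdatacor", "p2024")
    if state.get_status() != "completed":
        state.mark_in_progress()
        last_key = state.get_last_key()  # None on a first run, dict when resuming
        # ... migrate the partition, calling state.update_state(...) on each flush ...
        state.mark_completed(total_rows=state.get_total_rows_migrated())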

View File

@@ -1,4 +1,12 @@
"""PostgreSQL schema creation from MySQL structure.""" """PostgreSQL schema creation for MySQL to PostgreSQL migration.
New design:
- id: BIGSERIAL PRIMARY KEY (auto-increment)
- mysql_max_id: max(idElabData) from consolidated MySQL records
- event_timestamp: TIMESTAMP created from MySQL EventDate + EventTime
- Consolidation key MySQL: (UnitName, ToolNameID, EventDate, EventTime)
- NodeNum consolidated in measurements JSONB
"""
from config import PARTITION_YEARS from config import PARTITION_YEARS
from src.utils.logger import get_logger from src.utils.logger import get_logger
@@ -8,31 +16,37 @@ logger = get_logger(__name__)
def create_rawdatacor_schema() -> str: def create_rawdatacor_schema() -> str:
"""Create PostgreSQL schema for RAWDATACOR table. """Create PostgreSQL schema for RAWDATACOR table.
Schema design:
- id: Auto-increment primary key (PostgreSQL sequence)
- mysql_max_id: Max ID from MySQL records that were consolidated
- Consolidation: Multiple MySQL rows (different NodeNum) → 1 PostgreSQL row
- measurements JSONB structure:
{
"node_1": {"0": {"value": "123.45", "unit": "°C"}, ...},
"node_2": {"0": {"value": "67.89", "unit": "bar"}, ...},
...
}
Returns: Returns:
SQL script to create the table with partitions SQL script to create the table with partitions
""" """
sql = """ sql = """
-- Create sequence for id auto-increment -- Create RAWDATACOR table with partitioning by year
CREATE SEQUENCE IF NOT EXISTS rawdatacor_id_seq;
-- Create RAWDATACOR table with partitioning
-- Note: node_num is stored in measurements JSONB, not as a separate column
CREATE TABLE IF NOT EXISTS rawdatacor ( CREATE TABLE IF NOT EXISTS rawdatacor (
id BIGINT NOT NULL DEFAULT nextval('rawdatacor_id_seq'), id bigint GENERATED BY DEFAULT AS IDENTITY,
unit_name VARCHAR(32), mysql_max_id INTEGER,
tool_name_id VARCHAR(32) NOT NULL, unit_name VARCHAR(50) NOT NULL,
tool_name_id VARCHAR(50) NOT NULL,
event_timestamp TIMESTAMP NOT NULL, event_timestamp TIMESTAMP NOT NULL,
bat_level NUMERIC(4,2) NOT NULL, event_year smallint NOT NULL,
temperature NUMERIC(5,2) NOT NULL, measurements JSONB NOT NULL,
measurements JSONB, bat_level NUMERIC(4,2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, temperature NUMERIC(5,2),
bat_level_module NUMERIC(4,2), bat_level_module NUMERIC(4,2),
temperature_module NUMERIC(5,2), temperature_module NUMERIC(5,2),
rssi_module INTEGER rssi_module INTEGER,
) PARTITION BY RANGE (EXTRACT(YEAR FROM event_timestamp)); created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) PARTITION BY RANGE (event_year);
-- Note: PostgreSQL doesn't support PRIMARY KEY or UNIQUE constraints
-- with RANGE partitioning on expressions. Using sequence for id auto-increment.
-- Create partitions for each year -- Create partitions for each year
""" """
@@ -54,21 +68,21 @@ CREATE TABLE IF NOT EXISTS rawdatacor_default
# Add indexes # Add indexes
sql += """ sql += """
-- Create indexes -- Create indexes for efficient queries
CREATE INDEX IF NOT EXISTS idx_unit_tool_datetime_raw -- UNIQUE constraint on (id, event_year) for partitioned table primary key
ON rawdatacor(unit_name, tool_name_id, event_timestamp); CREATE UNIQUE INDEX IF NOT EXISTS rawdatacor_pkey
ON rawdatacor(id, event_year);
CREATE INDEX IF NOT EXISTS idx_unit_tool_raw -- UNIQUE constraint on consolidation key to prevent duplicates
-- This is the key used to consolidate MySQL rows (UnitName, ToolNameID, EventDate, EventTime)
CREATE UNIQUE INDEX IF NOT EXISTS rawdatacor_consolidation_key_unique
ON rawdatacor(unit_name, tool_name_id, event_timestamp, event_year);
CREATE INDEX IF NOT EXISTS idx_rawdatacor_unit_tool
ON rawdatacor(unit_name, tool_name_id); ON rawdatacor(unit_name, tool_name_id);
CREATE INDEX IF NOT EXISTS idx_measurements_gin_raw CREATE INDEX IF NOT EXISTS idx_rawdatacor_measurements_gin
ON rawdatacor USING GIN (measurements); ON rawdatacor USING GIN (measurements);
CREATE INDEX IF NOT EXISTS idx_event_timestamp_raw
ON rawdatacor(event_timestamp);
CREATE INDEX IF NOT EXISTS idx_id_raw
ON rawdatacor(id);
""" """
return sql return sql
@@ -77,27 +91,39 @@ CREATE INDEX IF NOT EXISTS idx_id_raw
def create_elabdatadisp_schema() -> str: def create_elabdatadisp_schema() -> str:
"""Create PostgreSQL schema for ELABDATADISP table. """Create PostgreSQL schema for ELABDATADISP table.
Schema design:
- id: Auto-increment primary key (PostgreSQL sequence)
- mysql_max_id: Max idElabData from MySQL records that were consolidated
- Consolidation: Multiple MySQL rows (different NodeNum) → 1 PostgreSQL row
- measurements JSONB structure:
{
"node_1": {
"shifts": {"x": 1.234, "y": 2.345, ...},
"coordinates": {"x": 10.123, "y": 20.234, ...},
"kinematics": {...},
"sensors": {...},
"calculated": {...}
},
"node_2": { ... },
...
}
Returns: Returns:
SQL script to create the table with partitions SQL script to create the table with partitions
""" """
sql = """ sql = """
-- Create sequence for id_elab_data auto-increment -- Create ELABDATADISP table with partitioning by year
CREATE SEQUENCE IF NOT EXISTS elabdatadisp_id_seq;
-- Create ELABDATADISP table with partitioning
-- Note: node_num, state, and calc_err are stored in measurements JSONB, not as separate columns
CREATE TABLE IF NOT EXISTS elabdatadisp ( CREATE TABLE IF NOT EXISTS elabdatadisp (
id_elab_data BIGINT NOT NULL DEFAULT nextval('elabdatadisp_id_seq'), id bigint GENERATED BY DEFAULT AS IDENTITY,
unit_name VARCHAR(32), mysql_max_id INTEGER NOT NULL,
tool_name_id VARCHAR(32) NOT NULL, unit_name VARCHAR(50),
tool_name_id VARCHAR(50),
event_timestamp TIMESTAMP NOT NULL, event_timestamp TIMESTAMP NOT NULL,
measurements JSONB, event_year smallint NOT NULL,
measurements JSONB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) PARTITION BY RANGE (EXTRACT(YEAR FROM event_timestamp)); ) PARTITION BY RANGE (event_year);
-- Note: PostgreSQL doesn't support PRIMARY KEY or UNIQUE constraints
-- with RANGE partitioning on expressions. Using sequence for id_elab_data auto-increment.
-- Create partitions for each year -- Create partitions for each year
""" """
@@ -119,21 +145,21 @@ CREATE TABLE IF NOT EXISTS elabdatadisp_default
# Add indexes # Add indexes
sql += """ sql += """
-- Create indexes -- Create indexes for efficient queries
CREATE INDEX IF NOT EXISTS idx_unit_tool_datetime_elab -- UNIQUE constraint on (id, event_year) for partitioned table primary key
ON elabdatadisp(unit_name, tool_name_id, event_timestamp); CREATE UNIQUE INDEX IF NOT EXISTS elabdatadisp_pkey
ON elabdatadisp(id, event_year);
CREATE INDEX IF NOT EXISTS idx_unit_tool_elab -- UNIQUE constraint on consolidation key to prevent duplicates
-- This is the key used to consolidate MySQL rows (UnitName, ToolNameID, EventDate, EventTime)
CREATE UNIQUE INDEX IF NOT EXISTS elabdatadisp_consolidation_key_unique
ON elabdatadisp(unit_name, tool_name_id, event_timestamp, event_year);
CREATE INDEX IF NOT EXISTS idx_elabdatadisp_unit_tool
ON elabdatadisp(unit_name, tool_name_id); ON elabdatadisp(unit_name, tool_name_id);
CREATE INDEX IF NOT EXISTS idx_measurements_gin_elab CREATE INDEX IF NOT EXISTS idx_elabdatadisp_measurements_gin
ON elabdatadisp USING GIN (measurements); ON elabdatadisp USING GIN (measurements);
CREATE INDEX IF NOT EXISTS idx_event_timestamp_elab
ON elabdatadisp(event_timestamp);
CREATE INDEX IF NOT EXISTS idx_id_elab_data
ON elabdatadisp(id_elab_data);
""" """
return sql return sql
@@ -142,21 +168,42 @@ CREATE INDEX IF NOT EXISTS idx_id_elab_data
def create_migration_state_table() -> str: def create_migration_state_table() -> str:
"""Create table to track migration state. """Create table to track migration state.
Tracks migration progress per partition for parallel processing:
- table_name + partition_name: Composite primary key for per-partition tracking
- last_key: Last consolidated key migrated (unit_name, tool_name_id, event_date, event_time)
- total_rows_migrated: Count of PostgreSQL rows (consolidated) for this partition
- status: pending, in_progress, completed
Returns: Returns:
SQL to create migration_state table SQL to create migration_state table
""" """
sql = """ sql = """
-- Create table to track migration state -- Create table to track migration state (supports parallel partition migration)
CREATE TABLE IF NOT EXISTS migration_state ( CREATE TABLE IF NOT EXISTS migration_state (
table_name VARCHAR(255) PRIMARY KEY, table_name VARCHAR(255) NOT NULL,
last_migrated_timestamp TIMESTAMP, partition_name VARCHAR(255) NOT NULL,
last_migrated_id BIGINT, last_key JSONB,
migration_started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, migration_started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
migration_completed_at TIMESTAMP, migration_completed_at TIMESTAMP,
total_rows_migrated BIGINT DEFAULT 0, total_rows_migrated BIGINT DEFAULT 0,
status VARCHAR(32) DEFAULT 'pending', status VARCHAR(32) DEFAULT 'pending',
last_completed_partition VARCHAR(255) PRIMARY KEY (table_name, partition_name),
CONSTRAINT last_key_structure CHECK (
last_key IS NULL OR (
last_key ? 'unit_name' AND
last_key ? 'tool_name_id' AND
last_key ? 'event_date' AND
last_key ? 'event_time'
)
)
); );
-- Index for querying migration state
CREATE INDEX IF NOT EXISTS idx_migration_state_status
ON migration_state(table_name, status);
CREATE INDEX IF NOT EXISTS idx_migration_state_partition
ON migration_state(partition_name);
""" """
return sql return sql
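A query sketch against the new schema, using the connector from this changeset; the JSON path follows the measurements structure documented above ("node_1" -> "0" -> {"value", "unit"}), and the year and node are assumed example values.

from src.connectors.postgres_connector import PostgreSQLConnector

QUERY = """
SELECT unit_name, tool_name_id, event_timestamp,
       measurements -> 'node_1' -> '0' ->> 'value' AS node1_ch0_value
FROM rawdatacor
WHERE event_year = %s
  AND measurements ? 'node_1'   -- existence check, a candidate for the GIN index on measurements
ORDER BY event_timestamp
LIMIT 10
"""

with PostgreSQLConnector() as pg:
    with pg.connection.cursor() as cur:
        cur.execute(QUERY, (2024,))
        for row in cur.fetchall():
            print(row)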

View File

@@ -1,73 +1,109 @@
"""Progress tracking utility.""" """Progress tracking utility - lightweight logging version."""
from rich.progress import (
Progress,
SpinnerColumn,
BarColumn,
TaskProgressColumn,
TimeRemainingColumn,
TimeElapsedColumn,
TransferSpeedColumn,
)
from rich.console import Console
import time import time
from typing import Optional
from src.utils.logger import get_logger
logger = get_logger(__name__)
class ProgressTracker: class ProgressTracker:
"""Track migration progress with Rich progress bar.""" """Track migration progress with periodic text logging (lightweight)."""
def __init__(self, total: int, description: str = "Migrating"): def __init__(self, total: Optional[int] = None, description: str = "Migrating", log_interval: Optional[int] = None):
"""Initialize progress tracker. """Initialize progress tracker.
Args: Args:
total: Total number of items to process total: Total number of items to process (None if unknown)
description: Description of the task description: Description of the task
log_interval: Log progress every N items (if None, reads from config)
""" """
self.total = total self.total = total
self.description = description self.description = description
self.progress = Progress(
SpinnerColumn(), # Read log_interval from config if not provided
BarColumn(), if log_interval is None:
TaskProgressColumn(), from config import get_settings
TimeElapsedColumn(), self.log_interval = get_settings().migration.progress_log_interval
TimeRemainingColumn(), else:
TransferSpeedColumn(), self.log_interval = log_interval
console=Console(),
)
self.task_id = None
self.start_time = None self.start_time = None
self.processed = 0 self.processed = 0
self.last_log_count = 0
def __enter__(self): def __enter__(self):
"""Context manager entry.""" """Context manager entry."""
self.progress.start()
self.task_id = self.progress.add_task(
self.description, total=self.total
)
self.start_time = time.time() self.start_time = time.time()
logger.info(f"Starting: {self.description}")
return self return self
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit.""" """Context manager exit."""
self.progress.stop()
if exc_type is None: if exc_type is None:
elapsed = time.time() - self.start_time elapsed = time.time() - self.start_time
rate = self.processed / elapsed if elapsed > 0 else 0 rate = self.processed / elapsed if elapsed > 0 else 0
self.progress.console.print(
f"[green]✓ Completed: {self.processed}/{self.total} items " # Format elapsed time
f"in {elapsed:.2f}s ({rate:.0f} items/sec)[/green]" if elapsed < 60:
) elapsed_str = f"{elapsed:.1f}s"
elif elapsed < 3600:
elapsed_str = f"{elapsed/60:.1f}m"
else:
elapsed_str = f"{elapsed/3600:.1f}h"
if self.total is not None:
logger.info(
f"✓ Completed: {self.processed:,}/{self.total:,} items "
f"in {elapsed_str} ({rate:.0f} items/sec)"
)
else:
logger.info(
f"✓ Completed: {self.processed:,} items "
f"in {elapsed_str} ({rate:.0f} items/sec)"
)
def update(self, advance: int = 1): def update(self, advance: int = 1):
"""Update progress. """Update progress and log periodically.
Args: Args:
advance: Number of items processed advance: Number of items processed
""" """
if self.task_id is not None: self.processed += advance
self.progress.update(self.task_id, advance=advance)
self.processed += advance # Log every log_interval items
if self.processed - self.last_log_count >= self.log_interval:
self._log_progress()
self.last_log_count = self.processed
def _log_progress(self):
"""Log current progress."""
elapsed = time.time() - self.start_time
rate = self.processed / elapsed if elapsed > 0 else 0
# Format elapsed time
if elapsed < 60:
elapsed_str = f"{elapsed:.0f}s"
elif elapsed < 3600:
mins = int(elapsed / 60)
secs = int(elapsed % 60)
elapsed_str = f"{mins}m {secs}s"
else:
hours = int(elapsed / 3600)
mins = int((elapsed % 3600) / 60)
elapsed_str = f"{hours}h {mins}m"
if self.total is not None:
progress_pct = (self.processed / self.total) * 100
logger.info(
f"{self.description}: {self.processed:,}/{self.total:,} items "
f"({progress_pct:.1f}%) - elapsed: {elapsed_str} - rate: {rate:.0f} items/sec"
)
else:
logger.info(
f"{self.description}: {self.processed:,} items "
f"- elapsed: {elapsed_str} - rate: {rate:.0f} items/sec"
)
def print_status(self, message: str): def print_status(self, message: str):
"""Print a status message without interrupting progress bar.""" """Print a status message."""
if self.task_id is not None: logger.info(message)
self.progress.print(message)
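A minimal usage sketch; the total and log_interval values are illustrative, and when log_interval is omitted the tracker falls back to migration.progress_log_interval from the settings.

from src.utils.progress import ProgressTracker

with ProgressTracker(total=1_000_000, description="Copying rows", log_interval=100_000) as progress:
    for _ in range(100):
        # ... process one batch of 10,000 items ...
        progress.update(10_000)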

157
src/utils/validation.py Normal file
View File

@@ -0,0 +1,157 @@
"""Data validation utilities for migration."""
from typing import Dict, Any, Optional, Tuple
from datetime import datetime, date
import os
from src.utils.logger import get_logger
logger = get_logger(__name__)
class ErrorLogger:
"""Log invalid migration keys to a file."""
def __init__(self, table: str, partition: str):
"""Initialize error logger.
Args:
table: Table name
partition: Partition name
"""
self.table = table
self.partition = partition
self.error_file = f"migration_errors_{table}_{partition}.log"
self.error_count = 0
# Create error file with header
with open(self.error_file, "w") as f:
f.write(f"# Migration errors for {table} partition {partition}\n")
f.write("# Format: UnitName|ToolNameID|EventDate|EventTime|Reason\n\n")
logger.info(f"Error log file created: {self.error_file}")
def log_invalid_key(
self,
unit_name: Any,
tool_name_id: Any,
event_date: Any,
event_time: Any,
reason: str
) -> None:
"""Log an invalid consolidation key.
Args:
unit_name: UnitName value
tool_name_id: ToolNameID value
event_date: EventDate value
event_time: EventTime value
reason: Reason for rejection
"""
with open(self.error_file, "a") as f:
f.write(f"{unit_name}|{tool_name_id}|{event_date}|{event_time}|{reason}\n")
self.error_count += 1
if self.error_count % 100 == 0:
logger.warning(f"Logged {self.error_count} invalid keys to {self.error_file}")
def get_error_count(self) -> int:
"""Get total number of errors logged.
Returns:
Number of errors logged
"""
return self.error_count
def close(self) -> None:
"""Close error logger and log summary."""
if self.error_count > 0:
logger.warning(
f"Total invalid keys for {self.table} partition {self.partition}: "
f"{self.error_count} (see {self.error_file})"
)
else:
logger.info(f"No invalid keys found for {self.table} partition {self.partition}")
# Remove empty error file
if os.path.exists(self.error_file):
os.remove(self.error_file)
def validate_consolidation_key(
unit_name: Any,
tool_name_id: Any,
event_date: Any,
event_time: Any
) -> Tuple[bool, Optional[str]]:
"""Validate a consolidation key.
Args:
unit_name: UnitName value
tool_name_id: ToolNameID value
event_date: EventDate value
event_time: EventTime value
Returns:
Tuple of (is_valid, error_reason)
If valid: (True, None)
If invalid: (False, "reason description")
"""
# Check for NULL unit_name or tool_name_id
if unit_name is None or unit_name == "":
return False, "UnitName is NULL or empty"
if tool_name_id is None or tool_name_id == "":
return False, "ToolNameID is NULL or empty"
# Check for NULL or invalid dates
if event_date is None:
return False, "EventDate is NULL"
# Check for invalid date like '0000-00-00'
try:
if isinstance(event_date, str):
if event_date.startswith("0000-00-00"):
return False, f"EventDate is invalid: {event_date}"
# Try to parse
parsed_date = datetime.strptime(event_date, "%Y-%m-%d").date()
elif isinstance(event_date, (date, datetime)):
parsed_date = event_date if isinstance(event_date, date) else event_date.date()
# Check for zero date
if parsed_date.year == 0:
return False, f"EventDate year is 0: {event_date}"
else:
return False, f"EventDate has invalid type: {type(event_date)}"
except (ValueError, AttributeError) as e:
return False, f"EventDate parsing failed: {event_date} ({e})"
# Check for NULL event_time
if event_time is None:
return False, "EventTime is NULL"
return True, None
def validate_mysql_row(row: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""Validate a complete MySQL row for migration.
Args:
row: MySQL row dictionary
Returns:
Tuple of (is_valid, error_reason)
"""
# Validate consolidation key
is_valid, reason = validate_consolidation_key(
row.get("UnitName"),
row.get("ToolNameID"),
row.get("EventDate"),
row.get("EventTime")
)
if not is_valid:
return False, reason
# Check for NodeNum
if row.get("NodeNum") is None:
return False, "NodeNum is NULL"
return True, None
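A sketch of wiring the validator and error logger into a row loop; the sample rows and partition name are illustrative only (ErrorLogger writes a migration_errors_* file in the working directory).

from src.utils.validation import ErrorLogger, validate_consolidation_key

rows = [
    {"UnitName": "UNIT_A", "ToolNameID": "T01", "EventDate": "2024-05-01", "EventTime": "12:30:00", "NodeNum": 1},
    {"UnitName": None, "ToolNameID": "T01", "EventDate": "0000-00-00", "EventTime": None, "NodeNum": 2},
]

errors = ErrorLogger("RAWDATACOR", "p2024")
try:
    for row in rows:
        ok, reason = validate_consolidation_key(
            row["UnitName"], row["ToolNameID"], row["EventDate"], row["EventTime"]
        )
        if not ok:
            errors.log_invalid_key(
                row["UnitName"], row["ToolNameID"], row["EventDate"], row["EventTime"], reason
            )
finally:
    errors.close()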