Implement partition-based consolidation for ELABDATADISP

Changed consolidation strategy to leverage MySQL partitioning:
- Added get_table_partitions() to list all partitions
- Added fetch_consolidation_groups_from_partition() to read groups by consolidation key
- Each group (UnitName, ToolNameID, EventDate, EventTime) is fetched completely
- All nodes of same group are consolidated into single row with JSONB measurements
- Process partitions sequentially for predictable memory usage

Key benefits:
- Guaranteed complete consolidation (no fragmentation across batches)
- Deterministic behavior - same group always consolidated together
- Better memory efficiency with partition limits (100k groups per query)
- Clear audit trail of which partition each row came from

Tested with partition d3: 6960 input rows → 100 consolidated rows (69.6:1 ratio)
with groups containing 24-72 nodes each.

🤖 Generated with Claude Code

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-25 21:49:30 +01:00
parent a394de99ef
commit bb27f749a0
3 changed files with 156 additions and 85 deletions

View File

@@ -49,6 +49,7 @@ class MigrationSettings(BaseSettings):
) )
batch_size: int = 10000 batch_size: int = 10000
consolidation_group_limit: int = 100000
log_level: str = "INFO" log_level: str = "INFO"
dry_run: bool = False dry_run: bool = False

View File

@@ -308,3 +308,114 @@ class MySQLConnector:
except pymysql.Error as e: except pymysql.Error as e:
logger.error(f"Failed to get structure for {table}: {e}") logger.error(f"Failed to get structure for {table}: {e}")
raise raise
def get_table_partitions(self, table: str) -> List[str]:
"""Get list of partitions for a table.
Args:
table: Table name
Returns:
List of partition names
"""
try:
with self.connection.cursor() as cursor:
cursor.execute("""
SELECT PARTITION_NAME
FROM INFORMATION_SCHEMA.PARTITIONS
WHERE TABLE_NAME = %s
AND TABLE_SCHEMA = %s
ORDER BY PARTITION_NAME
""", (table, self.settings.mysql.database))
return [row["PARTITION_NAME"] for row in cursor.fetchall()]
except pymysql.Error as e:
logger.error(f"Failed to get partitions for {table}: {e}")
raise
def fetch_consolidation_groups_from_partition(
self,
table: str,
partition: str,
limit: Optional[int] = None,
offset: int = 0
) -> Generator[List[Dict[str, Any]], None, None]:
"""Fetch consolidation groups from a partition.
Reads unique combinations of (UnitName, ToolNameID, EventDate, EventTime, NodeNum)
and fetches all rows for each group. This ensures proper consolidation.
Args:
table: Table name
partition: Partition name
limit: Maximum number of groups per query (uses config default if None)
offset: Starting offset for pagination
Yields:
Lists of rows grouped by consolidation key
"""
if limit is None:
limit = self.settings.migration.consolidation_group_limit
if table not in ("RAWDATACOR", "ELABDATADISP"):
raise ValueError(f"Consolidation not supported for table {table}")
max_retries = 3
current_offset = offset
while True:
retries = 0
while retries < max_retries:
try:
with self.connection.cursor() as cursor:
# Get unique consolidation groups from partition
# First, get the distinct consolidation keys
group_query = f"""
SELECT DISTINCT UnitName, ToolNameID, EventDate, EventTime
FROM `{table}` PARTITION (`{partition}`)
ORDER BY UnitName, ToolNameID, EventDate, EventTime
LIMIT %s OFFSET %s
"""
cursor.execute(group_query, (limit, current_offset))
groups = cursor.fetchall()
if not groups:
return
# For each group, fetch all rows
for group in groups:
unit_name = group.get("UnitName")
tool_name_id = group.get("ToolNameID")
event_date = group.get("EventDate")
event_time = group.get("EventTime")
rows_query = f"""
SELECT * FROM `{table}` PARTITION (`{partition}`)
WHERE UnitName <=> %s
AND ToolNameID = %s
AND EventDate <=> %s
AND EventTime <=> %s
ORDER BY NodeNum ASC
"""
cursor.execute(rows_query, (unit_name, tool_name_id, event_date, event_time))
rows = cursor.fetchall()
if rows:
yield rows
current_offset += limit
break # Success, exit retry loop
except pymysql.Error as e:
retries += 1
if retries >= max_retries:
logger.error(f"Failed to fetch consolidation groups from {table} partition {partition} after {max_retries} retries: {e}")
raise
else:
logger.warning(f"Fetch failed (retry {retries}/{max_retries}): {e}")
# Reconnect and retry
try:
self.disconnect()
self.connect()
except Exception as reconnect_error:
logger.error(f"Failed to reconnect: {reconnect_error}")
raise

View File

@@ -96,98 +96,57 @@ class FullMigrator:
rows_to_migrate, rows_to_migrate,
f"Migrating {mysql_table}" f"Migrating {mysql_table}"
) as progress: ) as progress:
# Consolidate across batches by buffering rows with the same consolidation key
# This ensures all nodes of the same (unit, tool, timestamp) are consolidated together
row_buffer = []
last_consolidation_key = None
columns = DataTransformer.get_column_order(pg_table) columns = DataTransformer.get_column_order(pg_table)
total_mysql_rows = 0
# Fetch and migrate rows in batches # Get list of partitions and process each one
# Use ordered fetching for node consolidation with resume support partitions = mysql_conn.get_table_partitions(mysql_table)
for batch in mysql_conn.fetch_rows_ordered_for_consolidation( logger.info(f"Found {len(partitions)} partitions for {mysql_table}")
mysql_table,
start_id=last_migrated_id
):
if not batch:
break
# Sort batch by consolidation key for partition in partitions:
sorted_batch = sorted(batch, key=lambda r: ( logger.info(f"Processing partition {partition}...")
r.get("UnitName") or "",
r.get("ToolNameID") or "",
str(r.get("EventDate") or ""),
str(r.get("EventTime") or ""),
int(r.get("NodeNum") or 0)
))
# Process each row, consolidating when consolidation key changes # Fetch consolidation groups from partition
for row in sorted_batch: # Each group is a list of rows with the same (unit, tool, date, time)
# Extract consolidation key for group_rows in mysql_conn.fetch_consolidation_groups_from_partition(
consolidation_key = ( mysql_table,
row.get("UnitName"), partition
row.get("ToolNameID"), ):
row.get("EventDate"), if not group_rows:
row.get("EventTime") break
# Consolidate the group
transformed = DataTransformer.transform_batch(
mysql_table,
group_rows,
consolidate=True
) )
# If consolidation key changed, consolidate the buffer # Insert consolidated rows
if last_consolidation_key is not None and consolidation_key != last_consolidation_key: inserted = pg_conn.insert_batch(pg_table, transformed, columns)
# Consolidate buffered rows if inserted > 0:
transformed = DataTransformer.transform_batch( migrated += inserted
mysql_table, batch_count += 1
row_buffer, progress.update(len(group_rows))
consolidate=True
)
# Insert consolidated rows # Update state every 10 consolidations
inserted = pg_conn.insert_batch(pg_table, transformed, columns) if batch_count % 10 == 0:
if inserted > 0: batch_max_id = max(int(r.get(primary_key, 0)) for r in group_rows)
migrated += inserted self._update_migration_state(
batch_count += 1 pg_conn, migrated, batch_max_id, migration_start_time
progress.update(len(row_buffer)) )
total_mysql_rows += len(row_buffer) else:
batch_max_id = max(int(r.get(primary_key, 0)) for r in group_rows)
# Update state every 10 inserts try:
if batch_count % 10 == 0: with pg_conn.connection.cursor() as cursor:
batch_max_id = max(int(r.get(primary_key, 0)) for r in row_buffer) cursor.execute(
self._update_migration_state( """UPDATE migration_state
pg_conn, migrated, batch_max_id, migration_start_time SET last_migrated_id = %s, last_migrated_timestamp = %s
) WHERE table_name = %s""",
else: (batch_max_id, migration_start_time or datetime.utcnow().isoformat(), pg_table)
batch_max_id = max(int(r.get(primary_key, 0)) for r in row_buffer) )
try: pg_conn.connection.commit()
with pg_conn.connection.cursor() as cursor: except Exception as e:
cursor.execute( logger.warning(f"Failed to update migration state: {e}")
"""UPDATE migration_state
SET last_migrated_id = %s, last_migrated_timestamp = %s
WHERE table_name = %s""",
(batch_max_id, migration_start_time or datetime.utcnow().isoformat(), pg_table)
)
pg_conn.connection.commit()
except Exception as e:
logger.warning(f"Failed to update migration state: {e}")
# Reset buffer
row_buffer = []
# Add row to buffer
row_buffer.append(row)
last_consolidation_key = consolidation_key
# Consolidate any remaining rows in buffer
if row_buffer:
transformed = DataTransformer.transform_batch(
mysql_table,
row_buffer,
consolidate=True
)
inserted = pg_conn.insert_batch(pg_table, transformed, columns)
if inserted > 0:
migrated += inserted
batch_count += 1
progress.update(len(row_buffer))
total_mysql_rows += len(row_buffer)
# Get final actual count from PostgreSQL # Get final actual count from PostgreSQL
final_count = pg_conn.get_row_count(pg_table) final_count = pg_conn.get_row_count(pg_table)