feat: Implement node consolidation for ELABDATADISP table
Add consolidation logic to ELABDATADISP similar to RAWDATACOR: - Group rows by (unit_name, tool_name_id, event_timestamp) - Consolidate multiple nodes with same timestamp into single row - Store node_num, state, calc_err in JSONB measurements keyed by node Changes: 1. Add _build_measurement_for_elabdatadisp_node() helper - Builds measurement object with state, calc_err, and measurement categories - Filters out empty categories to save space 2. Update transform_elabdatadisp_row() signature - Accept optional measurements parameter for consolidated rows - Build from single row if measurements not provided - Remove node_num, state, calc_err from returned columns (now in JSONB) - Keep only: id_elab_data, unit_name, tool_name_id, event_timestamp, measurements, created_at 3. Add consolidate_elabdatadisp_batch() method - Group rows by consolidation key - Build consolidated measurements with node numbers as keys - Use MAX(idElabData) for checkpoint tracking (resume capability) - Use MIN(idElabData) as template for other fields 4. Update transform_batch() to support ELABDATADISP consolidation - Check consolidate flag for both tables - Call consolidate_elabdatadisp_batch() when needed Result: ELABDATADISP now consolidates ~5-10:1 like RAWDATACOR, with all node data (node_num, state, calc_err, measurements) keyed by node number in JSONB. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -103,6 +103,43 @@ class DataTransformer:
|
||||
|
||||
return measurement
|
||||
|
||||
@staticmethod
|
||||
def _build_measurement_for_elabdatadisp_node(mysql_row: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Build measurement object for a single ELABDATADISP node including state and calc_err.
|
||||
|
||||
Args:
|
||||
mysql_row: Row dictionary from MySQL
|
||||
|
||||
Returns:
|
||||
Measurement dictionary for this node with state, calc_err, and measurement fields
|
||||
"""
|
||||
measurement = {
|
||||
"state": mysql_row.get("State"),
|
||||
"calc_err": mysql_row.get("calcerr", 0),
|
||||
}
|
||||
|
||||
# Create nested structure for measurement categories
|
||||
categories = {
|
||||
"shifts": {},
|
||||
"coordinates": {},
|
||||
"kinematics": {},
|
||||
"sensors": {},
|
||||
"calculated": {},
|
||||
}
|
||||
|
||||
# Map all measurement fields using the configuration
|
||||
for mysql_col, (category, pg_key) in ELABDATADISP_FIELD_MAPPING.items():
|
||||
value = mysql_row.get(mysql_col)
|
||||
if value is not None:
|
||||
categories[category][pg_key] = float(value) if isinstance(value, str) else value
|
||||
|
||||
# Merge categories into measurement, keeping only non-empty categories
|
||||
for category, values in categories.items():
|
||||
if values: # Only add if category has data
|
||||
measurement[category] = values
|
||||
|
||||
return measurement
|
||||
|
||||
@staticmethod
|
||||
def transform_rawdatacor_row(mysql_row: Dict[str, Any], measurements: Dict[str, Any] = None) -> Dict[str, Any]:
|
||||
"""Transform a RAWDATACOR row from MySQL to PostgreSQL format.
|
||||
@@ -168,34 +205,23 @@ class DataTransformer:
|
||||
return pg_row
|
||||
|
||||
@staticmethod
|
||||
def transform_elabdatadisp_row(mysql_row: Dict[str, Any]) -> Dict[str, Any]:
|
||||
def transform_elabdatadisp_row(mysql_row: Dict[str, Any], measurements: Dict[str, Any] = None) -> Dict[str, Any]:
|
||||
"""Transform an ELABDATADISP row from MySQL to PostgreSQL format.
|
||||
|
||||
Args:
|
||||
mysql_row: Row dictionary from MySQL
|
||||
measurements: Pre-built measurements JSONB (for consolidated nodes).
|
||||
If None, builds measurements from mysql_row.
|
||||
|
||||
Returns:
|
||||
Transformed row dictionary for PostgreSQL
|
||||
"""
|
||||
# Create measurements JSONB with structured categories
|
||||
measurements = {
|
||||
"shifts": {},
|
||||
"coordinates": {},
|
||||
"kinematics": {},
|
||||
"sensors": {},
|
||||
"calculated": {},
|
||||
}
|
||||
|
||||
# Map all measurement fields using the configuration
|
||||
for mysql_col, (category, pg_key) in ELABDATADISP_FIELD_MAPPING.items():
|
||||
value = mysql_row.get(mysql_col)
|
||||
if value is not None:
|
||||
measurements[category][pg_key] = float(value) if isinstance(value, str) else value
|
||||
|
||||
# Remove empty categories
|
||||
measurements = {
|
||||
k: v for k, v in measurements.items() if v
|
||||
}
|
||||
# If measurements not provided, build from single row
|
||||
if measurements is None:
|
||||
node_num = mysql_row.get("NodeNum")
|
||||
node_measurements = DataTransformer._build_measurement_for_elabdatadisp_node(mysql_row)
|
||||
# Wrap with node number as key for consolidation compatibility
|
||||
measurements = {str(node_num): node_measurements} if node_num is not None else {}
|
||||
|
||||
# Combine event_date and event_time into event_timestamp
|
||||
event_date = mysql_row.get("EventDate")
|
||||
@@ -225,17 +251,14 @@ class DataTransformer:
|
||||
event_timestamp = None
|
||||
|
||||
# Create PostgreSQL row
|
||||
# Note: node_num, state, calc_err are now stored in measurements JSONB, not as separate columns
|
||||
pg_row = {
|
||||
"id_elab_data": mysql_row["idElabData"],
|
||||
"unit_name": mysql_row.get("UnitName"),
|
||||
"tool_name_id": mysql_row["ToolNameID"],
|
||||
"node_num": mysql_row["NodeNum"],
|
||||
"event_timestamp": event_timestamp,
|
||||
"state": mysql_row.get("State"),
|
||||
"calc_err": mysql_row.get("calcerr", 0),
|
||||
"measurements": measurements,
|
||||
"created_at": mysql_row.get("created_at"),
|
||||
"updated_at": mysql_row.get("updated_at"),
|
||||
}
|
||||
|
||||
return pg_row
|
||||
@@ -312,6 +335,79 @@ class DataTransformer:
|
||||
|
||||
return consolidated_rows
|
||||
|
||||
@staticmethod
|
||||
def consolidate_elabdatadisp_batch(
|
||||
rows: List[Dict[str, Any]]
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Consolidate ELABDATADISP rows by (unit_name, tool_name_id, event_timestamp).
|
||||
|
||||
Groups multiple nodes with the same key into a single row with measurements
|
||||
keyed by node number, including state and calc_err for each node.
|
||||
Uses MAX(id) as the consolidated row ID for proper resume.
|
||||
|
||||
Args:
|
||||
rows: List of row dictionaries from MySQL, ordered by
|
||||
(UnitName, ToolNameID, EventDate, EventTime, NodeNum)
|
||||
|
||||
Returns:
|
||||
List of consolidated row dictionaries ready for transformation
|
||||
"""
|
||||
if not rows:
|
||||
return []
|
||||
|
||||
# Group rows by consolidation key
|
||||
groups = {}
|
||||
group_order = [] # Track order of first appearance
|
||||
|
||||
for row in rows:
|
||||
# Build consolidation key
|
||||
unit_name = row.get("UnitName")
|
||||
tool_name_id = row["ToolNameID"]
|
||||
event_date = row.get("EventDate")
|
||||
event_time = row.get("EventTime")
|
||||
|
||||
# Create a hashable key
|
||||
key = (unit_name, tool_name_id, event_date, event_time)
|
||||
|
||||
if key not in groups:
|
||||
groups[key] = []
|
||||
group_order.append(key)
|
||||
|
||||
groups[key].append(row)
|
||||
|
||||
# Transform each group into a consolidated row
|
||||
consolidated_rows = []
|
||||
|
||||
for key in group_order:
|
||||
group_rows = groups[key]
|
||||
|
||||
# Build consolidated measurements with nodes as keys
|
||||
consolidated_measurements = {}
|
||||
|
||||
for row in group_rows:
|
||||
node_num = row.get("NodeNum")
|
||||
node_measurements = DataTransformer._build_measurement_for_elabdatadisp_node(row)
|
||||
# Store measurements with node number as key
|
||||
consolidated_measurements[str(node_num)] = node_measurements
|
||||
|
||||
# Use the row with minimum id as template for other fields
|
||||
min_id_row = min(group_rows, key=lambda r: r["idElabData"])
|
||||
# Use the row with maximum id for the consolidated row ID (for proper resume)
|
||||
max_id_row = max(group_rows, key=lambda r: r["idElabData"])
|
||||
|
||||
# Create consolidated row with pre-built measurements
|
||||
consolidated_row = DataTransformer.transform_elabdatadisp_row(
|
||||
min_id_row,
|
||||
measurements=consolidated_measurements
|
||||
)
|
||||
|
||||
# Update id to MAX(id) of the group (represents last MySQL row processed)
|
||||
consolidated_row["id_elab_data"] = max_id_row["idElabData"]
|
||||
|
||||
consolidated_rows.append(consolidated_row)
|
||||
|
||||
return consolidated_rows
|
||||
|
||||
@staticmethod
|
||||
def transform_batch(
|
||||
table: str,
|
||||
@@ -323,7 +419,7 @@ class DataTransformer:
|
||||
Args:
|
||||
table: Table name ('RAWDATACOR' or 'ELABDATADISP')
|
||||
rows: List of row dictionaries from MySQL
|
||||
consolidate: If True and table is RAWDATACOR, consolidate nodes
|
||||
consolidate: If True, consolidate nodes (for both RAWDATACOR and ELABDATADISP)
|
||||
|
||||
Returns:
|
||||
List of transformed row dictionaries for PostgreSQL
|
||||
@@ -338,6 +434,10 @@ class DataTransformer:
|
||||
for row in rows
|
||||
]
|
||||
elif table == "ELABDATADISP":
|
||||
if consolidate:
|
||||
# Consolidate rows by key first, then they're already transformed
|
||||
return DataTransformer.consolidate_elabdatadisp_batch(rows)
|
||||
else:
|
||||
return [
|
||||
DataTransformer.transform_elabdatadisp_row(row)
|
||||
for row in rows
|
||||
|
||||
@@ -82,17 +82,14 @@ def create_elabdatadisp_schema() -> str:
|
||||
CREATE SEQUENCE IF NOT EXISTS elabdatadisp_id_seq;
|
||||
|
||||
-- Create ELABDATADISP table with partitioning
|
||||
-- Note: node_num, state, and calc_err are stored in measurements JSONB, not as separate columns
|
||||
CREATE TABLE IF NOT EXISTS elabdatadisp (
|
||||
id_elab_data BIGINT NOT NULL DEFAULT nextval('elabdatadisp_id_seq'),
|
||||
unit_name VARCHAR(32),
|
||||
tool_name_id VARCHAR(32) NOT NULL,
|
||||
node_num INTEGER NOT NULL,
|
||||
event_timestamp TIMESTAMP NOT NULL,
|
||||
state VARCHAR(32),
|
||||
calc_err INTEGER DEFAULT 0,
|
||||
measurements JSONB,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
) PARTITION BY RANGE (EXTRACT(YEAR FROM event_timestamp));
|
||||
|
||||
-- Note: PostgreSQL doesn't support PRIMARY KEY or UNIQUE constraints
|
||||
@@ -119,8 +116,8 @@ CREATE TABLE IF NOT EXISTS elabdatadisp_default
|
||||
# Add indexes
|
||||
sql += """
|
||||
-- Create indexes
|
||||
CREATE INDEX IF NOT EXISTS idx_unit_tool_node_datetime_elab
|
||||
ON elabdatadisp(unit_name, tool_name_id, node_num, event_timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_unit_tool_datetime_elab
|
||||
ON elabdatadisp(unit_name, tool_name_id, event_timestamp);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_unit_tool_elab
|
||||
ON elabdatadisp(unit_name, tool_name_id);
|
||||
|
||||
Reference in New Issue
Block a user