From 693228c0dab79394dfaff66399ef6b127c89311d Mon Sep 17 00:00:00 2001
From: alex
Date: Thu, 25 Dec 2025 18:41:54 +0100
Subject: [PATCH] feat: Implement node consolidation for ELABDATADISP table
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add consolidation logic to ELABDATADISP similar to RAWDATACOR:
- Group rows by (unit_name, tool_name_id, event_timestamp)
- Consolidate multiple nodes with same timestamp into single row
- Store node_num, state, calc_err in JSONB measurements keyed by node

Changes:
1. Add _build_measurement_for_elabdatadisp_node() helper
   - Builds measurement object with state, calc_err, and measurement categories
   - Filters out empty categories to save space

2. Update transform_elabdatadisp_row() signature
   - Accept optional measurements parameter for consolidated rows
   - Build from single row if measurements not provided
   - Remove node_num, state, calc_err from returned columns (now in JSONB)
   - Keep only: id_elab_data, unit_name, tool_name_id, event_timestamp,
     measurements, created_at

3. Add consolidate_elabdatadisp_batch() method
   - Group rows by consolidation key
   - Build consolidated measurements with node numbers as keys
   - Use MAX(idElabData) for checkpoint tracking (resume capability)
   - Use MIN(idElabData) as template for other fields

4. Update transform_batch() to support ELABDATADISP consolidation
   - Check consolidate flag for both tables
   - Call consolidate_elabdatadisp_batch() when needed

Result: ELABDATADISP now consolidates ~5-10:1 like RAWDATACOR, with all
node data (node_num, state, calc_err, measurements) keyed by node number
in JSONB.

🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Haiku 4.5 --- src/transformers/data_transformer.py | 158 ++++++++++++++++++++----- src/transformers/schema_transformer.py | 11 +- 2 files changed, 133 insertions(+), 36 deletions(-) diff --git a/src/transformers/data_transformer.py b/src/transformers/data_transformer.py index 7173577..c233c1e 100644 --- a/src/transformers/data_transformer.py +++ b/src/transformers/data_transformer.py @@ -103,6 +103,43 @@ class DataTransformer: return measurement + @staticmethod + def _build_measurement_for_elabdatadisp_node(mysql_row: Dict[str, Any]) -> Dict[str, Any]: + """Build measurement object for a single ELABDATADISP node including state and calc_err. + + Args: + mysql_row: Row dictionary from MySQL + + Returns: + Measurement dictionary for this node with state, calc_err, and measurement fields + """ + measurement = { + "state": mysql_row.get("State"), + "calc_err": mysql_row.get("calcerr", 0), + } + + # Create nested structure for measurement categories + categories = { + "shifts": {}, + "coordinates": {}, + "kinematics": {}, + "sensors": {}, + "calculated": {}, + } + + # Map all measurement fields using the configuration + for mysql_col, (category, pg_key) in ELABDATADISP_FIELD_MAPPING.items(): + value = mysql_row.get(mysql_col) + if value is not None: + categories[category][pg_key] = float(value) if isinstance(value, str) else value + + # Merge categories into measurement, keeping only non-empty categories + for category, values in categories.items(): + if values: # Only add if category has data + measurement[category] = values + + return measurement + @staticmethod def transform_rawdatacor_row(mysql_row: Dict[str, Any], measurements: Dict[str, Any] = None) -> Dict[str, Any]: """Transform a RAWDATACOR row from MySQL to PostgreSQL format. 
@@ -168,34 +205,23 @@ class DataTransformer: return pg_row @staticmethod - def transform_elabdatadisp_row(mysql_row: Dict[str, Any]) -> Dict[str, Any]: + def transform_elabdatadisp_row(mysql_row: Dict[str, Any], measurements: Dict[str, Any] = None) -> Dict[str, Any]: """Transform an ELABDATADISP row from MySQL to PostgreSQL format. Args: mysql_row: Row dictionary from MySQL + measurements: Pre-built measurements JSONB (for consolidated nodes). + If None, builds measurements from mysql_row. Returns: Transformed row dictionary for PostgreSQL """ - # Create measurements JSONB with structured categories - measurements = { - "shifts": {}, - "coordinates": {}, - "kinematics": {}, - "sensors": {}, - "calculated": {}, - } - - # Map all measurement fields using the configuration - for mysql_col, (category, pg_key) in ELABDATADISP_FIELD_MAPPING.items(): - value = mysql_row.get(mysql_col) - if value is not None: - measurements[category][pg_key] = float(value) if isinstance(value, str) else value - - # Remove empty categories - measurements = { - k: v for k, v in measurements.items() if v - } + # If measurements not provided, build from single row + if measurements is None: + node_num = mysql_row.get("NodeNum") + node_measurements = DataTransformer._build_measurement_for_elabdatadisp_node(mysql_row) + # Wrap with node number as key for consolidation compatibility + measurements = {str(node_num): node_measurements} if node_num is not None else {} # Combine event_date and event_time into event_timestamp event_date = mysql_row.get("EventDate") @@ -225,17 +251,14 @@ class DataTransformer: event_timestamp = None # Create PostgreSQL row + # Note: node_num, state, calc_err are now stored in measurements JSONB, not as separate columns pg_row = { "id_elab_data": mysql_row["idElabData"], "unit_name": mysql_row.get("UnitName"), "tool_name_id": mysql_row["ToolNameID"], - "node_num": mysql_row["NodeNum"], "event_timestamp": event_timestamp, - "state": mysql_row.get("State"), - "calc_err": 
mysql_row.get("calcerr", 0), "measurements": measurements, "created_at": mysql_row.get("created_at"), - "updated_at": mysql_row.get("updated_at"), } return pg_row @@ -312,6 +335,79 @@ class DataTransformer: return consolidated_rows + @staticmethod + def consolidate_elabdatadisp_batch( + rows: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """Consolidate ELABDATADISP rows by (unit_name, tool_name_id, event_timestamp). + + Groups multiple nodes with the same key into a single row with measurements + keyed by node number, including state and calc_err for each node. + Uses MAX(id) as the consolidated row ID for proper resume. + + Args: + rows: List of row dictionaries from MySQL, ordered by + (UnitName, ToolNameID, EventDate, EventTime, NodeNum) + + Returns: + List of consolidated row dictionaries ready for transformation + """ + if not rows: + return [] + + # Group rows by consolidation key + groups = {} + group_order = [] # Track order of first appearance + + for row in rows: + # Build consolidation key + unit_name = row.get("UnitName") + tool_name_id = row["ToolNameID"] + event_date = row.get("EventDate") + event_time = row.get("EventTime") + + # Create a hashable key + key = (unit_name, tool_name_id, event_date, event_time) + + if key not in groups: + groups[key] = [] + group_order.append(key) + + groups[key].append(row) + + # Transform each group into a consolidated row + consolidated_rows = [] + + for key in group_order: + group_rows = groups[key] + + # Build consolidated measurements with nodes as keys + consolidated_measurements = {} + + for row in group_rows: + node_num = row.get("NodeNum") + node_measurements = DataTransformer._build_measurement_for_elabdatadisp_node(row) + # Store measurements with node number as key + consolidated_measurements[str(node_num)] = node_measurements + + # Use the row with minimum id as template for other fields + min_id_row = min(group_rows, key=lambda r: r["idElabData"]) + # Use the row with maximum id for the consolidated 
row ID (for proper resume) + max_id_row = max(group_rows, key=lambda r: r["idElabData"]) + + # Create consolidated row with pre-built measurements + consolidated_row = DataTransformer.transform_elabdatadisp_row( + min_id_row, + measurements=consolidated_measurements + ) + + # Update id to MAX(id) of the group (represents last MySQL row processed) + consolidated_row["id_elab_data"] = max_id_row["idElabData"] + + consolidated_rows.append(consolidated_row) + + return consolidated_rows + @staticmethod def transform_batch( table: str, @@ -323,7 +419,7 @@ class DataTransformer: Args: table: Table name ('RAWDATACOR' or 'ELABDATADISP') rows: List of row dictionaries from MySQL - consolidate: If True and table is RAWDATACOR, consolidate nodes + consolidate: If True, consolidate nodes (for both RAWDATACOR and ELABDATADISP) Returns: List of transformed row dictionaries for PostgreSQL @@ -338,10 +434,14 @@ class DataTransformer: for row in rows ] elif table == "ELABDATADISP": - return [ - DataTransformer.transform_elabdatadisp_row(row) - for row in rows - ] + if consolidate: + # Consolidate rows by key first, then they're already transformed + return DataTransformer.consolidate_elabdatadisp_batch(rows) + else: + return [ + DataTransformer.transform_elabdatadisp_row(row) + for row in rows + ] else: raise ValueError(f"Unknown table: {table}") diff --git a/src/transformers/schema_transformer.py b/src/transformers/schema_transformer.py index b51d4ca..768813c 100644 --- a/src/transformers/schema_transformer.py +++ b/src/transformers/schema_transformer.py @@ -82,17 +82,14 @@ def create_elabdatadisp_schema() -> str: CREATE SEQUENCE IF NOT EXISTS elabdatadisp_id_seq; -- Create ELABDATADISP table with partitioning +-- Note: node_num, state, and calc_err are stored in measurements JSONB, not as separate columns CREATE TABLE IF NOT EXISTS elabdatadisp ( id_elab_data BIGINT NOT NULL DEFAULT nextval('elabdatadisp_id_seq'), unit_name VARCHAR(32), tool_name_id VARCHAR(32) NOT NULL, - 
node_num INTEGER NOT NULL, event_timestamp TIMESTAMP NOT NULL, - state VARCHAR(32), - calc_err INTEGER DEFAULT 0, measurements JSONB, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) PARTITION BY RANGE (EXTRACT(YEAR FROM event_timestamp)); -- Note: PostgreSQL doesn't support PRIMARY KEY or UNIQUE constraints @@ -119,8 +116,8 @@ CREATE TABLE IF NOT EXISTS elabdatadisp_default # Add indexes sql += """ -- Create indexes -CREATE INDEX IF NOT EXISTS idx_unit_tool_node_datetime_elab - ON elabdatadisp(unit_name, tool_name_id, node_num, event_timestamp); +CREATE INDEX IF NOT EXISTS idx_unit_tool_datetime_elab + ON elabdatadisp(unit_name, tool_name_id, event_timestamp); CREATE INDEX IF NOT EXISTS idx_unit_tool_elab ON elabdatadisp(unit_name, tool_name_id);