diff --git a/main.py b/main.py index 2ee4eed..890b9be 100644 --- a/main.py +++ b/main.py @@ -69,7 +69,12 @@ def migrate(): is_flag=True, help="Show what would be done without modifying data" ) -def full(table, dry_run): +@click.option( + "--resume", + is_flag=True, + help="Resume from last checkpoint if migration was interrupted" +) +def full(table, dry_run, resume): """Perform full migration of all data.""" setup_logger(__name__) @@ -80,7 +85,7 @@ def full(table, dry_run): for tbl in tables: click.echo(f"\nMigrating {tbl}...") - migrated = run_full_migration(tbl, dry_run=dry_run) + migrated = run_full_migration(tbl, dry_run=dry_run, resume=resume) total_migrated += migrated click.echo(f"✓ {tbl}: {migrated} rows migrated") diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py index 09f35d8..2c54be9 100644 --- a/src/connectors/mysql_connector.py +++ b/src/connectors/mysql_connector.py @@ -219,6 +219,72 @@ class MySQLConnector: logger.error(f"Failed to fetch rows from {table}: {e}") raise + def fetch_rows_ordered_for_consolidation( + self, + table: str, + start_id: Optional[int] = None, + batch_size: Optional[int] = None + ) -> Generator[List[Dict[str, Any]], None, None]: + """Fetch rows using keyset pagination for efficient consolidation. + + Uses keyset pagination (id-based) to avoid expensive re-sorting with OFFSET. + Consolidation happens within each batch in Python. + + Args: + table: Table name (must be 'RAWDATACOR') + start_id: Resume from this ID (fetch id > start_id). If None, starts from beginning + batch_size: Number of rows per batch (uses config default if None) + + Yields: + Batches of row dictionaries + """ + if table != "RAWDATACOR": + raise ValueError("Consolidation ordering only supported for RAWDATACOR") + + if batch_size is None: + batch_size = self.settings.migration.batch_size + + last_id = start_id + max_retries = 3 + + while True: + retries = 0 + while retries < max_retries: + try: + with self.connection.cursor() as cursor: + # Use keyset pagination: fetch by id > last_id + # This is much more efficient than OFFSET for large tables + if last_id is None: + query = f"SELECT * FROM `{table}` ORDER BY `id` ASC LIMIT %s" + cursor.execute(query, (batch_size,)) + else: + query = f"SELECT * FROM `{table}` WHERE `id` > %s ORDER BY `id` ASC LIMIT %s" + cursor.execute(query, (last_id, batch_size)) + + rows = cursor.fetchall() + + if not rows: + return + + yield rows + last_id = rows[-1]["id"] + break # Success, exit retry loop + + except pymysql.Error as e: + retries += 1 + if retries >= max_retries: + logger.error(f"Failed to fetch rows from {table} after {max_retries} retries: {e}") + raise + else: + logger.warning(f"Fetch failed (retry {retries}/{max_retries}): {e}") + # Reconnect and retry + try: + self.disconnect() + self.connect() + except Exception as reconnect_error: + logger.error(f"Failed to reconnect: {reconnect_error}") + raise + def get_table_structure(self, table: str) -> Dict[str, Any]: """Get table structure (column info). diff --git a/src/connectors/postgres_connector.py b/src/connectors/postgres_connector.py index d1358a5..8ffc68c 100644 --- a/src/connectors/postgres_connector.py +++ b/src/connectors/postgres_connector.py @@ -64,20 +64,46 @@ class PostgreSQLConnector: self.disconnect() def execute(self, query: str, params: Optional[tuple] = None) -> None: - """Execute a query without returning results. + """Execute a query without returning results with retry logic. 
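# A minimal sketch of the keyset pagination loop that
# fetch_rows_ordered_for_consolidation implements above, assuming a pymysql
# connection opened with DictCursor; the connection setup and default table
# name here are illustrative, not part of the diff.
import pymysql
import pymysql.cursors

def iter_id_ordered_batches(conn, table="RAWDATACOR", batch_size=1000, start_id=None):
    """Yield id-ordered batches, resuming after start_id when given."""
    last_id = start_id
    while True:
        with conn.cursor() as cur:
            if last_id is None:
                cur.execute(
                    f"SELECT * FROM `{table}` ORDER BY `id` ASC LIMIT %s",
                    (batch_size,),
                )
            else:
                cur.execute(
                    f"SELECT * FROM `{table}` WHERE `id` > %s ORDER BY `id` ASC LIMIT %s",
                    (last_id, batch_size),
                )
            rows = cur.fetchall()
        if not rows:
            return
        yield rows
        last_id = rows[-1]["id"]  # the high-water mark doubles as a resume checkpoint

# conn = pymysql.connect(host="...", user="...", password="...", database="...",
#                        cursorclass=pymysql.cursors.DictCursor)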
Args: query: SQL query params: Query parameters """ - try: - with self.connection.cursor() as cursor: - cursor.execute(query, params) - self.connection.commit() - except psycopg.Error as e: - self.connection.rollback() - logger.error(f"Query execution failed: {e}\nQuery: {query}") - raise + max_retries = 3 + retries = 0 + + while retries < max_retries: + try: + with self.connection.cursor() as cursor: + cursor.execute(query, params) + self.connection.commit() + return # Success + except psycopg.Error as e: + try: + self.connection.rollback() + except Exception: + pass + + retries += 1 + if retries >= max_retries: + logger.error(f"Query execution failed after {max_retries} retries: {e}\nQuery: {query}") + raise + else: + logger.warning( + f"Query execution failed (retry {retries}/{max_retries}): {e}. " + f"Reconnecting and retrying..." + ) + try: + self.disconnect() + except Exception: + pass + try: + self.connect() + except Exception as reconnect_error: + logger.error(f"Failed to reconnect: {reconnect_error}") + if retries >= max_retries: + raise def execute_script(self, script: str) -> None: """Execute multiple SQL statements (script). @@ -101,7 +127,7 @@ class PostgreSQLConnector: rows: List[Dict[str, Any]], columns: List[str] ) -> int: - """Insert a batch of rows using parameterized INSERT. + """Insert a batch of rows using parameterized INSERT with retry logic. Args: table: Table name @@ -114,36 +140,62 @@ class PostgreSQLConnector: if not rows: return 0 - try: - with self.connection.cursor() as cursor: - # Prepare values for INSERT - values_list = [] - for row in rows: - values = [] - for col in columns: - val = row.get(col) - # Convert JSONB dicts to JSON strings - if isinstance(val, (dict, list)): - values.append(json.dumps(val)) - else: - values.append(val) - values_list.append(tuple(values)) + max_retries = 3 + retries = 0 - # Build parameterized INSERT query - placeholders = ",".join(["%s"] * len(columns)) - insert_sql = f"INSERT INTO {table} ({','.join(columns)}) VALUES ({placeholders})" + while retries < max_retries: + try: + with self.connection.cursor() as cursor: + # Prepare values for INSERT + values_list = [] + for row in rows: + values = [] + for col in columns: + val = row.get(col) + # Convert JSONB dicts to JSON strings + if isinstance(val, (dict, list)): + values.append(json.dumps(val)) + else: + values.append(val) + values_list.append(tuple(values)) - # Execute batch insert - cursor.executemany(insert_sql, values_list) - self.connection.commit() + # Build parameterized INSERT query + placeholders = ",".join(["%s"] * len(columns)) + insert_sql = f"INSERT INTO {table} ({','.join(columns)}) VALUES ({placeholders})" - logger.debug(f"Inserted {len(rows)} rows into {table}") - return len(rows) + # Execute batch insert + cursor.executemany(insert_sql, values_list) + self.connection.commit() - except psycopg.Error as e: - self.connection.rollback() - logger.error(f"Batch insert failed: {e}") - raise + logger.debug(f"Inserted {len(rows)} rows into {table}") + return len(rows) + + except psycopg.Error as e: + try: + self.connection.rollback() + except Exception: + pass + + retries += 1 + if retries >= max_retries: + logger.error(f"Batch insert failed after {max_retries} retries: {e}") + raise + else: + logger.warning( + f"Batch insert failed (retry {retries}/{max_retries}): {e}. " + f"Reconnecting and retrying..." 
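# A minimal sketch of the retry-and-reconnect shape shared by execute() and
# insert_batch() above, assuming psycopg 3; make_conn and MAX_RETRIES are
# illustrative placeholders rather than names taken from the diff.
import logging
import psycopg

logger = logging.getLogger(__name__)
MAX_RETRIES = 3

def execute_with_retry(make_conn, query, params=None):
    """Run one statement, reconnecting between failed attempts."""
    conn = make_conn()
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            with conn.cursor() as cur:
                cur.execute(query, params)
            conn.commit()
            return
        except psycopg.Error as exc:
            try:
                conn.rollback()
            except Exception:
                pass  # the connection may already be unusable
            if attempt == MAX_RETRIES:
                logger.error("query failed after %d attempts: %s", attempt, exc)
                raise
            logger.warning("attempt %d/%d failed (%s); reconnecting", attempt, MAX_RETRIES, exc)
            try:
                conn.close()
            except Exception:
                pass
            conn = make_conn()  # fresh connection for the next attempt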
+ ) + # Reconnect before retry + try: + self.disconnect() + except Exception: + pass + try: + self.connect() + except Exception as reconnect_error: + logger.error(f"Failed to reconnect: {reconnect_error}") + if retries >= max_retries: + raise def table_exists(self, table: str) -> bool: """Check if a table exists. diff --git a/src/migrator/full_migration.py b/src/migrator/full_migration.py index 4959a03..f79acd5 100644 --- a/src/migrator/full_migration.py +++ b/src/migrator/full_migration.py @@ -9,6 +9,7 @@ from src.connectors.postgres_connector import PostgreSQLConnector from src.transformers.data_transformer import DataTransformer from src.utils.logger import get_logger, setup_logger from src.utils.progress import ProgressTracker +from src.migrator.state import MigrationState logger = get_logger(__name__) @@ -28,20 +29,23 @@ class FullMigrator: self.table = table self.config = TABLE_CONFIGS[table] self.settings = get_settings() + self.state = MigrationState() - def migrate(self, dry_run: bool = False) -> int: - """Perform full migration of the table. + def migrate(self, dry_run: bool = False, resume: bool = False) -> int: + """Perform full migration of the table with resume capability. Args: dry_run: If True, log what would be done but don't modify data + resume: If True, resume from last checkpoint; if False, check for conflicts Returns: - Total number of rows migrated + Total number of rows migrated in this run """ setup_logger(__name__) mysql_table = self.config["mysql_table"] pg_table = self.config["postgres_table"] + primary_key = self.config.get("primary_key", "id") logger.info(f"Starting full migration of {mysql_table} -> {pg_table}") @@ -49,11 +53,7 @@ class FullMigrator: with MySQLConnector() as mysql_conn: # Get total row count total_rows = mysql_conn.get_row_count(mysql_table) - logger.info(f"Total rows to migrate: {total_rows}") - - if dry_run: - logger.info("[DRY RUN] Would migrate all rows") - return total_rows + logger.info(f"Total rows in source: {total_rows}") with PostgreSQLConnector() as pg_conn: # Check if table exists @@ -63,18 +63,52 @@ class FullMigrator: "Run 'setup --create-schema' first." ) - migrated = 0 + # Check for previous migration state + last_migrated_id = self._get_last_migrated_id(pg_conn, pg_table) + previous_migrated_count = self._get_previous_migrated_count(pg_conn, pg_table) + + if last_migrated_id is not None: + pg_row_count = pg_conn.get_row_count(pg_table) + logger.warning( + f"Found previous migration state: {pg_row_count} rows already in {pg_table}" + ) + if not resume: + raise ValueError( + f"Migration already in progress for {pg_table}. " + f"Use --resume to continue from last checkpoint, or delete data to restart." 
+ ) + logger.info(f"Resuming from ID > {last_migrated_id}") + rows_to_migrate = total_rows - last_migrated_id + else: + last_migrated_id = None + previous_migrated_count = 0 + rows_to_migrate = total_rows + + if dry_run: + logger.info(f"[DRY RUN] Would migrate {rows_to_migrate} rows") + return rows_to_migrate + + migrated = previous_migrated_count + migration_start_time = datetime.utcnow().isoformat() with ProgressTracker( - total_rows, + rows_to_migrate, f"Migrating {mysql_table}" ) as progress: # Fetch and migrate rows in batches - for batch in mysql_conn.fetch_all_rows(mysql_table): - # Transform batch + # Use ordered fetching for node consolidation with resume support + for batch in mysql_conn.fetch_rows_ordered_for_consolidation( + mysql_table, + start_id=last_migrated_id + ): + if not batch: + break + + # Transform batch with consolidation enabled transformed = DataTransformer.transform_batch( mysql_table, - batch + batch, + consolidate=True ) # Insert batch @@ -85,65 +119,151 @@ class FullMigrator: columns ) - migrated += inserted - progress.update(inserted) + if inserted > 0: + # For consolidated batches, count transformed rows, not source rows + migrated += inserted + progress.update(inserted) + + # Update state after each batch for resume capability + # Use MAX id of the batch (represents last MySQL id processed) + batch_max_id = max( + int(row.get("id", 0)) for row in transformed + ) + # Get actual row count from PostgreSQL for accuracy + actual_count = pg_conn.get_row_count(pg_table) + self._update_migration_state( + pg_conn, actual_count, batch_max_id, migration_start_time + ) logger.info( f"✓ Migration complete: {migrated} rows migrated " f"to {pg_table}" ) - # Update migration state - self._update_migration_state(pg_conn, migrated) - return migrated except Exception as e: logger.error(f"Migration failed: {e}") raise - def _update_migration_state( - self, - pg_conn: PostgreSQLConnector, - rows_migrated: int - ) -> None: - """Update migration state tracking table. + def _get_last_migrated_id(self, pg_conn: PostgreSQLConnector, pg_table: str) -> Optional[int]: + """Get the last migrated MySQL ID from migration_state table. Args: pg_conn: PostgreSQL connection - rows_migrated: Number of rows migrated + pg_table: PostgreSQL table name + + Returns: + Last migrated MySQL ID or None if no previous migration """ try: - pg_table = self.config["postgres_table"] + with pg_conn.connection.cursor() as cursor: + cursor.execute( + "SELECT last_migrated_id FROM migration_state WHERE table_name = %s", + (pg_table,) + ) + result = cursor.fetchone() + if result and result[0]: + return result[0] + except Exception: + pass + return None + + def _get_previous_migrated_count(self, pg_conn: PostgreSQLConnector, pg_table: str) -> int: + """Get the total rows migrated so far from migration_state table. + + Args: + pg_conn: PostgreSQL connection + pg_table: PostgreSQL table name + + Returns: + Total rows migrated so far (0 if no previous migration) + """ + try: + with pg_conn.connection.cursor() as cursor: + cursor.execute( + "SELECT total_rows_migrated FROM migration_state WHERE table_name = %s", + (pg_table,) + ) + result = cursor.fetchone() + if result and result[0]: + return result[0] + except Exception: + pass + return 0 + + def _update_migration_state( + self, + pg_conn: PostgreSQLConnector, + rows_migrated: int, + last_id: Optional[int] = None, + migration_start_time: Optional[str] = None + ) -> None: + """Update migration state in PostgreSQL and state file. 
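# A minimal sketch of the checkpoint lookup that drives --resume, assuming a
# psycopg 3 connection and the migration_state table referenced above;
# read_checkpoint is an illustrative helper name.
from typing import Optional
import psycopg

def read_checkpoint(conn: psycopg.Connection, pg_table: str) -> Optional[int]:
    """Return the last migrated MySQL id recorded for pg_table, or None."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT last_migrated_id FROM migration_state WHERE table_name = %s",
            (pg_table,),
        )
        row = cur.fetchone()
    return row[0] if row and row[0] is not None else None

# Resuming then reduces to feeding the checkpoint back into the fetcher:
#   last_id = read_checkpoint(conn, "rawdatacor")
#   for batch in mysql_conn.fetch_rows_ordered_for_consolidation(
#           "RAWDATACOR", start_id=last_id):
#       ...insert the batch, then persist max(row["id"]) as the new checkpoint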
+ + Args: + pg_conn: PostgreSQL connection + rows_migrated: Total number of rows migrated so far + last_id: Last ID that was migrated (for resume capability) + migration_start_time: When the migration started (ISO format) + """ + pg_table = self.config["postgres_table"] + now = datetime.utcnow() + status = "in_progress" if last_id is not None else "completed" + + # Update PostgreSQL migration_state table + try: + # Use COALESCE to handle both insert (first time) and update (resume) + # For resume: total_rows_migrated will be the full accumulated count query = f""" INSERT INTO migration_state - (table_name, last_migrated_timestamp, total_rows_migrated, migration_completed_at, status) - VALUES (%s, %s, %s, %s, %s) + (table_name, last_migrated_timestamp, last_migrated_id, total_rows_migrated, migration_completed_at, status) + VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (table_name) DO UPDATE SET last_migrated_timestamp = EXCLUDED.last_migrated_timestamp, + last_migrated_id = EXCLUDED.last_migrated_id, total_rows_migrated = EXCLUDED.total_rows_migrated, migration_completed_at = EXCLUDED.migration_completed_at, status = EXCLUDED.status """ - now = datetime.utcnow() - pg_conn.execute(query, (pg_table, now, rows_migrated, now, "completed")) - logger.debug("Migration state updated") + pg_conn.execute( + query, + ( + pg_table, + migration_start_time or now.isoformat(), + last_id, + rows_migrated, + now if status == "completed" else None, + status + ) + ) + logger.debug(f"Migration state updated: {rows_migrated} rows total, last_id={last_id}, status={status}") except Exception as e: - logger.warning(f"Failed to update migration state: {e}") + logger.warning(f"Failed to update migration state in PostgreSQL: {e}") + + # Also save to state file for incremental migrations + try: + self.state.set_last_timestamp(pg_table, migration_start_time or now.isoformat()) + self.state.increment_migration_count(pg_table, rows_migrated) + logger.debug("Migration state saved to file") + except Exception as e: + logger.warning(f"Failed to save migration state to file: {e}") def run_full_migration( table: str, - dry_run: bool = False + dry_run: bool = False, + resume: bool = False ) -> int: """Run full migration for a table. Args: table: Table name to migrate dry_run: If True, show what would be done without modifying data + resume: If True, resume from last checkpoint instead of starting fresh Returns: - Number of rows migrated + Number of rows migrated in this run """ migrator = FullMigrator(table) - return migrator.migrate(dry_run=dry_run) + return migrator.migrate(dry_run=dry_run, resume=resume) diff --git a/src/migrator/incremental_migration.py b/src/migrator/incremental_migration.py index 15f4c6a..a0ad6c2 100644 --- a/src/migrator/incremental_migration.py +++ b/src/migrator/incremental_migration.py @@ -1,6 +1,7 @@ """Incremental migration from MySQL to PostgreSQL based on timestamps.""" from datetime import datetime from typing import Optional +import psycopg from config import get_settings, TABLE_CONFIGS from src.connectors.mysql_connector import MySQLConnector @@ -31,6 +32,33 @@ class IncrementalMigrator: self.settings = get_settings() self.state = MigrationState(state_file) + def _get_last_timestamp_from_db( + self, + pg_conn: PostgreSQLConnector, + pg_table: str + ) -> Optional[str]: + """Get last migration timestamp from PostgreSQL migration_state table. 
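# A minimal sketch of the ON CONFLICT upsert used above to persist per-table
# checkpoint state, assuming psycopg 3; the column list mirrors the diff,
# while write_checkpoint itself is an illustrative helper name.
from datetime import datetime, timezone
import psycopg

UPSERT_STATE = """
    INSERT INTO migration_state
        (table_name, last_migrated_timestamp, last_migrated_id,
         total_rows_migrated, migration_completed_at, status)
    VALUES (%s, %s, %s, %s, %s, %s)
    ON CONFLICT (table_name) DO UPDATE SET
        last_migrated_timestamp = EXCLUDED.last_migrated_timestamp,
        last_migrated_id = EXCLUDED.last_migrated_id,
        total_rows_migrated = EXCLUDED.total_rows_migrated,
        migration_completed_at = EXCLUDED.migration_completed_at,
        status = EXCLUDED.status
"""

def write_checkpoint(conn: psycopg.Connection, table: str, last_id: int,
                     total_rows: int, started_at: str, completed: bool) -> None:
    """Upsert the checkpoint row; status flips to 'completed' at the end."""
    now = datetime.now(timezone.utc)
    with conn.cursor() as cur:
        cur.execute(
            UPSERT_STATE,
            (table, started_at, last_id, total_rows,
             now if completed else None,
             "completed" if completed else "in_progress"),
        )
    conn.commit()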
+ + Args: + pg_conn: PostgreSQL connector + pg_table: PostgreSQL table name + + Returns: + ISO format timestamp or None if not found + """ + try: + with pg_conn.connection.cursor() as cursor: + cursor.execute( + "SELECT last_migrated_timestamp FROM migration_state WHERE table_name = %s", + (pg_table,) + ) + result = cursor.fetchone() + if result and result[0]: + return result[0].isoformat() + except psycopg.Error: + return None + return None + def migrate(self, dry_run: bool = False, use_id: bool = False) -> int: """Perform incremental migration since last sync. @@ -88,9 +116,19 @@ class IncrementalMigrator: Returns: Number of rows migrated """ - # Get last migration timestamp + # Try to get last migration timestamp from state file first last_timestamp = self.state.get_last_timestamp(pg_table) + # If not in state file, try to get from PostgreSQL migration_state table + if last_timestamp is None: + try: + last_timestamp = self._get_last_timestamp_from_db(pg_conn, pg_table) + if last_timestamp: + logger.info(f"Found previous migration state in database: {last_timestamp}") + except Exception as e: + logger.debug(f"Could not read from migration_state table: {e}") + last_timestamp = None + if last_timestamp is None: logger.info( f"No previous migration found for {pg_table}. " @@ -98,6 +136,8 @@ class IncrementalMigrator: ) return 0 + logger.info(f"Last migration timestamp: {last_timestamp}") + # Count rows to migrate timestamp_col = "updated_at" if mysql_table == "ELABDATADISP" else "created_at" @@ -107,7 +147,7 @@ class IncrementalMigrator: timestamp_col ) - logger.info(f"Last timestamp in PostgreSQL: {pg_max_timestamp}") + logger.info(f"Current max timestamp in PostgreSQL: {pg_max_timestamp}") if dry_run: logger.info("[DRY RUN] Would migrate rows after timestamp") diff --git a/src/transformers/data_transformer.py b/src/transformers/data_transformer.py index 0b4c251..c7c9cbe 100644 --- a/src/transformers/data_transformer.py +++ b/src/transformers/data_transformer.py @@ -1,5 +1,5 @@ """Data transformation from MySQL to PostgreSQL format.""" -from typing import Dict, Any, List +from typing import Dict, Any, List, Tuple from datetime import datetime, time, timedelta from config import ( RAWDATACOR_COLUMNS, @@ -45,17 +45,16 @@ class DataTransformer: raise ValueError(f"Unsupported event_time type: {type(event_time)}") @staticmethod - def transform_rawdatacor_row(mysql_row: Dict[str, Any]) -> Dict[str, Any]: - """Transform a RAWDATACOR row from MySQL to PostgreSQL format. + def _build_measurement_for_node(mysql_row: Dict[str, Any]) -> Dict[str, Any]: + """Build measurement object for a single node. Args: mysql_row: Row dictionary from MySQL Returns: - Transformed row dictionary for PostgreSQL + Measurement dictionary for this node (without node key wrapper) """ - # Create measurements JSONB - measurements = {} + measurement = {} # Map Val0-ValF with their units for i, val_col in enumerate(RAWDATACOR_COLUMNS["val_columns"]): @@ -66,10 +65,31 @@ class DataTransformer: # Only add to JSONB if value is not None if value is not None: - measurements[str(i)] = { - "value": str(value), - "unit": unit if unit else None, - } + measurement[str(i)] = {"value": str(value)} + # Only add unit if it's not None (saves ~20% space) + if unit: + measurement[str(i)]["unit"] = unit + + return measurement + + @staticmethod + def transform_rawdatacor_row(mysql_row: Dict[str, Any], measurements: Dict[str, Any] = None) -> Dict[str, Any]: + """Transform a RAWDATACOR row from MySQL to PostgreSQL format. 
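# A minimal sketch of the per-node measurement shape built by
# _build_measurement_for_node above; the Val0..ValF / *_unitmisure column
# names follow the test fixtures, and the sample row below is made up.
from typing import Any, Dict

def build_measurement(row: Dict[str, Any]) -> Dict[str, Dict[str, str]]:
    measurement: Dict[str, Dict[str, str]] = {}
    for i in range(16):                       # Val0 .. ValF (hex-suffixed columns)
        col = f"Val{i:X}"
        value = row.get(col)
        if value is None:                     # NULL readings are dropped entirely
            continue
        entry = {"value": str(value)}
        unit = row.get(f"{col}_unitmisure")
        if unit:                              # the unit key is omitted when missing
            entry["unit"] = unit
        measurement[str(i)] = entry
    return measurement

# build_measurement({"Val0": "100.5", "Val0_unitmisure": "°C", "Val2": "200.3"})
# -> {"0": {"value": "100.5", "unit": "°C"}, "2": {"value": "200.3"}}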
+ + Args: + mysql_row: Row dictionary from MySQL + measurements: Pre-built measurements JSONB (for consolidated nodes). + If None, builds measurements from mysql_row. + + Returns: + Transformed row dictionary for PostgreSQL + """ + # If measurements not provided, build from single row + if measurements is None: + node_num = mysql_row.get("NodeNum") + node_measurements = DataTransformer._build_measurement_for_node(mysql_row) + # Wrap with node number as key for consolidation compatibility + measurements = {str(node_num): node_measurements} if node_num is not None else {} # Combine event_date and event_time into event_timestamp event_date = mysql_row.get("EventDate") @@ -94,11 +114,11 @@ class DataTransformer: event_timestamp = None # Create PostgreSQL row + # Note: node_num is now stored in measurements JSONB, not as separate column pg_row = { "id": mysql_row["id"], "unit_name": mysql_row.get("UnitName"), "tool_name_id": mysql_row["ToolNameID"], - "node_num": mysql_row["NodeNum"], "event_timestamp": event_timestamp, "bat_level": mysql_row["BatLevel"], "temperature": mysql_row["Temperature"], @@ -179,25 +199,103 @@ class DataTransformer: return pg_row + @staticmethod + def consolidate_rawdatacor_batch( + rows: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """Consolidate RAWDATACOR rows by (unit_name, tool_name_id, event_timestamp). + + Groups multiple nodes with the same key into a single row with measurements + keyed by node number. Uses MAX(id) as the consolidated row ID for proper resume. + + Args: + rows: List of row dictionaries from MySQL, ordered by + (UnitName, ToolNameID, EventDate, EventTime, NodeNum) + + Returns: + List of consolidated row dictionaries ready for transformation + """ + if not rows: + return [] + + # Group rows by consolidation key + groups = {} + group_order = [] # Track order of first appearance + + for row in rows: + # Build consolidation key + unit_name = row.get("UnitName") + tool_name_id = row["ToolNameID"] + event_date = row.get("EventDate") + event_time = row.get("EventTime") + + # Create a hashable key + key = (unit_name, tool_name_id, event_date, event_time) + + if key not in groups: + groups[key] = [] + group_order.append(key) + + groups[key].append(row) + + # Transform each group into a consolidated row + consolidated_rows = [] + + for key in group_order: + group_rows = groups[key] + + # Build consolidated measurements with nodes as keys + consolidated_measurements = {} + + for row in group_rows: + node_num = row.get("NodeNum") + node_measurements = DataTransformer._build_measurement_for_node(row) + # Store measurements with node number as key + consolidated_measurements[str(node_num)] = node_measurements + + # Use the row with minimum id as template for other fields + min_id_row = min(group_rows, key=lambda r: r["id"]) + # Use the row with maximum id for the consolidated row ID (for proper resume) + max_id_row = max(group_rows, key=lambda r: r["id"]) + + # Create consolidated row with pre-built measurements + consolidated_row = DataTransformer.transform_rawdatacor_row( + min_id_row, + measurements=consolidated_measurements + ) + + # Update id to MAX(id) of the group (represents last MySQL row processed) + consolidated_row["id"] = max_id_row["id"] + + consolidated_rows.append(consolidated_row) + + return consolidated_rows + @staticmethod def transform_batch( table: str, - rows: List[Dict[str, Any]] + rows: List[Dict[str, Any]], + consolidate: bool = False ) -> List[Dict[str, Any]]: """Transform a batch of rows from MySQL to PostgreSQL format. 
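# A minimal sketch of the grouping step behind consolidate_rawdatacor_batch,
# assuming the batch arrives ordered by id; group_by_key is an illustrative
# helper name.
from typing import Any, Dict, List, Tuple

Key = Tuple[Any, Any, Any, Any]

def group_by_key(rows: List[Dict[str, Any]]) -> Dict[Key, List[Dict[str, Any]]]:
    """Group rows by (UnitName, ToolNameID, EventDate, EventTime),
    keeping first-appearance order (Python dicts preserve insertion order)."""
    groups: Dict[Key, List[Dict[str, Any]]] = {}
    for row in rows:
        key = (row.get("UnitName"), row["ToolNameID"],
               row.get("EventDate"), row.get("EventTime"))
        groups.setdefault(key, []).append(row)
    return groups

# Each group then collapses into one PostgreSQL row: the min-id row supplies
# the scalar columns, per-node measurements are merged under str(NodeNum)
# keys, and max(id) of the group becomes the row id so a resumed run starts
# after the last source row that was fully consolidated.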
Args: table: Table name ('RAWDATACOR' or 'ELABDATADISP') rows: List of row dictionaries from MySQL + consolidate: If True and table is RAWDATACOR, consolidate nodes Returns: List of transformed row dictionaries for PostgreSQL """ if table == "RAWDATACOR": - return [ - DataTransformer.transform_rawdatacor_row(row) - for row in rows - ] + if consolidate: + # Consolidate rows by key first, then they're already transformed + return DataTransformer.consolidate_rawdatacor_batch(rows) + else: + return [ + DataTransformer.transform_rawdatacor_row(row) + for row in rows + ] elif table == "ELABDATADISP": return [ DataTransformer.transform_elabdatadisp_row(row) @@ -221,7 +319,6 @@ class DataTransformer: "id", "unit_name", "tool_name_id", - "node_num", "event_timestamp", "bat_level", "temperature", diff --git a/src/transformers/schema_transformer.py b/src/transformers/schema_transformer.py index 94fd495..b51d4ca 100644 --- a/src/transformers/schema_transformer.py +++ b/src/transformers/schema_transformer.py @@ -16,11 +16,11 @@ def create_rawdatacor_schema() -> str: CREATE SEQUENCE IF NOT EXISTS rawdatacor_id_seq; -- Create RAWDATACOR table with partitioning +-- Note: node_num is stored in measurements JSONB, not as a separate column CREATE TABLE IF NOT EXISTS rawdatacor ( id BIGINT NOT NULL DEFAULT nextval('rawdatacor_id_seq'), unit_name VARCHAR(32), tool_name_id VARCHAR(32) NOT NULL, - node_num INTEGER NOT NULL, event_timestamp TIMESTAMP NOT NULL, bat_level NUMERIC(4,2) NOT NULL, temperature NUMERIC(5,2) NOT NULL, @@ -55,8 +55,8 @@ CREATE TABLE IF NOT EXISTS rawdatacor_default # Add indexes sql += """ -- Create indexes -CREATE INDEX IF NOT EXISTS idx_unit_tool_node_datetime_raw - ON rawdatacor(unit_name, tool_name_id, node_num, event_timestamp); +CREATE INDEX IF NOT EXISTS idx_unit_tool_datetime_raw + ON rawdatacor(unit_name, tool_name_id, event_timestamp); CREATE INDEX IF NOT EXISTS idx_unit_tool_raw ON rawdatacor(unit_name, tool_name_id); diff --git a/tests/test_setup.py b/tests/test_setup.py index 1025fd5..e8995bc 100644 --- a/tests/test_setup.py +++ b/tests/test_setup.py @@ -62,6 +62,7 @@ class TestDataTransformation: assert pg_row["id"] == 1 assert pg_row["unit_name"] == "TestUnit" assert pg_row["tool_name_id"] == "Tool1" + assert "node_num" not in pg_row # node_num should NOT be a column anymore assert pg_row["event_timestamp"] is not None assert pg_row["event_timestamp"].year == 2024 assert pg_row["event_timestamp"].month == 1 @@ -69,11 +70,15 @@ class TestDataTransformation: assert pg_row["event_timestamp"].hour == 12 assert pg_row["event_timestamp"].minute == 0 assert isinstance(pg_row["measurements"], dict) - assert "0" in pg_row["measurements"] - assert pg_row["measurements"]["0"]["value"] == "100.5" - assert pg_row["measurements"]["0"]["unit"] == "°C" - assert "1" not in pg_row["measurements"] # NULL values excluded - assert "2" in pg_row["measurements"] + # Verify node is a key in measurements JSONB (single node case) + assert "1" in pg_row["measurements"] # node number as key + assert "0" in pg_row["measurements"]["1"] + assert pg_row["measurements"]["1"]["0"]["value"] == "100.5" + assert pg_row["measurements"]["1"]["0"]["unit"] == "°C" + assert "1" not in pg_row["measurements"]["1"] # NULL values excluded + assert "2" in pg_row["measurements"]["1"] + assert pg_row["measurements"]["1"]["2"]["value"] == "200.3" + assert pg_row["measurements"]["1"]["2"]["unit"] == "m/s" def test_elabdatadisp_transformation(self): """Test ELABDATADISP row transformation.""" @@ -246,6 +251,167 @@ class 
TestTimeConversion: assert pg_row["event_timestamp"].hour == 0 assert pg_row["event_timestamp"].minute == 0 + # Verify that unit is NOT included when it's None (optimization) + assert "0" in pg_row["measurements"] + assert pg_row["measurements"]["0"]["value"] == "-1709" + assert "unit" not in pg_row["measurements"]["0"] # unit should not exist when None + + def test_rawdatacor_consolidation(self): + """Test consolidation of multiple nodes into single JSONB row.""" + # Create three rows with same (unit, tool, timestamp) but different nodes + rows = [ + { + "id": 1, + "UnitName": "TestUnit", + "ToolNameID": "Tool1", + "NodeNum": 1, + "EventDate": "2024-01-01", + "EventTime": "12:00:00", + "BatLevel": 3.5, + "Temperature": 25.5, + "Val0": "100.5", + "Val1": None, + "Val2": "200.3", + "Val0_unitmisure": "°C", + "Val1_unitmisure": None, + "Val2_unitmisure": "m/s", + }, + { + "id": 2, + "UnitName": "TestUnit", + "ToolNameID": "Tool1", + "NodeNum": 2, + "EventDate": "2024-01-01", + "EventTime": "12:00:00", + "BatLevel": 3.5, + "Temperature": 25.5, + "Val0": "101.2", + "Val1": None, + "Val2": "205.1", + "Val0_unitmisure": "°C", + "Val1_unitmisure": None, + "Val2_unitmisure": "m/s", + }, + { + "id": 3, + "UnitName": "TestUnit", + "ToolNameID": "Tool1", + "NodeNum": 3, + "EventDate": "2024-01-01", + "EventTime": "12:00:00", + "BatLevel": 3.5, + "Temperature": 25.5, + "Val0": "102.0", + "Val1": None, + "Val2": "210.5", + "Val0_unitmisure": "°C", + "Val1_unitmisure": None, + "Val2_unitmisure": "m/s", + }, + ] + + # Add remaining Val columns as None for all rows + for row in rows: + for i in range(3, 16): + col = f"Val{i:X}" + row[col] = None + row[f"{col}_unitmisure"] = None + row["created_at"] = None + row["BatLevelModule"] = None + row["TemperatureModule"] = None + row["RssiModule"] = None + + # Consolidate + consolidated = DataTransformer.consolidate_rawdatacor_batch(rows) + + # Should have 1 consolidated row (all three nodes have same unit/tool/timestamp) + assert len(consolidated) == 1 + + consolidated_row = consolidated[0] + + # Verify consolidated row properties + assert consolidated_row["id"] == 3 # MAX(id) for proper resume + assert consolidated_row["unit_name"] == "TestUnit" + assert consolidated_row["tool_name_id"] == "Tool1" + assert consolidated_row["bat_level"] == 3.5 + assert consolidated_row["temperature"] == 25.5 + + # Verify all three nodes are in measurements + measurements = consolidated_row["measurements"] + assert "1" in measurements + assert "2" in measurements + assert "3" in measurements + + # Verify node 1 measurements + assert measurements["1"]["0"]["value"] == "100.5" + assert measurements["1"]["0"]["unit"] == "°C" + assert measurements["1"]["2"]["value"] == "200.3" + + # Verify node 2 measurements + assert measurements["2"]["0"]["value"] == "101.2" + assert measurements["2"]["2"]["value"] == "205.1" + + # Verify node 3 measurements + assert measurements["3"]["0"]["value"] == "102.0" + assert measurements["3"]["2"]["value"] == "210.5" + + def test_rawdatacor_consolidation_with_different_keys(self): + """Test that rows with different keys are NOT consolidated.""" + # Two rows with different units + rows = [ + { + "id": 1, + "UnitName": "Unit1", + "ToolNameID": "Tool1", + "NodeNum": 1, + "EventDate": "2024-01-01", + "EventTime": "12:00:00", + "BatLevel": 3.5, + "Temperature": 25.5, + "Val0": "100.5", + "Val1": None, + "Val2": None, + "Val0_unitmisure": "°C", + "Val1_unitmisure": None, + "Val2_unitmisure": None, + }, + { + "id": 2, + "UnitName": "Unit2", # Different unit + 
"ToolNameID": "Tool1", + "NodeNum": 1, + "EventDate": "2024-01-01", + "EventTime": "12:00:00", + "BatLevel": 3.5, + "Temperature": 25.5, + "Val0": "100.5", + "Val1": None, + "Val2": None, + "Val0_unitmisure": "°C", + "Val1_unitmisure": None, + "Val2_unitmisure": None, + }, + ] + + # Add remaining Val columns as None for all rows + for row in rows: + for i in range(3, 16): + col = f"Val{i:X}" + row[col] = None + row[f"{col}_unitmisure"] = None + row["created_at"] = None + row["BatLevelModule"] = None + row["TemperatureModule"] = None + row["RssiModule"] = None + + # Consolidate + consolidated = DataTransformer.consolidate_rawdatacor_batch(rows) + + # Should have 2 rows (different units) + assert len(consolidated) == 2 + assert consolidated[0]["unit_name"] == "Unit1" + assert consolidated[1]["unit_name"] == "Unit2" + class TestFieldMapping: """Test field mapping configuration."""