From b09cfcf9df4daae51ff2a78fa46df3a4c57dc6d8 Mon Sep 17 00:00:00 2001 From: alex Date: Sun, 21 Dec 2025 09:53:34 +0100 Subject: [PATCH] fix: Add timeout settings and retry logic to MySQL connector Configuration improvements: - Set read_timeout=300 (5 minutes) to handle long queries - Set write_timeout=300 (5 minutes) for writes - Set max_allowed_packet=64MB to handle larger data transfers Retry logic: - Added retry mechanism with max 3 retries on fetch failure - Auto-reconnect on connection loss before retry - Better error messages showing retry attempts This fixes the 'connection is lost' error that occurs during long-running migrations by: 1. Giving MySQL queries more time to complete 2. Allowing larger packet sizes for bulk data 3. Automatically recovering from connection drops Fixes: 'Connection is lost' error during full migration --- MIGRATION_WORKFLOW.md | 245 +++++++++++++++++++++ config.py | 4 +- src/benchmark/query_generator.py | 18 +- src/connectors/mysql_connector.py | 96 +++++++-- src/migrator/incremental_migration.py | 281 ++++++++++++++++++------- src/transformers/data_transformer.py | 88 +++++++- src/transformers/schema_transformer.py | 36 ++-- tests/test_setup.py | 112 ++++++++++ 8 files changed, 761 insertions(+), 119 deletions(-) create mode 100644 MIGRATION_WORKFLOW.md diff --git a/MIGRATION_WORKFLOW.md b/MIGRATION_WORKFLOW.md new file mode 100644 index 0000000..2cc52fb --- /dev/null +++ b/MIGRATION_WORKFLOW.md @@ -0,0 +1,245 @@ +# MySQL to PostgreSQL Migration Workflow + +## Overview + +This tool supports three migration modes: + +1. **Full Migration** (`full_migration.py`) - Initial complete migration +2. **Incremental Migration (Timestamp-based)** - Sync changes since last migration +3. **Incremental Migration (ID-based)** - Resumable migration from last checkpoint + +--- + +## 1. Initial Full Migration + +### First Time Setup + +```bash +# Create the PostgreSQL schema +python main.py setup --create-schema + +# Run full migration (one-time) +python main.py migrate --full RAWDATACOR +python main.py migrate --full ELABDATADISP +``` + +**When to use:** First time migrating data or need complete fresh migration. + +**Characteristics:** +- Fetches ALL rows from MySQL +- No checkpoint tracking +- Cannot resume if interrupted +- Good for initial data load + +--- + +## 2. Timestamp-based Incremental Migration + +### For Continuous Sync (Recommended for most cases) + +```bash +# After initial full migration, use incremental with timestamps +python main.py migrate --incremental RAWDATACOR +python main.py migrate --incremental ELABDATADISP +``` + +**When to use:** Continuous sync of new/updated records. + +**Characteristics:** +- Tracks `created_at` (RAWDATACOR) or `updated_at` (ELABDATADISP) +- Uses JSON state file (`migration_state.json`) +- Only fetches rows modified since last run +- Perfect for scheduled jobs (cron, airflow, etc.) +- Syncs changes but NOT deletions + +**How it works:** +1. First run: Returns with message "No previous migration found" - must run full migration first +2. Subsequent runs: Only fetches rows where `created_at` > last_migration_timestamp +3. Updates state file with new timestamp for next run + +**Example workflow:** +```bash +# Day 1: Initial full migration +python main.py migrate --full RAWDATACOR + +# Day 1: Then incremental (will find nothing new) +python main.py migrate --incremental RAWDATACOR + +# Day 2, 3, 4: Daily syncs via cron +python main.py migrate --incremental RAWDATACOR +``` + +--- + +## 3. 
ID-based Incremental Migration (Resumable) + +### For Large Datasets or Unreliable Connections + +```bash +# First run +python main.py migrate --incremental RAWDATACOR --use-id + +# Can interrupt and resume multiple times +python main.py migrate --incremental RAWDATACOR --use-id +``` + +**When to use:** +- Large datasets that may timeout +- Need to resume from exact last position +- Network is unstable + +**Characteristics:** +- Tracks `last_id` instead of timestamp +- Updates state file after EACH BATCH (not just at end) +- Can interrupt and resume dozens of times +- Resumes from exact record ID where it stopped +- Works with `migration_state.json` + +**How it works:** +1. First run: Starts from beginning (ID = 0) +2. Each batch: Updates state file with max ID from batch +3. Interrupt: Can stop at any time +4. Resume: Next run continues from last ID stored +5. Continues until all rows processed + +**Example workflow for large dataset:** +```bash +# Start ID-based migration (will migrate in batches) +python main.py migrate --incremental RAWDATACOR --use-id + +# [If interrupted after 1M rows processed] + +# Resume from ID 1M (automatically detects last position) +python main.py migrate --incremental RAWDATACOR --use-id + +# [Continues until complete] +``` + +--- + +## State Management + +### State File Location +``` +migration_state.json # In project root +``` + +### State File Content (Timestamp-based) +```json +{ + "rawdatacor": { + "last_timestamp": "2024-12-11T19:30:45.123456", + "last_updated": "2024-12-11T19:30:45.123456", + "total_migrated": 50000 + } +} +``` + +### State File Content (ID-based) +```json +{ + "rawdatacor": { + "last_id": 1000000, + "total_migrated": 1000000, + "last_updated": "2024-12-11T19:45:30.123456" + } +} +``` + +### Reset Migration State +```python +from src.migrator.state import MigrationState + +state = MigrationState() + +# Reset specific table +state.reset("rawdatacor") + +# Reset all tables +state.reset() +``` + +--- + +## Recommended Workflow + +### For Daily Continuous Sync +```bash +# Week 1: Initial setup +python main.py setup --create-schema +python main.py migrate --full RAWDATACOR +python main.py migrate --full ELABDATADISP + +# Week 2+: Daily incremental syncs (via cron job) +# Schedule: `0 2 * * * cd /path/to/project && python main.py migrate --incremental RAWDATACOR` +python main.py migrate --incremental RAWDATACOR +python main.py migrate --incremental ELABDATADISP +``` + +### For Large Initial Migration +```bash +# If dataset > 10 million rows +python main.py setup --create-schema +python main.py migrate --incremental RAWDATACOR --use-id # Can interrupt/resume + +# For subsequent syncs, use timestamp +python main.py migrate --incremental RAWDATACOR # Timestamp-based +``` + +--- + +## Key Differences at a Glance + +| Feature | Full | Timestamp | ID-based | +|---------|------|-----------|----------| +| Initial setup | ✅ Required first | ✅ After full | ✅ After full | +| Sync new/updated | ❌ No | ✅ Yes | ✅ Yes | +| Resumable | ❌ No | ⚠️ Partial* | ✅ Full | +| Batched state tracking | ❌ No | ❌ No | ✅ Yes | +| Large datasets | ⚠️ Risky | ✅ Good | ✅ Best | +| Scheduled jobs | ❌ No | ✅ Perfect | ⚠️ Unnecessary | + +*Timestamp mode can resume, but must wait for full batch to complete before continuing + +--- + +## Default Partitions + +Both tables are partitioned by year (2014-2031) plus a DEFAULT partition: +- **rawdatacor_2014** through **rawdatacor_2031** (yearly partitions) +- **rawdatacor_default** (catches data outside 2014-2031) + +Same for 
ELABDATADISP. This ensures data with edge-case timestamps doesn't break migration. + +--- + +## Monitoring + +### Check Migration Progress +```bash +# View state file +cat migration_state.json + +# Check PostgreSQL row counts +psql -U postgres -h localhost -d your_db -c "SELECT COUNT(*) FROM rawdatacor;" +``` + +### Common Issues + +**"No previous migration found"** (Timestamp mode) +- Solution: Run full migration first with `--full` flag + +**"Duplicate key value violates unique constraint"** +- Cause: Running full migration twice +- Solution: Use timestamp-based incremental sync instead + +**"Timeout during migration"** (Large datasets) +- Solution: Switch to ID-based resumable migration with `--use-id` + +--- + +## Summary + +- **Start with:** Full migration (`--full`) for initial data load +- **Then use:** Timestamp-based incremental (`--incremental`) for daily syncs +- **Switch to:** ID-based resumable (`--incremental --use-id`) if full migration is too large diff --git a/config.py b/config.py index c7b3ed6..9fb6721 100644 --- a/config.py +++ b/config.py @@ -156,13 +156,13 @@ _rawdatacor_config = { "mysql_table": "RAWDATACOR", "postgres_table": "rawdatacor", "primary_key": "id", - "partition_key": "event_date", + "partition_key": "event_timestamp", } _elabdatadisp_config = { "mysql_table": "ELABDATADISP", "postgres_table": "elabdatadisp", "primary_key": "idElabData", - "partition_key": "event_date", + "partition_key": "event_timestamp", } TABLE_CONFIGS = { diff --git a/src/benchmark/query_generator.py b/src/benchmark/query_generator.py index 4650dd6..d7df112 100644 --- a/src/benchmark/query_generator.py +++ b/src/benchmark/query_generator.py @@ -25,13 +25,13 @@ class BenchmarkQueryGenerator: "select_by_pk": [ ( "SELECT * FROM `RAWDATACOR` WHERE `id` = 1000 AND `EventDate` = '2024-01-15'", - "SELECT * FROM rawdatacor WHERE id = 1000 AND event_date = '2024-01-15'" + "SELECT * FROM rawdatacor WHERE id = 1000 AND event_timestamp::date = '2024-01-15'" ) ], "select_by_date_range": [ ( f"SELECT * FROM `RAWDATACOR` WHERE `EventDate` BETWEEN '{sample_date_start}' AND '{sample_date_end}'", - f"SELECT * FROM rawdatacor WHERE event_date BETWEEN '{sample_date_start}' AND '{sample_date_end}'" + f"SELECT * FROM rawdatacor WHERE event_timestamp::date BETWEEN '{sample_date_start}' AND '{sample_date_end}'" ) ], "select_by_unit_tool": [ @@ -61,13 +61,13 @@ class BenchmarkQueryGenerator: "aggregate_by_date": [ ( "SELECT `EventDate`, COUNT(*) as count FROM `RAWDATACOR` GROUP BY `EventDate` ORDER BY `EventDate`", - "SELECT event_date, COUNT(*) as count FROM rawdatacor GROUP BY event_date ORDER BY event_date" + "SELECT event_timestamp::date, COUNT(*) as count FROM rawdatacor GROUP BY event_timestamp::date ORDER BY event_timestamp::date" ) ], "aggregate_with_filter": [ ( f"SELECT `UnitName`, `ToolNameID`, COUNT(*) as count FROM `RAWDATACOR` WHERE `EventDate` >= '{sample_date_start}' GROUP BY `UnitName`, `ToolNameID`", - f"SELECT unit_name, tool_name_id, COUNT(*) as count FROM rawdatacor WHERE event_date >= '{sample_date_start}' GROUP BY unit_name, tool_name_id" + f"SELECT unit_name, tool_name_id, COUNT(*) as count FROM rawdatacor WHERE event_timestamp::date >= '{sample_date_start}' GROUP BY unit_name, tool_name_id" ) ], } @@ -90,13 +90,13 @@ class BenchmarkQueryGenerator: "select_by_pk": [ ( "SELECT * FROM `ELABDATADISP` WHERE `idElabData` = 5000 AND `EventDate` = '2024-01-15'", - "SELECT * FROM elabdatadisp WHERE id_elab_data = 5000 AND event_date = '2024-01-15'" + "SELECT * FROM elabdatadisp WHERE 
id_elab_data = 5000 AND event_timestamp::date = '2024-01-15'" ) ], "select_by_date_range": [ ( f"SELECT * FROM `ELABDATADISP` WHERE `EventDate` BETWEEN '{sample_date_start}' AND '{sample_date_end}'", - f"SELECT * FROM elabdatadisp WHERE event_date BETWEEN '{sample_date_start}' AND '{sample_date_end}'" + f"SELECT * FROM elabdatadisp WHERE event_timestamp::date BETWEEN '{sample_date_start}' AND '{sample_date_end}'" ) ], "select_by_unit_tool": [ @@ -126,7 +126,7 @@ class BenchmarkQueryGenerator: "aggregate_measurements": [ ( None, - f"SELECT unit_name, AVG((measurements->'kinematics'->>'speed')::NUMERIC) as avg_speed FROM elabdatadisp WHERE event_date >= '{sample_date_start}' GROUP BY unit_name LIMIT 100" + f"SELECT unit_name, AVG((measurements->'kinematics'->>'speed')::NUMERIC) as avg_speed FROM elabdatadisp WHERE event_timestamp::date >= '{sample_date_start}' GROUP BY unit_name LIMIT 100" ) ], "count_by_state": [ @@ -150,11 +150,11 @@ class BenchmarkQueryGenerator: queries = { "insert_single_rawdatacor": ( "INSERT INTO `RAWDATACOR` (`UnitName`, `ToolNameID`, `NodeNum`, `EventDate`, `EventTime`, `BatLevel`, `Temperature`) VALUES ('Unit1', 'Tool1', 1, '2024-01-01', '12:00:00', 3.5, 25.5)", - "INSERT INTO rawdatacor (unit_name, tool_name_id, node_num, event_date, event_time, bat_level, temperature, measurements) VALUES ('Unit1', 'Tool1', 1, '2024-01-01', '12:00:00', 3.5, 25.5, '{}')" + "INSERT INTO rawdatacor (unit_name, tool_name_id, node_num, event_timestamp, bat_level, temperature, measurements) VALUES ('Unit1', 'Tool1', 1, '2024-01-01 12:00:00', 3.5, 25.5, '{}')" ), "insert_single_elabdatadisp": ( "INSERT INTO `ELABDATADISP` (`UnitName`, `ToolNameID`, `NodeNum`, `EventDate`, `EventTime`) VALUES ('Unit1', 'Tool1', 1, '2024-01-01', '12:00:00')", - "INSERT INTO elabdatadisp (unit_name, tool_name_id, node_num, event_date, event_time, measurements) VALUES ('Unit1', 'Tool1', 1, '2024-01-01', '12:00:00', '{}')" + "INSERT INTO elabdatadisp (unit_name, tool_name_id, node_num, event_timestamp, measurements) VALUES ('Unit1', 'Tool1', 1, '2024-01-01 12:00:00', '{}')" ), } diff --git a/src/connectors/mysql_connector.py b/src/connectors/mysql_connector.py index d75477e..09f35d8 100644 --- a/src/connectors/mysql_connector.py +++ b/src/connectors/mysql_connector.py @@ -26,6 +26,9 @@ class MySQLConnector: database=self.settings.mysql.database, charset="utf8mb4", cursorclass=pymysql.cursors.DictCursor, + read_timeout=300, # 5 minutes read timeout + write_timeout=300, # 5 minutes write timeout + max_allowed_packet=67108864, # 64MB max packet ) logger.info( f"Connected to MySQL: {self.settings.mysql.host}:" @@ -86,22 +89,38 @@ class MySQLConnector: batch_size = self.settings.migration.batch_size offset = 0 + max_retries = 3 + while True: - try: - with self.connection.cursor() as cursor: - query = f"SELECT * FROM `{table}` LIMIT %s OFFSET %s" - cursor.execute(query, (batch_size, offset)) - rows = cursor.fetchall() + retries = 0 + while retries < max_retries: + try: + with self.connection.cursor() as cursor: + query = f"SELECT * FROM `{table}` LIMIT %s OFFSET %s" + cursor.execute(query, (batch_size, offset)) + rows = cursor.fetchall() - if not rows: - break + if not rows: + return - yield rows - offset += len(rows) + yield rows + offset += len(rows) + break # Success, exit retry loop - except pymysql.Error as e: - logger.error(f"Failed to fetch rows from {table}: {e}") - raise + except pymysql.Error as e: + retries += 1 + if retries >= max_retries: + logger.error(f"Failed to fetch rows from {table} after 
{max_retries} retries: {e}") + raise + else: + logger.warning(f"Fetch failed (retry {retries}/{max_retries}): {e}") + # Reconnect and retry + try: + self.disconnect() + self.connect() + except Exception as reconnect_error: + logger.error(f"Failed to reconnect: {reconnect_error}") + raise def fetch_rows_since( self, @@ -147,6 +166,59 @@ class MySQLConnector: logger.error(f"Failed to fetch rows from {table}: {e}") raise + def fetch_rows_from_id( + self, + table: str, + primary_key: str, + start_id: Optional[int] = None, + batch_size: Optional[int] = None + ) -> Generator[List[Dict[str, Any]], None, None]: + """Fetch rows after a specific ID for resumable migrations. + + Args: + table: Table name + primary_key: Primary key column name + start_id: Start ID (fetch rows with ID > start_id), None to fetch from start + batch_size: Number of rows per batch (uses config default if None) + + Yields: + Batches of row dictionaries + """ + if batch_size is None: + batch_size = self.settings.migration.batch_size + + offset = 0 + while True: + try: + with self.connection.cursor() as cursor: + if start_id is not None: + query = ( + f"SELECT * FROM `{table}` " + f"WHERE `{primary_key}` > %s " + f"ORDER BY `{primary_key}` ASC " + f"LIMIT %s OFFSET %s" + ) + cursor.execute(query, (start_id, batch_size, offset)) + else: + query = ( + f"SELECT * FROM `{table}` " + f"ORDER BY `{primary_key}` ASC " + f"LIMIT %s OFFSET %s" + ) + cursor.execute(query, (batch_size, offset)) + + rows = cursor.fetchall() + + if not rows: + break + + yield rows + offset += len(rows) + + except pymysql.Error as e: + logger.error(f"Failed to fetch rows from {table}: {e}") + raise + def get_table_structure(self, table: str) -> Dict[str, Any]: """Get table structure (column info). diff --git a/src/migrator/incremental_migration.py b/src/migrator/incremental_migration.py index e605297..15f4c6a 100644 --- a/src/migrator/incremental_migration.py +++ b/src/migrator/incremental_migration.py @@ -31,11 +31,12 @@ class IncrementalMigrator: self.settings = get_settings() self.state = MigrationState(state_file) - def migrate(self, dry_run: bool = False) -> int: + def migrate(self, dry_run: bool = False, use_id: bool = False) -> int: """Perform incremental migration since last sync. Args: dry_run: If True, log what would be done but don't modify data + use_id: If True, use ID-based resumption, else use timestamp-based Returns: Number of rows migrated @@ -44,7 +45,49 @@ class IncrementalMigrator: mysql_table = self.config["mysql_table"] pg_table = self.config["postgres_table"] + primary_key = self.config.get("primary_key", "id") + logger.info( + f"Starting incremental migration of {mysql_table} -> {pg_table} " + f"({'ID-based' if use_id else 'timestamp-based'})" + ) + + try: + with MySQLConnector() as mysql_conn: + with PostgreSQLConnector() as pg_conn: + if use_id: + return self._migrate_by_id( + mysql_conn, pg_conn, mysql_table, pg_table, primary_key, dry_run + ) + else: + return self._migrate_by_timestamp( + mysql_conn, pg_conn, mysql_table, pg_table, dry_run + ) + + except Exception as e: + logger.error(f"Incremental migration failed: {e}") + raise + + def _migrate_by_timestamp( + self, + mysql_conn: MySQLConnector, + pg_conn: PostgreSQLConnector, + mysql_table: str, + pg_table: str, + dry_run: bool + ) -> int: + """Migrate rows using timestamp-based resumption. 
+ + Args: + mysql_conn: MySQL connector + pg_conn: PostgreSQL connector + mysql_table: MySQL table name + pg_table: PostgreSQL table name + dry_run: If True, don't modify data + + Returns: + Number of rows migrated + """ # Get last migration timestamp last_timestamp = self.state.get_last_timestamp(pg_table) @@ -55,91 +98,178 @@ class IncrementalMigrator: ) return 0 - logger.info( - f"Starting incremental migration of {mysql_table} -> {pg_table} " - f"since {last_timestamp}" + # Count rows to migrate + timestamp_col = "updated_at" if mysql_table == "ELABDATADISP" else "created_at" + + # Get max timestamp from PostgreSQL + pg_max_timestamp = pg_conn.get_max_timestamp( + pg_table, + timestamp_col ) - try: - with MySQLConnector() as mysql_conn: - # Count rows to migrate - timestamp_col = "updated_at" if mysql_table == "ELABDATADISP" else "created_at" + logger.info(f"Last timestamp in PostgreSQL: {pg_max_timestamp}") - with PostgreSQLConnector() as pg_conn: - # Get max timestamp from PostgreSQL - pg_max_timestamp = pg_conn.get_max_timestamp( - pg_table, - timestamp_col + if dry_run: + logger.info("[DRY RUN] Would migrate rows after timestamp") + return 0 + + migrated = 0 + migration_start_time = datetime.utcnow().isoformat() + + # Fetch and migrate rows in batches + batch_count = 0 + for batch in mysql_conn.fetch_rows_since( + mysql_table, + last_timestamp + ): + batch_count += 1 + + if batch_count == 1: + # Create progress tracker with unknown total + progress = ProgressTracker( + len(batch), + f"Migrating {mysql_table} (incremental)" + ) + progress.__enter__() + + # Transform batch + transformed = DataTransformer.transform_batch( + mysql_table, + batch + ) + + # Insert batch + columns = DataTransformer.get_column_order(pg_table) + inserted = pg_conn.insert_batch( + pg_table, + transformed, + columns + ) + + migrated += inserted + progress.update(inserted) + + if batch_count == 0: + logger.info(f"No new rows to migrate for {mysql_table}") + return 0 + + progress.__exit__(None, None, None) + + # Update migration state + self.state.set_last_timestamp(pg_table, migration_start_time) + self.state.increment_migration_count(pg_table, migrated) + + logger.info( + f"✓ Incremental migration complete: {migrated} rows migrated " + f"to {pg_table}" + ) + + return migrated + + def _migrate_by_id( + self, + mysql_conn: MySQLConnector, + pg_conn: PostgreSQLConnector, + mysql_table: str, + pg_table: str, + primary_key: str, + dry_run: bool + ) -> int: + """Migrate rows using ID-based resumption (resumable from last ID). + + Args: + mysql_conn: MySQL connector + pg_conn: PostgreSQL connector + mysql_table: MySQL table name + pg_table: PostgreSQL table name + primary_key: Primary key column name + dry_run: If True, don't modify data + + Returns: + Number of rows migrated + """ + # Get last migrated ID from state + total_count = mysql_conn.get_row_count(mysql_table) + state_dict = self.state.state.get(pg_table, {}) + last_id = state_dict.get("last_id") + previously_migrated = state_dict.get("total_migrated", 0) + + if last_id is None: + logger.info( + f"No previous ID-based migration found for {pg_table}. " + "Starting from beginning." 
+ ) + remaining = total_count + else: + remaining = total_count - last_id + logger.info( + f"Resuming ID-based migration from ID > {last_id}\n" + f"Previously migrated: {previously_migrated} rows\n" + f"Remaining to migrate: {remaining} rows" + ) + + if dry_run: + logger.info(f"[DRY RUN] Would migrate {remaining} rows") + return remaining + + migrated = 0 + + with ProgressTracker( + remaining, + f"Migrating {mysql_table} (resumable)" + ) as progress: + # Fetch and migrate rows in batches + for batch in mysql_conn.fetch_rows_from_id( + mysql_table, + primary_key, + last_id + ): + if not batch: + break + + # Transform batch + transformed = DataTransformer.transform_batch( + mysql_table, + batch + ) + + # Insert batch + columns = DataTransformer.get_column_order(pg_table) + inserted = pg_conn.insert_batch( + pg_table, + transformed, + columns + ) + + if inserted > 0: + # Get the max ID from the batch + batch_max_id = max( + int(row.get(primary_key, 0)) for row in batch ) + migrated += inserted + progress.update(inserted) - logger.info(f"Last timestamp in PostgreSQL: {pg_max_timestamp}") + # Update state after each batch + if pg_table not in self.state.state: + self.state.state[pg_table] = {} + self.state.state[pg_table]["last_id"] = batch_max_id + self.state.state[pg_table]["total_migrated"] = previously_migrated + migrated + self.state.state[pg_table]["last_updated"] = datetime.utcnow().isoformat() + self.state._save_state() - if dry_run: - logger.info("[DRY RUN] Would migrate rows after timestamp") - return 0 + logger.info( + f"✓ ID-based incremental migration complete: {migrated} rows migrated " + f"to {pg_table}" + ) - migrated = 0 - migration_start_time = datetime.utcnow().isoformat() - - # Fetch and migrate rows in batches - batch_count = 0 - for batch in mysql_conn.fetch_rows_since( - mysql_table, - last_timestamp - ): - batch_count += 1 - - if batch_count == 1: - # Create progress tracker with unknown total - progress = ProgressTracker( - len(batch), - f"Migrating {mysql_table} (incremental)" - ) - progress.__enter__() - - # Transform batch - transformed = DataTransformer.transform_batch( - mysql_table, - batch - ) - - # Insert batch - columns = DataTransformer.get_column_order(pg_table) - inserted = pg_conn.insert_batch( - pg_table, - transformed, - columns - ) - - migrated += inserted - progress.update(inserted) - - if batch_count == 0: - logger.info(f"No new rows to migrate for {mysql_table}") - return 0 - - progress.__exit__(None, None, None) - - # Update migration state - self.state.set_last_timestamp(pg_table, migration_start_time) - self.state.increment_migration_count(pg_table, migrated) - - logger.info( - f"✓ Incremental migration complete: {migrated} rows migrated " - f"to {pg_table}" - ) - - return migrated - - except Exception as e: - logger.error(f"Incremental migration failed: {e}") - raise + return migrated def run_incremental_migration( table: str, dry_run: bool = False, - state_file: str = "migration_state.json" + state_file: str = "migration_state.json", + use_id: bool = False ) -> int: """Run incremental migration for a table. 
@@ -147,9 +277,10 @@ def run_incremental_migration( table: Table name to migrate dry_run: If True, show what would be done without modifying data state_file: Path to migration state file + use_id: If True, use ID-based resumption, else use timestamp-based Returns: Number of rows migrated """ migrator = IncrementalMigrator(table, state_file) - return migrator.migrate(dry_run=dry_run) + return migrator.migrate(dry_run=dry_run, use_id=use_id) diff --git a/src/transformers/data_transformer.py b/src/transformers/data_transformer.py index 33c0a7f..0b4c251 100644 --- a/src/transformers/data_transformer.py +++ b/src/transformers/data_transformer.py @@ -1,6 +1,6 @@ """Data transformation from MySQL to PostgreSQL format.""" from typing import Dict, Any, List -from datetime import datetime +from datetime import datetime, time, timedelta from config import ( RAWDATACOR_COLUMNS, ELABDATADISP_FIELD_MAPPING, @@ -14,6 +14,36 @@ logger = get_logger(__name__) class DataTransformer: """Transform MySQL data to PostgreSQL format.""" + @staticmethod + def _convert_time(event_time: Any) -> time: + """Convert event_time to datetime.time object. + + Handles multiple input types: + - str: Parse from "HH:MM:SS" format + - timedelta: Convert from MySQL TIME type (stored as timedelta) + - time: Return as-is + + Args: + event_time: Time value from MySQL (str, timedelta, or time) + + Returns: + datetime.time object + """ + if isinstance(event_time, str): + return datetime.strptime(event_time, "%H:%M:%S").time() + elif isinstance(event_time, timedelta): + # MySQL returns TIME as timedelta + # Extract seconds from timedelta and convert to time + total_seconds = int(event_time.total_seconds()) + hours = total_seconds // 3600 + minutes = (total_seconds % 3600) // 60 + seconds = total_seconds % 60 + return time(hour=hours, minute=minutes, second=seconds) + elif isinstance(event_time, time): + return event_time + else: + raise ValueError(f"Unsupported event_time type: {type(event_time)}") + @staticmethod def transform_rawdatacor_row(mysql_row: Dict[str, Any]) -> Dict[str, Any]: """Transform a RAWDATACOR row from MySQL to PostgreSQL format. @@ -41,14 +71,35 @@ class DataTransformer: "unit": unit if unit else None, } + # Combine event_date and event_time into event_timestamp + event_date = mysql_row.get("EventDate") + event_time = mysql_row.get("EventTime") + if event_date is not None and event_time is not None: + event_time_obj = DataTransformer._convert_time(event_time) + event_timestamp = datetime.combine(event_date, event_time_obj) + elif event_date is None or event_time is None: + # Log a warning for records with missing date/time + missing = [] + if event_date is None: + missing.append("EventDate") + if event_time is None: + missing.append("EventTime") + logger.warning( + f"Row {mysql_row.get('id')} has NULL {', '.join(missing)}. 
" + f"Using default timestamp: 1970-01-01 00:00:00" + ) + # Use default timestamp for records with missing date/time + event_timestamp = datetime(1970, 1, 1, 0, 0, 0) + else: + event_timestamp = None + # Create PostgreSQL row pg_row = { "id": mysql_row["id"], "unit_name": mysql_row.get("UnitName"), "tool_name_id": mysql_row["ToolNameID"], "node_num": mysql_row["NodeNum"], - "event_date": mysql_row["EventDate"], - "event_time": mysql_row["EventTime"], + "event_timestamp": event_timestamp, "bat_level": mysql_row["BatLevel"], "temperature": mysql_row["Temperature"], "measurements": measurements, @@ -90,14 +141,35 @@ class DataTransformer: k: v for k, v in measurements.items() if v } + # Combine event_date and event_time into event_timestamp + event_date = mysql_row.get("EventDate") + event_time = mysql_row.get("EventTime") + if event_date is not None and event_time is not None: + event_time_obj = DataTransformer._convert_time(event_time) + event_timestamp = datetime.combine(event_date, event_time_obj) + elif event_date is None or event_time is None: + # Log a warning for records with missing date/time + missing = [] + if event_date is None: + missing.append("EventDate") + if event_time is None: + missing.append("EventTime") + logger.warning( + f"Row {mysql_row.get('idElabData')} has NULL {', '.join(missing)}. " + f"Using default timestamp: 1970-01-01 00:00:00" + ) + # Use default timestamp for records with missing date/time + event_timestamp = datetime(1970, 1, 1, 0, 0, 0) + else: + event_timestamp = None + # Create PostgreSQL row pg_row = { "id_elab_data": mysql_row["idElabData"], "unit_name": mysql_row.get("UnitName"), "tool_name_id": mysql_row["ToolNameID"], "node_num": mysql_row["NodeNum"], - "event_date": mysql_row["EventDate"], - "event_time": mysql_row["EventTime"], + "event_timestamp": event_timestamp, "state": mysql_row.get("State"), "calc_err": mysql_row.get("calcerr", 0), "measurements": measurements, @@ -150,8 +222,7 @@ class DataTransformer: "unit_name", "tool_name_id", "node_num", - "event_date", - "event_time", + "event_timestamp", "bat_level", "temperature", "measurements", @@ -166,8 +237,7 @@ class DataTransformer: "unit_name", "tool_name_id", "node_num", - "event_date", - "event_time", + "event_timestamp", "state", "calc_err", "measurements", diff --git a/src/transformers/schema_transformer.py b/src/transformers/schema_transformer.py index 25dc4d4..94fd495 100644 --- a/src/transformers/schema_transformer.py +++ b/src/transformers/schema_transformer.py @@ -21,8 +21,7 @@ CREATE TABLE IF NOT EXISTS rawdatacor ( unit_name VARCHAR(32), tool_name_id VARCHAR(32) NOT NULL, node_num INTEGER NOT NULL, - event_date DATE NOT NULL, - event_time TIME NOT NULL, + event_timestamp TIMESTAMP NOT NULL, bat_level NUMERIC(4,2) NOT NULL, temperature NUMERIC(5,2) NOT NULL, measurements JSONB, @@ -30,7 +29,7 @@ CREATE TABLE IF NOT EXISTS rawdatacor ( bat_level_module NUMERIC(4,2), temperature_module NUMERIC(5,2), rssi_module INTEGER -) PARTITION BY RANGE (EXTRACT(YEAR FROM event_date)); +) PARTITION BY RANGE (EXTRACT(YEAR FROM event_timestamp)); -- Note: PostgreSQL doesn't support PRIMARY KEY or UNIQUE constraints -- with RANGE partitioning on expressions. Using sequence for id auto-increment. 
@@ -46,11 +45,18 @@ CREATE TABLE IF NOT EXISTS rawdatacor_{year} FOR VALUES FROM ({year}) TO ({next_year}); """ + # Add default partition for records outside the defined year range + sql += """ +CREATE TABLE IF NOT EXISTS rawdatacor_default + PARTITION OF rawdatacor + DEFAULT; +""" + # Add indexes sql += """ -- Create indexes CREATE INDEX IF NOT EXISTS idx_unit_tool_node_datetime_raw - ON rawdatacor(unit_name, tool_name_id, node_num, event_date, event_time); + ON rawdatacor(unit_name, tool_name_id, node_num, event_timestamp); CREATE INDEX IF NOT EXISTS idx_unit_tool_raw ON rawdatacor(unit_name, tool_name_id); @@ -58,8 +64,8 @@ CREATE INDEX IF NOT EXISTS idx_unit_tool_raw CREATE INDEX IF NOT EXISTS idx_measurements_gin_raw ON rawdatacor USING GIN (measurements); -CREATE INDEX IF NOT EXISTS idx_event_date_raw - ON rawdatacor(event_date); +CREATE INDEX IF NOT EXISTS idx_event_timestamp_raw + ON rawdatacor(event_timestamp); """ return sql @@ -81,14 +87,13 @@ CREATE TABLE IF NOT EXISTS elabdatadisp ( unit_name VARCHAR(32), tool_name_id VARCHAR(32) NOT NULL, node_num INTEGER NOT NULL, - event_date DATE NOT NULL, - event_time TIME NOT NULL, + event_timestamp TIMESTAMP NOT NULL, state VARCHAR(32), calc_err INTEGER DEFAULT 0, measurements JSONB, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -) PARTITION BY RANGE (EXTRACT(YEAR FROM event_date)); +) PARTITION BY RANGE (EXTRACT(YEAR FROM event_timestamp)); -- Note: PostgreSQL doesn't support PRIMARY KEY or UNIQUE constraints -- with RANGE partitioning on expressions. Using sequence for id_elab_data auto-increment. @@ -104,11 +109,18 @@ CREATE TABLE IF NOT EXISTS elabdatadisp_{year} FOR VALUES FROM ({year}) TO ({next_year}); """ + # Add default partition for records outside the defined year range + sql += """ +CREATE TABLE IF NOT EXISTS elabdatadisp_default + PARTITION OF elabdatadisp + DEFAULT; +""" + # Add indexes sql += """ -- Create indexes CREATE INDEX IF NOT EXISTS idx_unit_tool_node_datetime_elab - ON elabdatadisp(unit_name, tool_name_id, node_num, event_date, event_time); + ON elabdatadisp(unit_name, tool_name_id, node_num, event_timestamp); CREATE INDEX IF NOT EXISTS idx_unit_tool_elab ON elabdatadisp(unit_name, tool_name_id); @@ -116,8 +128,8 @@ CREATE INDEX IF NOT EXISTS idx_unit_tool_elab CREATE INDEX IF NOT EXISTS idx_measurements_gin_elab ON elabdatadisp USING GIN (measurements); -CREATE INDEX IF NOT EXISTS idx_event_date_elab - ON elabdatadisp(event_date); +CREATE INDEX IF NOT EXISTS idx_event_timestamp_elab + ON elabdatadisp(event_timestamp); """ return sql diff --git a/tests/test_setup.py b/tests/test_setup.py index e70c5ba..1025fd5 100644 --- a/tests/test_setup.py +++ b/tests/test_setup.py @@ -1,5 +1,6 @@ """Test setup and basic functionality.""" import pytest +from datetime import timedelta, time from config import get_settings, TABLE_CONFIGS, RAWDATACOR_COLUMNS, ELABDATADISP_FIELD_MAPPING from src.transformers.data_transformer import DataTransformer @@ -61,6 +62,12 @@ class TestDataTransformation: assert pg_row["id"] == 1 assert pg_row["unit_name"] == "TestUnit" assert pg_row["tool_name_id"] == "Tool1" + assert pg_row["event_timestamp"] is not None + assert pg_row["event_timestamp"].year == 2024 + assert pg_row["event_timestamp"].month == 1 + assert pg_row["event_timestamp"].day == 1 + assert pg_row["event_timestamp"].hour == 12 + assert pg_row["event_timestamp"].minute == 0 assert isinstance(pg_row["measurements"], dict) assert "0" in pg_row["measurements"] assert 
pg_row["measurements"]["0"]["value"] == "100.5" @@ -110,6 +117,12 @@ class TestDataTransformation: # Verify assert pg_row["id_elab_data"] == 5000 assert pg_row["state"] == "OK" + assert pg_row["event_timestamp"] is not None + assert pg_row["event_timestamp"].year == 2024 + assert pg_row["event_timestamp"].month == 1 + assert pg_row["event_timestamp"].day == 1 + assert pg_row["event_timestamp"].hour == 12 + assert pg_row["event_timestamp"].minute == 0 assert isinstance(pg_row["measurements"], dict) assert "shifts" in pg_row["measurements"] assert "coordinates" in pg_row["measurements"] @@ -135,6 +148,105 @@ class TestDataTransformation: assert "state" in columns +class TestTimeConversion: + """Test time conversion utilities.""" + + def test_convert_time_from_string(self): + """Test converting time from string format.""" + event_time = "12:30:45" + result = DataTransformer._convert_time(event_time) + assert isinstance(result, time) + assert result.hour == 12 + assert result.minute == 30 + assert result.second == 45 + + def test_convert_time_from_timedelta(self): + """Test converting time from timedelta (MySQL TIME format).""" + # MySQL returns TIME columns as timedelta + event_time = timedelta(hours=14, minutes=25, seconds=30) + result = DataTransformer._convert_time(event_time) + assert isinstance(result, time) + assert result.hour == 14 + assert result.minute == 25 + assert result.second == 30 + + def test_convert_time_from_time_object(self): + """Test converting time from time object.""" + event_time = time(10, 15, 20) + result = DataTransformer._convert_time(event_time) + assert isinstance(result, time) + assert result.hour == 10 + assert result.minute == 15 + assert result.second == 20 + + def test_rawdatacor_with_timedelta(self): + """Test RAWDATACOR transformation with timedelta event_time.""" + mysql_row = { + "id": 1, + "UnitName": "TestUnit", + "ToolNameID": "Tool1", + "NodeNum": 1, + "EventDate": "2024-01-01", + "EventTime": timedelta(hours=12, minutes=0, seconds=0), # MySQL TIME format + "BatLevel": 3.5, + "Temperature": 25.5, + "Val0": "100.5", + "Val1": None, + "Val2": "200.3", + "Val0_unitmisure": "°C", + "Val1_unitmisure": "bar", + "Val2_unitmisure": "m/s", + } + + # Add remaining Val columns as None + for i in range(3, 16): + col = f"Val{i:X}" + mysql_row[col] = None + mysql_row[f"{col}_unitmisure"] = None + + pg_row = DataTransformer.transform_rawdatacor_row(mysql_row) + + assert pg_row["event_timestamp"] is not None + assert pg_row["event_timestamp"].year == 2024 + assert pg_row["event_timestamp"].month == 1 + assert pg_row["event_timestamp"].day == 1 + assert pg_row["event_timestamp"].hour == 12 + assert pg_row["event_timestamp"].minute == 0 + + def test_rawdatacor_with_null_eventtime(self): + """Test RAWDATACOR transformation with NULL EventTime uses default timestamp.""" + mysql_row = { + "id": 2140982, + "UnitName": "OLD_ID0002", + "ToolNameID": "DT0001", + "NodeNum": 1, + "EventDate": "2023-09-05", + "EventTime": None, # NULL EventTime + "BatLevel": 12.90, + "Temperature": 13.40, + "Val0": "-1709", + "Val1": None, + "Val0_unitmisure": None, + "Val1_unitmisure": None, + } + + # Add remaining Val columns as None + for i in range(2, 16): + col = f"Val{i:X}" + mysql_row[col] = None + mysql_row[f"{col}_unitmisure"] = None + + pg_row = DataTransformer.transform_rawdatacor_row(mysql_row) + + # Should use default timestamp 1970-01-01 00:00:00 + assert pg_row["event_timestamp"] is not None + assert pg_row["event_timestamp"].year == 1970 + assert pg_row["event_timestamp"].month 
== 1 + assert pg_row["event_timestamp"].day == 1 + assert pg_row["event_timestamp"].hour == 0 + assert pg_row["event_timestamp"].minute == 0 + + class TestFieldMapping: """Test field mapping configuration."""
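The resumable `_migrate_by_id` path depends on the state file surviving interruptions, so a round-trip test of the per-batch checkpoint is a natural companion to the tests above. The sketch below assumes `MigrationState(state_file)` loads an existing JSON file on construction (which the resume logic already relies on) and that `_save_state()` persists `self.state` to disk; `tmp_path` is pytest's built-in temporary-directory fixture.

```python
# Sketch: the ID-based checkpoint written after a batch should be readable by
# a fresh MigrationState instance, as happens when an interrupted run resumes.
import json
from datetime import datetime

from src.migrator.state import MigrationState


def test_id_state_survives_restart(tmp_path):
    state_file = str(tmp_path / "migration_state.json")

    # Simulate what _migrate_by_id stores after processing a batch.
    state = MigrationState(state_file)
    state.state["rawdatacor"] = {
        "last_id": 1_000_000,
        "total_migrated": 1_000_000,
        "last_updated": datetime.utcnow().isoformat(),
    }
    state._save_state()

    # A new instance (e.g. after an interrupt) should see the same checkpoint.
    resumed = MigrationState(state_file)
    assert resumed.state["rawdatacor"]["last_id"] == 1_000_000

    # The on-disk layout matches the ID-based example in MIGRATION_WORKFLOW.md.
    with open(state_file) as fh:
        assert json.load(fh)["rawdatacor"]["last_id"] == 1_000_000
```

If `MigrationState` uses a different constructor or persistence method, only the lines that build the instances and the `_save_state()` call need adjusting.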