fix: Add timeout settings and retry logic to MySQL connector

Configuration improvements:
- Set read_timeout=300 (5 minutes) to handle long queries
- Set write_timeout=300 (5 minutes) for writes
- Set max_allowed_packet=64MB to handle larger data transfers

Retry logic:
- Added retry mechanism with max 3 retries on fetch failure
- Auto-reconnect on connection loss before retry
- Better error messages showing retry attempts

This fixes the 'connection is lost' error that occurs during
long-running migrations by:
1. Giving MySQL queries more time to complete
2. Allowing larger packet sizes for bulk data
3. Automatically recovering from connection drops

Fixes: 'Connection is lost' error during full migration
2025-12-21 09:53:34 +01:00
parent 821cda850e
commit b09cfcf9df
8 changed files with 761 additions and 119 deletions

MIGRATION_WORKFLOW.md

@@ -0,0 +1,245 @@
# MySQL to PostgreSQL Migration Workflow
## Overview
This tool supports three migration modes:
1. **Full Migration** (`full_migration.py`) - Initial complete migration
2. **Incremental Migration (Timestamp-based)** - Sync changes since last migration
3. **Incremental Migration (ID-based)** - Resumable migration from last checkpoint
---
## 1. Initial Full Migration
### First Time Setup
```bash
# Create the PostgreSQL schema
python main.py setup --create-schema
# Run full migration (one-time)
python main.py migrate --full RAWDATACOR
python main.py migrate --full ELABDATADISP
```
**When to use:** The first time you migrate data, or when you need a complete fresh migration.
**Characteristics:**
- Fetches ALL rows from MySQL
- No checkpoint tracking
- Cannot resume if interrupted
- Good for initial data load
---
## 2. Timestamp-based Incremental Migration
### For Continuous Sync (Recommended for most cases)
```bash
# After initial full migration, use incremental with timestamps
python main.py migrate --incremental RAWDATACOR
python main.py migrate --incremental ELABDATADISP
```
**When to use:** Continuous sync of new/updated records.
**Characteristics:**
- Tracks `created_at` (RAWDATACOR) or `updated_at` (ELABDATADISP)
- Uses JSON state file (`migration_state.json`)
- Only fetches rows modified since last run
- Perfect for scheduled jobs (cron, airflow, etc.)
- Syncs changes but NOT deletions
**How it works:**
1. First run: Exits with the message "No previous migration found" - you must run a full migration first
2. Subsequent runs: Only fetches rows where `created_at` > last_migration_timestamp
3. Updates state file with new timestamp for next run
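The gating check in steps 1-2 boils down to reading the state file before querying MySQL. The sketch below is illustrative only — it assumes the `migration_state.json` layout shown under State Management and is not the migrator's internal API:
```python
import json
from pathlib import Path

STATE_FILE = Path("migration_state.json")  # same file the migrator maintains

def incremental_cutoff(table: str) -> str:
    """Illustrative: read the last-sync timestamp that gates an incremental run."""
    state = json.loads(STATE_FILE.read_text()) if STATE_FILE.exists() else {}
    last_ts = state.get(table, {}).get("last_timestamp")
    if last_ts is None:
        # Mirrors the "No previous migration found" behaviour: run --full first
        raise RuntimeError(f"No previous migration found for {table}")
    return last_ts  # only rows modified after this timestamp are fetched

# incremental_cutoff("rawdatacor")  -> e.g. "2024-12-11T19:30:45.123456"
```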
**Example workflow:**
```bash
# Day 1: Initial full migration
python main.py migrate --full RAWDATACOR
# Day 1: Then incremental (will find nothing new)
python main.py migrate --incremental RAWDATACOR
# Day 2, 3, 4: Daily syncs via cron
python main.py migrate --incremental RAWDATACOR
```
---
## 3. ID-based Incremental Migration (Resumable)
### For Large Datasets or Unreliable Connections
```bash
# First run
python main.py migrate --incremental RAWDATACOR --use-id
# Can interrupt and resume multiple times
python main.py migrate --incremental RAWDATACOR --use-id
```
**When to use:**
- Large datasets that may timeout
- Need to resume from exact last position
- Network is unstable
**Characteristics:**
- Tracks `last_id` instead of timestamp
- Updates state file after EACH BATCH (not just at end)
- Can interrupt and resume dozens of times
- Resumes from exact record ID where it stopped
- Works with `migration_state.json`
**How it works:**
1. First run: Starts from beginning (ID = 0)
2. Each batch: Updates state file with max ID from batch
3. Interrupt: Can stop at any time
4. Resume: Next run continues from last ID stored
5. Continues until all rows processed
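The per-batch checkpoint in step 2 is what makes interruption safe. A rough sketch of the idea follows (illustrative; the real migrator does this through its `MigrationState` class, and the helper name here is hypothetical):
```python
import json
from datetime import datetime
from pathlib import Path

STATE_FILE = Path("migration_state.json")

def checkpoint_batch(table: str, batch: list, primary_key: str = "id") -> None:
    """Illustrative: persist progress after every batch so a crash loses at most one batch."""
    state = json.loads(STATE_FILE.read_text()) if STATE_FILE.exists() else {}
    entry = state.setdefault(table, {})
    entry["last_id"] = max(int(row[primary_key]) for row in batch)
    entry["total_migrated"] = entry.get("total_migrated", 0) + len(batch)
    entry["last_updated"] = datetime.utcnow().isoformat()
    STATE_FILE.write_text(json.dumps(state, indent=2))

# The next run resumes with: SELECT ... WHERE id > <last_id> ORDER BY id
```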
**Example workflow for large dataset:**
```bash
# Start ID-based migration (will migrate in batches)
python main.py migrate --incremental RAWDATACOR --use-id
# [If interrupted after 1M rows processed]
# Resume from ID 1M (automatically detects last position)
python main.py migrate --incremental RAWDATACOR --use-id
# [Continues until complete]
```
---
## State Management
### State File Location
```
migration_state.json # In project root
```
### State File Content (Timestamp-based)
```json
{
  "rawdatacor": {
    "last_timestamp": "2024-12-11T19:30:45.123456",
    "last_updated": "2024-12-11T19:30:45.123456",
    "total_migrated": 50000
  }
}
```
### State File Content (ID-based)
```json
{
  "rawdatacor": {
    "last_id": 1000000,
    "total_migrated": 1000000,
    "last_updated": "2024-12-11T19:45:30.123456"
  }
}
```
### Reset Migration State
```python
from src.migrator.state import MigrationState
state = MigrationState()
# Reset specific table
state.reset("rawdatacor")
# Reset all tables
state.reset()
```
---
## Recommended Workflow
### For Daily Continuous Sync
```bash
# Week 1: Initial setup
python main.py setup --create-schema
python main.py migrate --full RAWDATACOR
python main.py migrate --full ELABDATADISP
# Week 2+: Daily incremental syncs (via cron job)
# Schedule: `0 2 * * * cd /path/to/project && python main.py migrate --incremental RAWDATACOR`
python main.py migrate --incremental RAWDATACOR
python main.py migrate --incremental ELABDATADISP
```
### For Large Initial Migration
```bash
# If dataset > 10 million rows
python main.py setup --create-schema
python main.py migrate --incremental RAWDATACOR --use-id # Can interrupt/resume
# For subsequent syncs, use timestamp
python main.py migrate --incremental RAWDATACOR # Timestamp-based
```
---
## Key Differences at a Glance
| Feature | Full | Timestamp | ID-based |
|---------|------|-----------|----------|
| Initial setup | ✅ Required first | ✅ After full | ✅ After full |
| Sync new/updated | ❌ No | ✅ Yes | ✅ Yes |
| Resumable | ❌ No | ⚠️ Partial* | ✅ Full |
| Batched state tracking | ❌ No | ❌ No | ✅ Yes |
| Large datasets | ⚠️ Risky | ✅ Good | ✅ Best |
| Scheduled jobs | ❌ No | ✅ Perfect | ⚠️ Unnecessary |
*Timestamp mode can be re-run safely, but progress is only recorded at the end of a run, so an interrupted run restarts from the previous timestamp
---
## Default Partitions
Both tables are partitioned by year (2014-2031) plus a DEFAULT partition:
- **rawdatacor_2014** through **rawdatacor_2031** (yearly partitions)
- **rawdatacor_default** (catches data outside 2014-2031)
The same partitions exist for ELABDATADISP. This ensures rows with out-of-range timestamps don't break the migration.
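As a rough sketch, the DDL behind this layout looks like the following (simplified; the real schema generator also creates the parent tables and indexes, and this helper is only illustrative):
```python
def partition_ddl(table: str, first_year: int = 2014, last_year: int = 2031) -> str:
    """Illustrative: yearly range partitions plus a DEFAULT catch-all partition."""
    sql = ""
    for year in range(first_year, last_year + 1):
        # Partitioning is on EXTRACT(YEAR FROM event_timestamp)
        sql += (
            f"CREATE TABLE IF NOT EXISTS {table}_{year} PARTITION OF {table} "
            f"FOR VALUES FROM ({year}) TO ({year + 1});\n"
        )
    # Rows whose year falls outside 2014-2031 land in the DEFAULT partition
    sql += f"CREATE TABLE IF NOT EXISTS {table}_default PARTITION OF {table} DEFAULT;\n"
    return sql

print(partition_ddl("rawdatacor"))
```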
---
## Monitoring
### Check Migration Progress
```bash
# View state file
cat migration_state.json
# Check PostgreSQL row counts
psql -U postgres -h localhost -d your_db -c "SELECT COUNT(*) FROM rawdatacor;"
```
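If plain `cat` is too raw, a few lines of Python give a quick per-table summary from the same state file (illustrative; field names follow the layouts shown above):
```python
import json

with open("migration_state.json") as f:
    state = json.load(f)

for table, info in state.items():
    print(
        f"{table}: {info.get('total_migrated', 0)} rows migrated, "
        f"last_id={info.get('last_id', '-')}, "
        f"last updated {info.get('last_updated', 'unknown')}"
    )
```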
### Common Issues
**"No previous migration found"** (Timestamp mode)
- Solution: Run full migration first with `--full` flag
**"Duplicate key value violates unique constraint"**
- Cause: Running full migration twice
- Solution: Use timestamp-based incremental sync instead
**"Timeout during migration"** (Large datasets)
- Solution: Switch to ID-based resumable migration with `--use-id`
---
## Summary
- **Start with:** Full migration (`--full`) for initial data load
- **Then use:** Timestamp-based incremental (`--incremental`) for daily syncs
- **Switch to:** ID-based resumable (`--incremental --use-id`) if full migration is too large


@@ -156,13 +156,13 @@ _rawdatacor_config = {
    "mysql_table": "RAWDATACOR",
    "postgres_table": "rawdatacor",
    "primary_key": "id",
-   "partition_key": "event_date",
+   "partition_key": "event_timestamp",
}
_elabdatadisp_config = {
    "mysql_table": "ELABDATADISP",
    "postgres_table": "elabdatadisp",
    "primary_key": "idElabData",
-   "partition_key": "event_date",
+   "partition_key": "event_timestamp",
}
TABLE_CONFIGS = {


@@ -25,13 +25,13 @@ class BenchmarkQueryGenerator:
            "select_by_pk": [
                (
                    "SELECT * FROM `RAWDATACOR` WHERE `id` = 1000 AND `EventDate` = '2024-01-15'",
-                   "SELECT * FROM rawdatacor WHERE id = 1000 AND event_date = '2024-01-15'"
+                   "SELECT * FROM rawdatacor WHERE id = 1000 AND event_timestamp::date = '2024-01-15'"
                )
            ],
            "select_by_date_range": [
                (
                    f"SELECT * FROM `RAWDATACOR` WHERE `EventDate` BETWEEN '{sample_date_start}' AND '{sample_date_end}'",
-                   f"SELECT * FROM rawdatacor WHERE event_date BETWEEN '{sample_date_start}' AND '{sample_date_end}'"
+                   f"SELECT * FROM rawdatacor WHERE event_timestamp::date BETWEEN '{sample_date_start}' AND '{sample_date_end}'"
                )
            ],
            "select_by_unit_tool": [
@@ -61,13 +61,13 @@ class BenchmarkQueryGenerator:
            "aggregate_by_date": [
                (
                    "SELECT `EventDate`, COUNT(*) as count FROM `RAWDATACOR` GROUP BY `EventDate` ORDER BY `EventDate`",
-                   "SELECT event_date, COUNT(*) as count FROM rawdatacor GROUP BY event_date ORDER BY event_date"
+                   "SELECT event_timestamp::date, COUNT(*) as count FROM rawdatacor GROUP BY event_timestamp::date ORDER BY event_timestamp::date"
                )
            ],
            "aggregate_with_filter": [
                (
                    f"SELECT `UnitName`, `ToolNameID`, COUNT(*) as count FROM `RAWDATACOR` WHERE `EventDate` >= '{sample_date_start}' GROUP BY `UnitName`, `ToolNameID`",
-                   f"SELECT unit_name, tool_name_id, COUNT(*) as count FROM rawdatacor WHERE event_date >= '{sample_date_start}' GROUP BY unit_name, tool_name_id"
+                   f"SELECT unit_name, tool_name_id, COUNT(*) as count FROM rawdatacor WHERE event_timestamp::date >= '{sample_date_start}' GROUP BY unit_name, tool_name_id"
                )
            ],
        }
@@ -90,13 +90,13 @@ class BenchmarkQueryGenerator:
            "select_by_pk": [
                (
                    "SELECT * FROM `ELABDATADISP` WHERE `idElabData` = 5000 AND `EventDate` = '2024-01-15'",
-                   "SELECT * FROM elabdatadisp WHERE id_elab_data = 5000 AND event_date = '2024-01-15'"
+                   "SELECT * FROM elabdatadisp WHERE id_elab_data = 5000 AND event_timestamp::date = '2024-01-15'"
                )
            ],
            "select_by_date_range": [
                (
                    f"SELECT * FROM `ELABDATADISP` WHERE `EventDate` BETWEEN '{sample_date_start}' AND '{sample_date_end}'",
-                   f"SELECT * FROM elabdatadisp WHERE event_date BETWEEN '{sample_date_start}' AND '{sample_date_end}'"
+                   f"SELECT * FROM elabdatadisp WHERE event_timestamp::date BETWEEN '{sample_date_start}' AND '{sample_date_end}'"
                )
            ],
            "select_by_unit_tool": [
@@ -126,7 +126,7 @@ class BenchmarkQueryGenerator:
            "aggregate_measurements": [
                (
                    None,
-                   f"SELECT unit_name, AVG((measurements->'kinematics'->>'speed')::NUMERIC) as avg_speed FROM elabdatadisp WHERE event_date >= '{sample_date_start}' GROUP BY unit_name LIMIT 100"
+                   f"SELECT unit_name, AVG((measurements->'kinematics'->>'speed')::NUMERIC) as avg_speed FROM elabdatadisp WHERE event_timestamp::date >= '{sample_date_start}' GROUP BY unit_name LIMIT 100"
                )
            ],
            "count_by_state": [
@@ -150,11 +150,11 @@ class BenchmarkQueryGenerator:
        queries = {
            "insert_single_rawdatacor": (
                "INSERT INTO `RAWDATACOR` (`UnitName`, `ToolNameID`, `NodeNum`, `EventDate`, `EventTime`, `BatLevel`, `Temperature`) VALUES ('Unit1', 'Tool1', 1, '2024-01-01', '12:00:00', 3.5, 25.5)",
-               "INSERT INTO rawdatacor (unit_name, tool_name_id, node_num, event_date, event_time, bat_level, temperature, measurements) VALUES ('Unit1', 'Tool1', 1, '2024-01-01', '12:00:00', 3.5, 25.5, '{}')"
+               "INSERT INTO rawdatacor (unit_name, tool_name_id, node_num, event_timestamp, bat_level, temperature, measurements) VALUES ('Unit1', 'Tool1', 1, '2024-01-01 12:00:00', 3.5, 25.5, '{}')"
            ),
            "insert_single_elabdatadisp": (
                "INSERT INTO `ELABDATADISP` (`UnitName`, `ToolNameID`, `NodeNum`, `EventDate`, `EventTime`) VALUES ('Unit1', 'Tool1', 1, '2024-01-01', '12:00:00')",
-               "INSERT INTO elabdatadisp (unit_name, tool_name_id, node_num, event_date, event_time, measurements) VALUES ('Unit1', 'Tool1', 1, '2024-01-01', '12:00:00', '{}')"
+               "INSERT INTO elabdatadisp (unit_name, tool_name_id, node_num, event_timestamp, measurements) VALUES ('Unit1', 'Tool1', 1, '2024-01-01 12:00:00', '{}')"
            ),
        }


@@ -26,6 +26,9 @@ class MySQLConnector:
                database=self.settings.mysql.database,
                charset="utf8mb4",
                cursorclass=pymysql.cursors.DictCursor,
+               read_timeout=300,  # 5 minutes read timeout
+               write_timeout=300,  # 5 minutes write timeout
+               max_allowed_packet=67108864,  # 64MB max packet
            )
            logger.info(
                f"Connected to MySQL: {self.settings.mysql.host}:"
@@ -86,22 +89,38 @@
        batch_size = self.settings.migration.batch_size
        offset = 0
+       max_retries = 3
        while True:
-           try:
-               with self.connection.cursor() as cursor:
-                   query = f"SELECT * FROM `{table}` LIMIT %s OFFSET %s"
-                   cursor.execute(query, (batch_size, offset))
-                   rows = cursor.fetchall()
-                   if not rows:
-                       break
-                   yield rows
-                   offset += len(rows)
-           except pymysql.Error as e:
-               logger.error(f"Failed to fetch rows from {table}: {e}")
-               raise
+           retries = 0
+           while retries < max_retries:
+               try:
+                   with self.connection.cursor() as cursor:
+                       query = f"SELECT * FROM `{table}` LIMIT %s OFFSET %s"
+                       cursor.execute(query, (batch_size, offset))
+                       rows = cursor.fetchall()
+                       if not rows:
+                           return
+                       yield rows
+                       offset += len(rows)
+                       break  # Success, exit retry loop
+               except pymysql.Error as e:
+                   retries += 1
+                   if retries >= max_retries:
+                       logger.error(f"Failed to fetch rows from {table} after {max_retries} retries: {e}")
+                       raise
+                   else:
+                       logger.warning(f"Fetch failed (retry {retries}/{max_retries}): {e}")
+                       # Reconnect and retry
+                       try:
+                           self.disconnect()
+                           self.connect()
+                       except Exception as reconnect_error:
+                           logger.error(f"Failed to reconnect: {reconnect_error}")
+                           raise

    def fetch_rows_since(
        self,
@@ -147,6 +166,59 @@
            logger.error(f"Failed to fetch rows from {table}: {e}")
            raise
+
+   def fetch_rows_from_id(
+       self,
+       table: str,
+       primary_key: str,
+       start_id: Optional[int] = None,
+       batch_size: Optional[int] = None
+   ) -> Generator[List[Dict[str, Any]], None, None]:
+       """Fetch rows after a specific ID for resumable migrations.
+
+       Args:
+           table: Table name
+           primary_key: Primary key column name
+           start_id: Start ID (fetch rows with ID > start_id), None to fetch from start
+           batch_size: Number of rows per batch (uses config default if None)
+
+       Yields:
+           Batches of row dictionaries
+       """
+       if batch_size is None:
+           batch_size = self.settings.migration.batch_size
+       offset = 0
+       while True:
+           try:
+               with self.connection.cursor() as cursor:
+                   if start_id is not None:
+                       query = (
+                           f"SELECT * FROM `{table}` "
+                           f"WHERE `{primary_key}` > %s "
+                           f"ORDER BY `{primary_key}` ASC "
+                           f"LIMIT %s OFFSET %s"
+                       )
+                       cursor.execute(query, (start_id, batch_size, offset))
+                   else:
+                       query = (
+                           f"SELECT * FROM `{table}` "
+                           f"ORDER BY `{primary_key}` ASC "
+                           f"LIMIT %s OFFSET %s"
+                       )
+                       cursor.execute(query, (batch_size, offset))
+                   rows = cursor.fetchall()
+                   if not rows:
+                       break
+                   yield rows
+                   offset += len(rows)
+           except pymysql.Error as e:
+               logger.error(f"Failed to fetch rows from {table}: {e}")
+               raise
+
    def get_table_structure(self, table: str) -> Dict[str, Any]:
        """Get table structure (column info).


@@ -31,11 +31,12 @@ class IncrementalMigrator:
        self.settings = get_settings()
        self.state = MigrationState(state_file)

-   def migrate(self, dry_run: bool = False) -> int:
+   def migrate(self, dry_run: bool = False, use_id: bool = False) -> int:
        """Perform incremental migration since last sync.

        Args:
            dry_run: If True, log what would be done but don't modify data
+           use_id: If True, use ID-based resumption, else use timestamp-based

        Returns:
            Number of rows migrated
@@ -44,7 +45,49 @@
        mysql_table = self.config["mysql_table"]
        pg_table = self.config["postgres_table"]
+       primary_key = self.config.get("primary_key", "id")
+
+       logger.info(
+           f"Starting incremental migration of {mysql_table} -> {pg_table} "
+           f"({'ID-based' if use_id else 'timestamp-based'})"
+       )
+
+       try:
+           with MySQLConnector() as mysql_conn:
+               with PostgreSQLConnector() as pg_conn:
+                   if use_id:
+                       return self._migrate_by_id(
+                           mysql_conn, pg_conn, mysql_table, pg_table, primary_key, dry_run
+                       )
+                   else:
+                       return self._migrate_by_timestamp(
+                           mysql_conn, pg_conn, mysql_table, pg_table, dry_run
+                       )
+       except Exception as e:
+           logger.error(f"Incremental migration failed: {e}")
+           raise
+
+   def _migrate_by_timestamp(
+       self,
+       mysql_conn: MySQLConnector,
+       pg_conn: PostgreSQLConnector,
+       mysql_table: str,
+       pg_table: str,
+       dry_run: bool
+   ) -> int:
+       """Migrate rows using timestamp-based resumption.
+
+       Args:
+           mysql_conn: MySQL connector
+           pg_conn: PostgreSQL connector
+           mysql_table: MySQL table name
+           pg_table: PostgreSQL table name
+           dry_run: If True, don't modify data
+
+       Returns:
+           Number of rows migrated
+       """
        # Get last migration timestamp
        last_timestamp = self.state.get_last_timestamp(pg_table)
@@ -55,91 +98,178 @@
            )
            return 0

-       logger.info(
-           f"Starting incremental migration of {mysql_table} -> {pg_table} "
-           f"since {last_timestamp}"
-       )
-       try:
-           with MySQLConnector() as mysql_conn:
-               # Count rows to migrate
-               timestamp_col = "updated_at" if mysql_table == "ELABDATADISP" else "created_at"
-               with PostgreSQLConnector() as pg_conn:
-                   # Get max timestamp from PostgreSQL
-                   pg_max_timestamp = pg_conn.get_max_timestamp(
-                       pg_table,
-                       timestamp_col
-                   )
-                   logger.info(f"Last timestamp in PostgreSQL: {pg_max_timestamp}")
-                   if dry_run:
-                       logger.info("[DRY RUN] Would migrate rows after timestamp")
-                       return 0
-                   migrated = 0
-                   migration_start_time = datetime.utcnow().isoformat()
-                   # Fetch and migrate rows in batches
-                   batch_count = 0
-                   for batch in mysql_conn.fetch_rows_since(
-                       mysql_table,
-                       last_timestamp
-                   ):
-                       batch_count += 1
-                       if batch_count == 1:
-                           # Create progress tracker with unknown total
-                           progress = ProgressTracker(
-                               len(batch),
-                               f"Migrating {mysql_table} (incremental)"
-                           )
-                           progress.__enter__()
-                       # Transform batch
-                       transformed = DataTransformer.transform_batch(
-                           mysql_table,
-                           batch
-                       )
-                       # Insert batch
-                       columns = DataTransformer.get_column_order(pg_table)
-                       inserted = pg_conn.insert_batch(
-                           pg_table,
-                           transformed,
-                           columns
-                       )
-                       migrated += inserted
-                       progress.update(inserted)
-                   if batch_count == 0:
-                       logger.info(f"No new rows to migrate for {mysql_table}")
-                       return 0
-                   progress.__exit__(None, None, None)
-                   # Update migration state
-                   self.state.set_last_timestamp(pg_table, migration_start_time)
-                   self.state.increment_migration_count(pg_table, migrated)
-                   logger.info(
-                       f"✓ Incremental migration complete: {migrated} rows migrated "
-                       f"to {pg_table}"
-                   )
-                   return migrated
-       except Exception as e:
-           logger.error(f"Incremental migration failed: {e}")
-           raise
+       # Count rows to migrate
+       timestamp_col = "updated_at" if mysql_table == "ELABDATADISP" else "created_at"
+       # Get max timestamp from PostgreSQL
+       pg_max_timestamp = pg_conn.get_max_timestamp(
+           pg_table,
+           timestamp_col
+       )
+       logger.info(f"Last timestamp in PostgreSQL: {pg_max_timestamp}")
+       if dry_run:
+           logger.info("[DRY RUN] Would migrate rows after timestamp")
+           return 0
+       migrated = 0
+       migration_start_time = datetime.utcnow().isoformat()
+       # Fetch and migrate rows in batches
+       batch_count = 0
+       for batch in mysql_conn.fetch_rows_since(
+           mysql_table,
+           last_timestamp
+       ):
+           batch_count += 1
+           if batch_count == 1:
+               # Create progress tracker with unknown total
+               progress = ProgressTracker(
+                   len(batch),
+                   f"Migrating {mysql_table} (incremental)"
+               )
+               progress.__enter__()
+           # Transform batch
+           transformed = DataTransformer.transform_batch(
+               mysql_table,
+               batch
+           )
+           # Insert batch
+           columns = DataTransformer.get_column_order(pg_table)
+           inserted = pg_conn.insert_batch(
+               pg_table,
+               transformed,
+               columns
+           )
+           migrated += inserted
+           progress.update(inserted)
+       if batch_count == 0:
+           logger.info(f"No new rows to migrate for {mysql_table}")
+           return 0
+       progress.__exit__(None, None, None)
+       # Update migration state
+       self.state.set_last_timestamp(pg_table, migration_start_time)
+       self.state.increment_migration_count(pg_table, migrated)
+       logger.info(
+           f"✓ Incremental migration complete: {migrated} rows migrated "
+           f"to {pg_table}"
+       )
+       return migrated
+
+   def _migrate_by_id(
+       self,
+       mysql_conn: MySQLConnector,
+       pg_conn: PostgreSQLConnector,
+       mysql_table: str,
+       pg_table: str,
+       primary_key: str,
+       dry_run: bool
+   ) -> int:
+       """Migrate rows using ID-based resumption (resumable from last ID).
+
+       Args:
+           mysql_conn: MySQL connector
+           pg_conn: PostgreSQL connector
+           mysql_table: MySQL table name
+           pg_table: PostgreSQL table name
+           primary_key: Primary key column name
+           dry_run: If True, don't modify data
+
+       Returns:
+           Number of rows migrated
+       """
+       # Get last migrated ID from state
+       total_count = mysql_conn.get_row_count(mysql_table)
+       state_dict = self.state.state.get(pg_table, {})
+       last_id = state_dict.get("last_id")
+       previously_migrated = state_dict.get("total_migrated", 0)
+       if last_id is None:
+           logger.info(
+               f"No previous ID-based migration found for {pg_table}. "
+               "Starting from beginning."
+           )
+           remaining = total_count
+       else:
+           remaining = total_count - last_id
+           logger.info(
+               f"Resuming ID-based migration from ID > {last_id}\n"
+               f"Previously migrated: {previously_migrated} rows\n"
+               f"Remaining to migrate: {remaining} rows"
+           )
+       if dry_run:
+           logger.info(f"[DRY RUN] Would migrate {remaining} rows")
+           return remaining
+       migrated = 0
+       with ProgressTracker(
+           remaining,
+           f"Migrating {mysql_table} (resumable)"
+       ) as progress:
+           # Fetch and migrate rows in batches
+           for batch in mysql_conn.fetch_rows_from_id(
+               mysql_table,
+               primary_key,
+               last_id
+           ):
+               if not batch:
+                   break
+               # Transform batch
+               transformed = DataTransformer.transform_batch(
+                   mysql_table,
+                   batch
+               )
+               # Insert batch
+               columns = DataTransformer.get_column_order(pg_table)
+               inserted = pg_conn.insert_batch(
+                   pg_table,
+                   transformed,
+                   columns
+               )
+               if inserted > 0:
+                   # Get the max ID from the batch
+                   batch_max_id = max(
+                       int(row.get(primary_key, 0)) for row in batch
+                   )
+                   migrated += inserted
+                   progress.update(inserted)
+                   # Update state after each batch
+                   if pg_table not in self.state.state:
+                       self.state.state[pg_table] = {}
+                   self.state.state[pg_table]["last_id"] = batch_max_id
+                   self.state.state[pg_table]["total_migrated"] = previously_migrated + migrated
+                   self.state.state[pg_table]["last_updated"] = datetime.utcnow().isoformat()
+                   self.state._save_state()
+
+       logger.info(
+           f"✓ ID-based incremental migration complete: {migrated} rows migrated "
+           f"to {pg_table}"
+       )
+       return migrated


def run_incremental_migration(
    table: str,
    dry_run: bool = False,
-   state_file: str = "migration_state.json"
+   state_file: str = "migration_state.json",
+   use_id: bool = False
) -> int:
    """Run incremental migration for a table.
@@ -147,9 +277,10 @@ def run_incremental_migration(
        table: Table name to migrate
        dry_run: If True, show what would be done without modifying data
        state_file: Path to migration state file
+       use_id: If True, use ID-based resumption, else use timestamp-based

    Returns:
        Number of rows migrated
    """
    migrator = IncrementalMigrator(table, state_file)
-   return migrator.migrate(dry_run=dry_run)
+   return migrator.migrate(dry_run=dry_run, use_id=use_id)


@@ -1,6 +1,6 @@
"""Data transformation from MySQL to PostgreSQL format."""
from typing import Dict, Any, List
-from datetime import datetime
+from datetime import datetime, time, timedelta
from config import (
    RAWDATACOR_COLUMNS,
    ELABDATADISP_FIELD_MAPPING,
@@ -14,6 +14,36 @@ logger = get_logger(__name__)
class DataTransformer:
    """Transform MySQL data to PostgreSQL format."""

+   @staticmethod
+   def _convert_time(event_time: Any) -> time:
+       """Convert event_time to datetime.time object.
+
+       Handles multiple input types:
+       - str: Parse from "HH:MM:SS" format
+       - timedelta: Convert from MySQL TIME type (stored as timedelta)
+       - time: Return as-is
+
+       Args:
+           event_time: Time value from MySQL (str, timedelta, or time)
+
+       Returns:
+           datetime.time object
+       """
+       if isinstance(event_time, str):
+           return datetime.strptime(event_time, "%H:%M:%S").time()
+       elif isinstance(event_time, timedelta):
+           # MySQL returns TIME as timedelta
+           # Extract seconds from timedelta and convert to time
+           total_seconds = int(event_time.total_seconds())
+           hours = total_seconds // 3600
+           minutes = (total_seconds % 3600) // 60
+           seconds = total_seconds % 60
+           return time(hour=hours, minute=minutes, second=seconds)
+       elif isinstance(event_time, time):
+           return event_time
+       else:
+           raise ValueError(f"Unsupported event_time type: {type(event_time)}")
+
    @staticmethod
    def transform_rawdatacor_row(mysql_row: Dict[str, Any]) -> Dict[str, Any]:
        """Transform a RAWDATACOR row from MySQL to PostgreSQL format.
@@ -41,14 +71,35 @@
                "unit": unit if unit else None,
            }

+       # Combine event_date and event_time into event_timestamp
+       event_date = mysql_row.get("EventDate")
+       event_time = mysql_row.get("EventTime")
+       if event_date is not None and event_time is not None:
+           event_time_obj = DataTransformer._convert_time(event_time)
+           event_timestamp = datetime.combine(event_date, event_time_obj)
+       elif event_date is None or event_time is None:
+           # Log a warning for records with missing date/time
+           missing = []
+           if event_date is None:
+               missing.append("EventDate")
+           if event_time is None:
+               missing.append("EventTime")
+           logger.warning(
+               f"Row {mysql_row.get('id')} has NULL {', '.join(missing)}. "
+               f"Using default timestamp: 1970-01-01 00:00:00"
+           )
+           # Use default timestamp for records with missing date/time
+           event_timestamp = datetime(1970, 1, 1, 0, 0, 0)
+       else:
+           event_timestamp = None
+
        # Create PostgreSQL row
        pg_row = {
            "id": mysql_row["id"],
            "unit_name": mysql_row.get("UnitName"),
            "tool_name_id": mysql_row["ToolNameID"],
            "node_num": mysql_row["NodeNum"],
-           "event_date": mysql_row["EventDate"],
-           "event_time": mysql_row["EventTime"],
+           "event_timestamp": event_timestamp,
            "bat_level": mysql_row["BatLevel"],
            "temperature": mysql_row["Temperature"],
            "measurements": measurements,
@@ -90,14 +141,35 @@
                k: v for k, v in measurements.items() if v
            }

+       # Combine event_date and event_time into event_timestamp
+       event_date = mysql_row.get("EventDate")
+       event_time = mysql_row.get("EventTime")
+       if event_date is not None and event_time is not None:
+           event_time_obj = DataTransformer._convert_time(event_time)
+           event_timestamp = datetime.combine(event_date, event_time_obj)
+       elif event_date is None or event_time is None:
+           # Log a warning for records with missing date/time
+           missing = []
+           if event_date is None:
+               missing.append("EventDate")
+           if event_time is None:
+               missing.append("EventTime")
+           logger.warning(
+               f"Row {mysql_row.get('idElabData')} has NULL {', '.join(missing)}. "
+               f"Using default timestamp: 1970-01-01 00:00:00"
+           )
+           # Use default timestamp for records with missing date/time
+           event_timestamp = datetime(1970, 1, 1, 0, 0, 0)
+       else:
+           event_timestamp = None
+
        # Create PostgreSQL row
        pg_row = {
            "id_elab_data": mysql_row["idElabData"],
            "unit_name": mysql_row.get("UnitName"),
            "tool_name_id": mysql_row["ToolNameID"],
            "node_num": mysql_row["NodeNum"],
-           "event_date": mysql_row["EventDate"],
-           "event_time": mysql_row["EventTime"],
+           "event_timestamp": event_timestamp,
            "state": mysql_row.get("State"),
            "calc_err": mysql_row.get("calcerr", 0),
            "measurements": measurements,
@@ -150,8 +222,7 @@
            "unit_name",
            "tool_name_id",
            "node_num",
-           "event_date",
-           "event_time",
+           "event_timestamp",
            "bat_level",
            "temperature",
            "measurements",
@@ -166,8 +237,7 @@
            "unit_name",
            "tool_name_id",
            "node_num",
-           "event_date",
-           "event_time",
+           "event_timestamp",
            "state",
            "calc_err",
            "measurements",


@@ -21,8 +21,7 @@ CREATE TABLE IF NOT EXISTS rawdatacor (
    unit_name VARCHAR(32),
    tool_name_id VARCHAR(32) NOT NULL,
    node_num INTEGER NOT NULL,
-   event_date DATE NOT NULL,
-   event_time TIME NOT NULL,
+   event_timestamp TIMESTAMP NOT NULL,
    bat_level NUMERIC(4,2) NOT NULL,
    temperature NUMERIC(5,2) NOT NULL,
    measurements JSONB,
@@ -30,7 +29,7 @@ CREATE TABLE IF NOT EXISTS rawdatacor (
    bat_level_module NUMERIC(4,2),
    temperature_module NUMERIC(5,2),
    rssi_module INTEGER
-) PARTITION BY RANGE (EXTRACT(YEAR FROM event_date));
+) PARTITION BY RANGE (EXTRACT(YEAR FROM event_timestamp));

-- Note: PostgreSQL doesn't support PRIMARY KEY or UNIQUE constraints
-- with RANGE partitioning on expressions. Using sequence for id auto-increment.
@@ -46,11 +45,18 @@ CREATE TABLE IF NOT EXISTS rawdatacor_{year}
FOR VALUES FROM ({year}) TO ({next_year});
"""

+       # Add default partition for records outside the defined year range
+       sql += """
+CREATE TABLE IF NOT EXISTS rawdatacor_default
+PARTITION OF rawdatacor
+DEFAULT;
+"""
+
        # Add indexes
        sql += """
-- Create indexes
CREATE INDEX IF NOT EXISTS idx_unit_tool_node_datetime_raw
-ON rawdatacor(unit_name, tool_name_id, node_num, event_date, event_time);
+ON rawdatacor(unit_name, tool_name_id, node_num, event_timestamp);

CREATE INDEX IF NOT EXISTS idx_unit_tool_raw
ON rawdatacor(unit_name, tool_name_id);
@@ -58,8 +64,8 @@ CREATE INDEX IF NOT EXISTS idx_unit_tool_raw
CREATE INDEX IF NOT EXISTS idx_measurements_gin_raw
ON rawdatacor USING GIN (measurements);

-CREATE INDEX IF NOT EXISTS idx_event_date_raw
-ON rawdatacor(event_date);
+CREATE INDEX IF NOT EXISTS idx_event_timestamp_raw
+ON rawdatacor(event_timestamp);
"""
        return sql
@@ -81,14 +87,13 @@ CREATE TABLE IF NOT EXISTS elabdatadisp (
    unit_name VARCHAR(32),
    tool_name_id VARCHAR(32) NOT NULL,
    node_num INTEGER NOT NULL,
-   event_date DATE NOT NULL,
-   event_time TIME NOT NULL,
+   event_timestamp TIMESTAMP NOT NULL,
    state VARCHAR(32),
    calc_err INTEGER DEFAULT 0,
    measurements JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
-) PARTITION BY RANGE (EXTRACT(YEAR FROM event_date));
+) PARTITION BY RANGE (EXTRACT(YEAR FROM event_timestamp));

-- Note: PostgreSQL doesn't support PRIMARY KEY or UNIQUE constraints
-- with RANGE partitioning on expressions. Using sequence for id_elab_data auto-increment.
@@ -104,11 +109,18 @@ CREATE TABLE IF NOT EXISTS elabdatadisp_{year}
FOR VALUES FROM ({year}) TO ({next_year});
"""

+       # Add default partition for records outside the defined year range
+       sql += """
+CREATE TABLE IF NOT EXISTS elabdatadisp_default
+PARTITION OF elabdatadisp
+DEFAULT;
+"""
+
        # Add indexes
        sql += """
-- Create indexes
CREATE INDEX IF NOT EXISTS idx_unit_tool_node_datetime_elab
-ON elabdatadisp(unit_name, tool_name_id, node_num, event_date, event_time);
+ON elabdatadisp(unit_name, tool_name_id, node_num, event_timestamp);

CREATE INDEX IF NOT EXISTS idx_unit_tool_elab
ON elabdatadisp(unit_name, tool_name_id);
@@ -116,8 +128,8 @@ CREATE INDEX IF NOT EXISTS idx_unit_tool_elab
CREATE INDEX IF NOT EXISTS idx_measurements_gin_elab
ON elabdatadisp USING GIN (measurements);

-CREATE INDEX IF NOT EXISTS idx_event_date_elab
-ON elabdatadisp(event_date);
+CREATE INDEX IF NOT EXISTS idx_event_timestamp_elab
+ON elabdatadisp(event_timestamp);
"""
        return sql


@@ -1,5 +1,6 @@
"""Test setup and basic functionality."""
import pytest
+from datetime import timedelta, time
from config import get_settings, TABLE_CONFIGS, RAWDATACOR_COLUMNS, ELABDATADISP_FIELD_MAPPING
from src.transformers.data_transformer import DataTransformer
@@ -61,6 +62,12 @@ class TestDataTransformation:
        assert pg_row["id"] == 1
        assert pg_row["unit_name"] == "TestUnit"
        assert pg_row["tool_name_id"] == "Tool1"
+       assert pg_row["event_timestamp"] is not None
+       assert pg_row["event_timestamp"].year == 2024
+       assert pg_row["event_timestamp"].month == 1
+       assert pg_row["event_timestamp"].day == 1
+       assert pg_row["event_timestamp"].hour == 12
+       assert pg_row["event_timestamp"].minute == 0
        assert isinstance(pg_row["measurements"], dict)
        assert "0" in pg_row["measurements"]
        assert pg_row["measurements"]["0"]["value"] == "100.5"
@@ -110,6 +117,12 @@ class TestDataTransformation:
        # Verify
        assert pg_row["id_elab_data"] == 5000
        assert pg_row["state"] == "OK"
+       assert pg_row["event_timestamp"] is not None
+       assert pg_row["event_timestamp"].year == 2024
+       assert pg_row["event_timestamp"].month == 1
+       assert pg_row["event_timestamp"].day == 1
+       assert pg_row["event_timestamp"].hour == 12
+       assert pg_row["event_timestamp"].minute == 0
        assert isinstance(pg_row["measurements"], dict)
        assert "shifts" in pg_row["measurements"]
        assert "coordinates" in pg_row["measurements"]
@@ -135,6 +148,105 @@
        assert "state" in columns

+
+class TestTimeConversion:
+   """Test time conversion utilities."""
+
+   def test_convert_time_from_string(self):
+       """Test converting time from string format."""
+       event_time = "12:30:45"
+       result = DataTransformer._convert_time(event_time)
+       assert isinstance(result, time)
+       assert result.hour == 12
+       assert result.minute == 30
+       assert result.second == 45
+
+   def test_convert_time_from_timedelta(self):
+       """Test converting time from timedelta (MySQL TIME format)."""
+       # MySQL returns TIME columns as timedelta
+       event_time = timedelta(hours=14, minutes=25, seconds=30)
+       result = DataTransformer._convert_time(event_time)
+       assert isinstance(result, time)
+       assert result.hour == 14
+       assert result.minute == 25
+       assert result.second == 30
+
+   def test_convert_time_from_time_object(self):
+       """Test converting time from time object."""
+       event_time = time(10, 15, 20)
+       result = DataTransformer._convert_time(event_time)
+       assert isinstance(result, time)
+       assert result.hour == 10
+       assert result.minute == 15
+       assert result.second == 20
+
+   def test_rawdatacor_with_timedelta(self):
+       """Test RAWDATACOR transformation with timedelta event_time."""
+       mysql_row = {
+           "id": 1,
+           "UnitName": "TestUnit",
+           "ToolNameID": "Tool1",
+           "NodeNum": 1,
+           "EventDate": "2024-01-01",
+           "EventTime": timedelta(hours=12, minutes=0, seconds=0),  # MySQL TIME format
+           "BatLevel": 3.5,
+           "Temperature": 25.5,
+           "Val0": "100.5",
+           "Val1": None,
+           "Val2": "200.3",
+           "Val0_unitmisure": "°C",
+           "Val1_unitmisure": "bar",
+           "Val2_unitmisure": "m/s",
+       }
+       # Add remaining Val columns as None
+       for i in range(3, 16):
+           col = f"Val{i:X}"
+           mysql_row[col] = None
+           mysql_row[f"{col}_unitmisure"] = None
+
+       pg_row = DataTransformer.transform_rawdatacor_row(mysql_row)
+
+       assert pg_row["event_timestamp"] is not None
+       assert pg_row["event_timestamp"].year == 2024
+       assert pg_row["event_timestamp"].month == 1
+       assert pg_row["event_timestamp"].day == 1
+       assert pg_row["event_timestamp"].hour == 12
+       assert pg_row["event_timestamp"].minute == 0
+
+   def test_rawdatacor_with_null_eventtime(self):
+       """Test RAWDATACOR transformation with NULL EventTime uses default timestamp."""
+       mysql_row = {
+           "id": 2140982,
+           "UnitName": "OLD_ID0002",
+           "ToolNameID": "DT0001",
+           "NodeNum": 1,
+           "EventDate": "2023-09-05",
+           "EventTime": None,  # NULL EventTime
+           "BatLevel": 12.90,
+           "Temperature": 13.40,
+           "Val0": "-1709",
+           "Val1": None,
+           "Val0_unitmisure": None,
+           "Val1_unitmisure": None,
+       }
+       # Add remaining Val columns as None
+       for i in range(2, 16):
+           col = f"Val{i:X}"
+           mysql_row[col] = None
+           mysql_row[f"{col}_unitmisure"] = None
+
+       pg_row = DataTransformer.transform_rawdatacor_row(mysql_row)
+
+       # Should use default timestamp 1970-01-01 00:00:00
+       assert pg_row["event_timestamp"] is not None
+       assert pg_row["event_timestamp"].year == 1970
+       assert pg_row["event_timestamp"].month == 1
+       assert pg_row["event_timestamp"].day == 1
+       assert pg_row["event_timestamp"].hour == 0
+       assert pg_row["event_timestamp"].minute == 0
+
+
class TestFieldMapping:
    """Test field mapping configuration."""