feat: Add error logging and fix incremental migration state tracking
Implement comprehensive error handling and fix a state-management bug in incremental migration.

Error Logging System:
- Add validation for consolidation keys (NULL dates, empty IDs, corrupted Java strings)
- Log invalid keys to dedicated error files with detailed reasons
  - Full migration: migration_errors_<table>_<partition>.log
  - Incremental migration: migration_errors_<table>_incremental_<timestamp>.log (timestamped to preserve history)
- Report total count of skipped invalid keys at migration completion
- Auto-delete empty error log files

State Tracking Fix:
- Fix critical bug where last_key wasn't updated after the final buffer flush
- Track last_processed_key throughout the migration loop
- Update state both during periodic flushes and after the final flush
- Ensures incremental migration correctly resumes from the last migrated key

Validation Checks:
- EventDate IS NULL or EventDate = '0000-00-00'
- EventTime IS NULL
- ToolNameID IS NULL or empty string
- UnitName IS NULL or empty string
- UnitName starting with '[L' (corrupted Java strings)

Documentation:
- Update README.md with error logging behavior
- Update MIGRATION_WORKFLOW.md with validation details
- Update CHANGELOG.md with new features and fixes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
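The validation rules listed above can be sketched as a single predicate. This is an illustrative reconstruction, not the committed source: the real `validate_consolidation_key` lives in `src.utils.validation`, and the check order here is inferred from the log examples in the diff (EventDate is reported first even when ToolNameID is also empty).

```python
from typing import Optional, Tuple

def validate_consolidation_key(
    unit_name: Optional[str],
    tool_name_id: Optional[str],
    event_date,
    event_time,
) -> Tuple[bool, Optional[str]]:
    """Return (is_valid, reason); reason is None when the key is valid."""
    # MySQL zero-dates survive as the literal string '0000-00-00'
    if event_date is None or str(event_date) == "0000-00-00":
        return False, f"EventDate is invalid: {event_date}"
    if event_time is None:
        return False, "EventTime is NULL"
    if unit_name is None or unit_name == "":
        return False, "UnitName is NULL or empty"
    # '[Ljava.lang.String;@...' is a Java array accidentally stringified upstream
    if isinstance(unit_name, str) and unit_name.startswith("[L"):
        return False, f"UnitName is corrupted Java string: {unit_name}"
    if tool_name_id is None or tool_name_id == "":
        return False, "ToolNameID is NULL or empty"
    return True, None
```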
CHANGELOG.md (+29)
@@ -8,6 +8,8 @@
 - **State management in PostgreSQL**: Replaced JSON file with `migration_state` table for more reliable tracking
 - **Sync utility**: Added `scripts/sync_migration_state.py` to sync state with actual data
 - **Performance optimization**: MySQL queries now instant using PRIMARY KEY filter
+- **Data quality validation**: Automatically validates and logs invalid consolidation keys to dedicated error files
+- **Error logging**: Invalid keys (null dates, empty tool IDs, corrupted Java strings) are logged and skipped during migration
 - **Better documentation**: Consolidated and updated all documentation files
 
 ### Changed
@@ -35,6 +37,33 @@
 - **State synchronization**: Can now sync `migration_state` with actual data using utility script
 - **Duplicate handling**: Uses `ON CONFLICT DO NOTHING` to prevent duplicates
 - **Last key tracking**: Properly updates global state after full migration
+- **Corrupted data handling**: Both full and incremental migrations now validate keys and log errors instead of crashing
+
+### Error Logging
+
+Both full and incremental migrations now handle corrupted consolidation keys gracefully:
+
+**Error files:**
+- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
+- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)
+
+Each incremental migration creates a new timestamped file to preserve error history across runs.
+
+**File format:**
+```
+# Migration errors for <table> partition <partition>
+# Format: UnitName|ToolNameID|EventDate|EventTime|Reason
+
+ID0350||0000-00-00|0:00:00|EventDate is invalid: 0000-00-00
+[Ljava.lang.String;@abc123|TOOL1|2024-01-01|10:00:00|UnitName is corrupted Java string: [Ljava.lang.String;@abc123
+UNIT1||2024-01-01|10:00:00|ToolNameID is NULL or empty
+```
+
+**Behavior:**
+- Invalid keys are automatically skipped to prevent migration failure
+- Each skipped key is logged with the reason for rejection
+- Total count of skipped keys is reported at the end of migration
+- Empty error files (no errors) are automatically deleted
 
 ### Migration Guide (from old to new)
 
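A log in the pipe-delimited format documented above can be tallied offline to see which rejection reasons dominate. A minimal sketch, assuming only the `UnitName|ToolNameID|EventDate|EventTime|Reason` layout shown in the changelog (`summarize_error_log` is a hypothetical helper, not part of the tool):

```python
from collections import Counter

def summarize_error_log(path: str) -> Counter:
    """Count skipped keys by rejection reason from a migration error log."""
    reasons = Counter()
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):  # skip header comments and blanks
                continue
            # Format: UnitName|ToolNameID|EventDate|EventTime|Reason
            parts = line.split("|", 4)
            if len(parts) == 5:
                reasons[parts[4]] += 1
    return reasons
```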
MIGRATION_WORKFLOW.md

@@ -81,6 +81,26 @@ Both tables use consolidation to group multiple measurements into a single JSONB
 (UnitName, ToolNameID, EventDate, EventTime)
 ```
 
+### Data Quality Validation
+
+The migration automatically validates and logs invalid consolidation keys:
+- `EventDate IS NULL` or `EventDate = '0000-00-00'`
+- `ToolNameID IS NULL` or `ToolNameID = ''` (empty string)
+- `UnitName IS NULL` or `UnitName = ''` (empty string)
+- `UnitName` starting with `[L` (corrupted Java strings like `[Ljava.lang.String;@...`)
+- `EventTime IS NULL`
+
+Invalid keys are:
+- **Logged to error files** for tracking and analysis
+- **Skipped automatically** to prevent migration failures
+- **Counted and reported** at the end of migration
+
+Error log files:
+- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
+- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)
+
+Each incremental migration creates a new timestamped file to preserve history.
+
 ### Why Consolidation?
 
 Instead of migrating individual sensor readings, we:
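The consolidation described above groups every row sharing the four-column key into one record. A toy sketch of the idea, with assumed row shapes (the tool's actual `consolidate_rows` in `src.migrator.consolidator` builds JSONB payloads and is more involved):

```python
from collections import defaultdict

KEY_COLS = ("UnitName", "ToolNameID", "EventDate", "EventTime")

def consolidate(rows):
    """Group measurement rows by (UnitName, ToolNameID, EventDate, EventTime)."""
    groups = defaultdict(list)
    for row in rows:
        key = tuple(row[c] for c in KEY_COLS)
        # keep only the measurement payload; the key columns become the group key
        groups[key].append({k: v for k, v in row.items() if k not in KEY_COLS})
    return groups

rows = [
    {"UnitName": "U1", "ToolNameID": "T1", "EventDate": "2024-01-01",
     "EventTime": "10:00:00", "sensor": "a", "value": 1.0},
    {"UnitName": "U1", "ToolNameID": "T1", "EventDate": "2024-01-01",
     "EventTime": "10:00:00", "sensor": "b", "value": 2.0},
]
groups = consolidate(rows)  # two MySQL rows -> one consolidated group
```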
README.md (+24)
@@ -138,6 +138,30 @@ The tool does not migrate MySQL rows 1:1 into PostgreSQL. Instead, it **consolidates** multiple
 (UnitName, ToolNameID, EventDate, EventTime)
 ```
 
+### Validation and Corrupted Data Handling
+
+The migration automatically validates consolidation keys and handles corrupted data:
+
+**Validations applied:**
+- `EventDate IS NULL` or `EventDate = '0000-00-00'`
+- `ToolNameID IS NULL` or `ToolNameID = ''` (empty string)
+- `UnitName IS NULL` or `UnitName = ''` (empty string)
+- `UnitName` starting with `[L` (corrupted Java strings such as `[Ljava.lang.String;@...`)
+- `EventTime IS NULL`
+
+**Behavior:**
+- Invalid keys are **skipped automatically** to avoid interruptions
+- Each discarded key is **logged to dedicated files** for traceability
+- The total number of discarded keys is **reported at the end** of the migration
+
+**Error log files:**
+- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
+- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)
+
+Each incremental run creates a new timestamped file to preserve history.
+
+This approach ensures the migration does not stop on corrupted data, while still allowing anomalies to be tracked and analyzed.
+
 ### Why Consolidate?
 
 **MySQL** has many rows for the same moment:
@@ -11,6 +11,7 @@ from src.migrator.consolidator import consolidate_rows
 from src.migrator.state_manager import StateManager
 from src.utils.logger import get_logger
 from src.utils.progress import ProgressTracker
+from src.utils.validation import validate_consolidation_key, ErrorLogger
 
 logger = get_logger(__name__)
 
@@ -60,6 +61,9 @@ class IncrementalMigrator:
         # Initialize state manager
         state_mgr = StateManager(pg_conn, pg_table)
 
+        # Initialize error logger
+        error_logger = ErrorLogger(pg_table, "incremental")
+
         # Get last migrated key from migration_state
         # This was saved during the last full/incremental migration
         last_key = state_mgr.get_last_key()
@@ -141,6 +145,7 @@ class IncrementalMigrator:
         offset = 0
         insert_buffer = []
         buffer_size = self.settings.migration.consolidation_group_limit // 10
+        last_processed_key = None  # Track last key for final state update
 
         with ProgressTracker(
             total=None,  # Unknown total
@@ -179,7 +184,20 @@ class IncrementalMigrator:
                 event_date = key.get("EventDate")
                 event_time = key.get("EventTime")
 
+                # Validate consolidation key before fetching
+                is_valid, error_reason = validate_consolidation_key(
+                    unit_name, tool_name_id, event_date, event_time
+                )
+
+                if not is_valid:
+                    # Log invalid key and skip
+                    error_logger.log_invalid_key(
+                        unit_name, tool_name_id, event_date, event_time, error_reason
+                    )
+                    continue
+
                 # Fetch all MySQL rows for this key (all nodes, all partitions)
+                try:
                     mysql_rows = mysql_conn.fetch_records_for_key_all_partitions(
                         mysql_table,
                         unit_name,
@@ -187,6 +205,13 @@ class IncrementalMigrator:
                         event_date,
                         event_time
                     )
+                except Exception as e:
+                    # Log corrupted key that caused fetch error
+                    error_logger.log_invalid_key(
+                        unit_name, tool_name_id, event_date, event_time,
+                        f"Fetch failed: {e}"
+                    )
+                    continue
 
                 if not mysql_rows:
                     logger.warning(
@@ -208,6 +233,14 @@ class IncrementalMigrator:
                 # Add to insert buffer
                 insert_buffer.append(pg_row)
 
+                # Track last processed key
+                last_processed_key = {
+                    "unit_name": unit_name,
+                    "tool_name_id": tool_name_id,
+                    "event_date": str(event_date) if event_date else None,
+                    "event_time": str(event_time) if event_time else None,
+                }
+
                 # Flush buffer when full
                 if len(insert_buffer) >= buffer_size:
                     # Use COPY with ON CONFLICT to handle duplicates
@@ -220,13 +253,7 @@ class IncrementalMigrator:
                     migrated_rows += inserted
                     progress.update(inserted)
 
-                    # Update state with last key
-                    last_processed_key = {
-                        "unit_name": unit_name,
-                        "tool_name_id": tool_name_id,
-                        "event_date": str(event_date) if event_date else None,
-                        "event_time": str(event_time) if event_time else None,
-                    }
+                    # Update state with last key (from tracked variable)
                     state_mgr.update_state(
                         last_key=last_processed_key,
                         total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
@@ -257,13 +284,24 @@ class IncrementalMigrator:
             progress.update(inserted)
             logger.debug(f"Final flush: {inserted} rows")
 
+        # Update state with last key after final flush
+        if last_processed_key:
+            state_mgr.update_state(
+                last_key=last_processed_key,
+                total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
+            )
+
         # Get final row count
         final_count = pg_conn.get_row_count(pg_table)
         logger.info(f"Total PostgreSQL rows: {final_count}")
 
+        # Close error logger and get count
+        error_logger.close()
+
         logger.info(
             f"✓ Incremental migration complete: "
-            f"{migrated_rows} new rows migrated to {pg_table}"
+            f"{migrated_rows} new rows migrated to {pg_table}, "
+            f"{error_logger.get_error_count()} invalid keys skipped"
         )
 
         return migrated_rows
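The state-tracking fix above follows a common buffered-write pattern: the last processed key must be recorded per item and checkpointed again after the final partial flush, otherwise keys processed after the last full buffer are silently lost on resume. A minimal standalone sketch with hypothetical names (`flush`, `update_state` stand in for the COPY insert and `StateManager.update_state`):

```python
def migrate(keys, flush, update_state, buffer_size=3):
    """Buffered migration that checkpoints the last processed key even
    when the final batch is smaller than buffer_size."""
    buffer, last_processed_key = [], None
    for key in keys:
        buffer.append(key)
        last_processed_key = key  # track on every item, not only at flush time
        if len(buffer) >= buffer_size:
            flush(buffer)
            update_state(last_processed_key)  # periodic checkpoint
            buffer = []
    if buffer:
        flush(buffer)  # final partial flush
    if last_processed_key is not None:
        update_state(last_processed_key)  # the fix: checkpoint after final flush
    return last_processed_key
```

Without the final `update_state`, a run over 7 keys with `buffer_size=3` would checkpoint only key 5 and re-process key 6 on the next run.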
@@ -10,21 +10,30 @@ logger = get_logger(__name__)
 class ErrorLogger:
     """Log invalid migration keys to a file."""
 
-    def __init__(self, table: str, partition: str):
+    def __init__(self, table: str, partition: str, use_timestamp: bool = False):
         """Initialize error logger.
 
         Args:
             table: Table name
-            partition: Partition name
+            partition: Partition name (e.g., 'p2024' or 'incremental')
+            use_timestamp: If True, add timestamp to filename (for incremental migrations)
         """
         self.table = table
         self.partition = partition
-        self.error_file = f"migration_errors_{table}_{partition}.log"
+
+        # Add timestamp to filename for incremental migrations to avoid overwriting
+        if use_timestamp or partition == "incremental":
+            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+            self.error_file = f"migration_errors_{table}_{partition}_{timestamp}.log"
+        else:
+            self.error_file = f"migration_errors_{table}_{partition}.log"
+
         self.error_count = 0
 
         # Create error file with header
         with open(self.error_file, "w") as f:
             f.write(f"# Migration errors for {table} partition {partition}\n")
+            f.write(f"# Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
             f.write("# Format: UnitName|ToolNameID|EventDate|EventTime|Reason\n\n")
 
         logger.info(f"Error log file created: {self.error_file}")
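The naming scheme added above is what keeps per-run history in incremental mode. Isolated as a pure function for illustration (`error_log_filename` and its `now` parameter are assumptions for testability, not part of the committed `ErrorLogger`):

```python
from datetime import datetime
from typing import Optional

def error_log_filename(table: str, partition: str, use_timestamp: bool = False,
                       now: Optional[datetime] = None) -> str:
    """Build the error-log filename; incremental runs get a timestamp suffix
    so each run preserves its own error history instead of overwriting."""
    if use_timestamp or partition == "incremental":
        ts = (now or datetime.now()).strftime("%Y%m%d_%H%M%S")
        return f"migration_errors_{table}_{partition}_{ts}.log"
    return f"migration_errors_{table}_{partition}.log"
```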
@@ -99,6 +108,10 @@ def validate_consolidation_key(
     if unit_name is None or unit_name == "":
         return False, "UnitName is NULL or empty"
 
+    # Check for corrupted Java strings (like '[Ljava.lang.String;@...')
+    if isinstance(unit_name, str) and unit_name.startswith("[L"):
+        return False, f"UnitName is corrupted Java string: {unit_name}"
+
     if tool_name_id is None or tool_name_id == "":
         return False, "ToolNameID is NULL or empty"
 