diff --git a/CHANGELOG.md b/CHANGELOG.md
index c89c3d9..122b86d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,8 @@
- **State management in PostgreSQL**: Replaced JSON file with `migration_state` table for more reliable tracking
- **Sync utility**: Added `scripts/sync_migration_state.py` to sync state with actual data
- **Performance optimization**: MySQL queries now instant using PRIMARY KEY filter
+- **Data quality validation**: Automatically validates consolidation keys and logs invalid ones to dedicated error files
+- **Error logging**: Invalid keys (null dates, empty tool IDs, corrupted Java strings) are logged and skipped during migration
- **Better documentation**: Consolidated and updated all documentation files
### Changed
@@ -35,6 +37,33 @@
- **State synchronization**: Can now sync `migration_state` with actual data using utility script
- **Duplicate handling**: Uses `ON CONFLICT DO NOTHING` to prevent duplicates
- **Last key tracking**: Properly updates global state after full migration
+- **Corrupted data handling**: Both full and incremental migrations now validate keys and log errors instead of crashing
+
+### Error Logging
+
+Both full and incremental migrations now handle corrupted consolidation keys gracefully:
+
+**Error files:**
+- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
+- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)
+
+Each incremental migration creates a new timestamped file to preserve error history across runs.
+
+**File format:**
+```
+# Migration errors for <table> partition <partition>
+# Timestamp: YYYY-MM-DD HH:MM:SS
+# Format: UnitName|ToolNameID|EventDate|EventTime|Reason
+
+ID0350||0000-00-00|0:00:00|EventDate is invalid: 0000-00-00
+[Ljava.lang.String;@abc123|TOOL1|2024-01-01|10:00:00|UnitName is corrupted Java string: [Ljava.lang.String;@abc123
+UNIT1||2024-01-01|10:00:00|ToolNameID is NULL or empty
+```
+
+**Behavior:**
+- Invalid keys are automatically skipped to prevent migration failure
+- Each skipped key is logged with the reason for rejection
+- Total count of skipped keys is reported at the end of migration
+- Empty error files (no errors) are automatically deleted
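+
+A minimal sketch of the skip-and-log flow described above (`keys` and `migrate_key` are illustrative placeholders, not real helpers):
+
+```python
+from src.utils.validation import validate_consolidation_key, ErrorLogger
+
+error_logger = ErrorLogger("rawdatacor", "p2024")  # full migration: fixed filename
+for key in keys:  # consolidation keys read from MySQL
+    is_valid, reason = validate_consolidation_key(
+        key["UnitName"], key["ToolNameID"], key["EventDate"], key["EventTime"]
+    )
+    if not is_valid:
+        # Record the rejected key and keep going instead of crashing
+        error_logger.log_invalid_key(
+            key["UnitName"], key["ToolNameID"], key["EventDate"], key["EventTime"], reason
+        )
+        continue
+    migrate_key(key)  # placeholder for consolidation + PostgreSQL insert
+error_logger.close()
+print(f"{error_logger.get_error_count()} invalid keys skipped")
+```
+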
### Migration Guide (from old to new)
diff --git a/MIGRATION_WORKFLOW.md b/MIGRATION_WORKFLOW.md
index c721ca9..87fc077 100644
--- a/MIGRATION_WORKFLOW.md
+++ b/MIGRATION_WORKFLOW.md
@@ -81,6 +81,26 @@ Both tables use consolidation to group multiple measurements into a single JSONB
(UnitName, ToolNameID, EventDate, EventTime)
```
+### Data Quality Validation
+
+The migration automatically validates consolidation keys and logs any that match these invalid patterns:
+- `EventDate IS NULL` or `EventDate = '0000-00-00'`
+- `ToolNameID IS NULL` or `ToolNameID = ''` (empty string)
+- `UnitName IS NULL` or `UnitName = ''` (empty string)
+- `UnitName` starting with `[L` (corrupted Java strings like `[Ljava.lang.String;@...`)
+- `EventTime IS NULL`
+
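+A condensed sketch of these checks (the authoritative version lives in `src/utils/validation.py`; check ordering and the success return value are illustrative):
+
+```python
+def validate_consolidation_key(unit_name, tool_name_id, event_date, event_time):
+    """Return (is_valid, reason) for a consolidation key."""
+    if event_date is None or str(event_date) == "0000-00-00":
+        return False, f"EventDate is invalid: {event_date}"
+    if event_time is None:
+        return False, "EventTime is NULL"
+    if unit_name is None or unit_name == "":
+        return False, "UnitName is NULL or empty"
+    if isinstance(unit_name, str) and unit_name.startswith("[L"):
+        return False, f"UnitName is corrupted Java string: {unit_name}"
+    if tool_name_id is None or tool_name_id == "":
+        return False, "ToolNameID is NULL or empty"
+    return True, ""
+```
+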
+Invalid keys are:
+- **Logged to error files** for tracking and analysis
+- **Skipped automatically** to prevent migration failures
+- **Counted and reported** at the end of migration
+
+Error log files:
+- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
+- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)
+
+Each incremental migration creates a new timestamped file to preserve history.
+
### Why Consolidation?
Instead of migrating individual sensor readings, we:
diff --git a/README.md b/README.md
index ad5980d..68e18a0 100644
--- a/README.md
+++ b/README.md
@@ -138,6 +138,30 @@ The tool does not migrate MySQL rows 1:1 to PostgreSQL. Instead, it **consolida
(UnitName, ToolNameID, EventDate, EventTime)
```
+### Corrupted Data Validation and Handling
+
+The migration automatically validates consolidation keys and handles corrupted data:
+
+**Checks applied:**
+- `EventDate IS NULL` or `EventDate = '0000-00-00'`
+- `ToolNameID IS NULL` or `ToolNameID = ''` (empty string)
+- `UnitName IS NULL` or `UnitName = ''` (empty string)
+- `UnitName` starting with `[L` (corrupted Java strings such as `[Ljava.lang.String;@...`)
+- `EventTime IS NULL`
+
+**Behavior:**
+- Invalid keys are **skipped automatically** to avoid interrupting the migration
+- Every discarded key is **logged to dedicated files** for traceability
+- The total number of skipped keys is **reported at the end** of the migration
+
+**Error log files:**
+- Full migration: `migration_errors_<table>_<partition>.log` (e.g., `migration_errors_rawdatacor_p2024.log`)
+- Incremental migration: `migration_errors_<table>_incremental_<timestamp>.log` (e.g., `migration_errors_rawdatacor_incremental_20260101_194500.log`)
+
+Each incremental run creates a new timestamped file to preserve history.
+
+This approach ensures the migration is not halted by corrupted data, while still making anomalies traceable and analyzable.
+
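+For quick triage, the pipe-separated log format makes it easy to tally skipped keys by reason (a sketch; the filename comes from the full-migration example above):
+
+```python
+from collections import Counter
+from pathlib import Path
+
+# Tally rejection reasons (5th pipe-separated field) in an error log
+log = Path("migration_errors_rawdatacor_p2024.log")
+reasons = Counter(
+    line.split("|")[4]
+    for line in log.read_text().splitlines()
+    if line and not line.startswith("#")
+)
+for reason, count in reasons.most_common():
+    print(f"{count:6d}  {reason}")
+```
+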
### Why Consolidate?
**MySQL** has many rows for the same moment:
diff --git a/src/migrator/incremental_migrator.py b/src/migrator/incremental_migrator.py
index b3085d4..77c27a8 100644
--- a/src/migrator/incremental_migrator.py
+++ b/src/migrator/incremental_migrator.py
@@ -11,6 +11,7 @@ from src.migrator.consolidator import consolidate_rows
from src.migrator.state_manager import StateManager
from src.utils.logger import get_logger
from src.utils.progress import ProgressTracker
+from src.utils.validation import validate_consolidation_key, ErrorLogger
logger = get_logger(__name__)
@@ -60,6 +61,9 @@ class IncrementalMigrator:
# Initialize state manager
state_mgr = StateManager(pg_conn, pg_table)
+ # Initialize error logger
+ error_logger = ErrorLogger(pg_table, "incremental")
+
# Get last migrated key from migration_state
# This was saved during the last full/incremental migration
last_key = state_mgr.get_last_key()
@@ -141,6 +145,7 @@ class IncrementalMigrator:
offset = 0
insert_buffer = []
buffer_size = self.settings.migration.consolidation_group_limit // 10
+ last_processed_key = None # Track last key for final state update
with ProgressTracker(
total=None, # Unknown total
@@ -179,15 +184,35 @@ class IncrementalMigrator:
event_date = key.get("EventDate")
event_time = key.get("EventTime")
- # Fetch all MySQL rows for this key (all nodes, all partitions)
- mysql_rows = mysql_conn.fetch_records_for_key_all_partitions(
- mysql_table,
- unit_name,
- tool_name_id,
- event_date,
- event_time
+ # Validate consolidation key before fetching
+ is_valid, error_reason = validate_consolidation_key(
+ unit_name, tool_name_id, event_date, event_time
)
+ if not is_valid:
+ # Log invalid key and skip
+ error_logger.log_invalid_key(
+ unit_name, tool_name_id, event_date, event_time, error_reason
+ )
+ continue
+
+ # Fetch all MySQL rows for this key (all nodes, all partitions)
+ try:
+ mysql_rows = mysql_conn.fetch_records_for_key_all_partitions(
+ mysql_table,
+ unit_name,
+ tool_name_id,
+ event_date,
+ event_time
+ )
+ except Exception as e:
+ # Log corrupted key that caused fetch error
+ error_logger.log_invalid_key(
+ unit_name, tool_name_id, event_date, event_time,
+ f"Fetch failed: {e}"
+ )
+ continue
+
if not mysql_rows:
logger.warning(
f"No records found for key: "
@@ -208,6 +233,14 @@ class IncrementalMigrator:
# Add to insert buffer
insert_buffer.append(pg_row)
+ # Track last processed key
+ last_processed_key = {
+ "unit_name": unit_name,
+ "tool_name_id": tool_name_id,
+ "event_date": str(event_date) if event_date else None,
+ "event_time": str(event_time) if event_time else None,
+ }
+
# Flush buffer when full
if len(insert_buffer) >= buffer_size:
# Use COPY with ON CONFLICT to handle duplicates
@@ -220,13 +253,7 @@ class IncrementalMigrator:
migrated_rows += inserted
progress.update(inserted)
- # Update state with last key
- last_processed_key = {
- "unit_name": unit_name,
- "tool_name_id": tool_name_id,
- "event_date": str(event_date) if event_date else None,
- "event_time": str(event_time) if event_time else None,
- }
+ # Update state with last key (from tracked variable)
state_mgr.update_state(
last_key=last_processed_key,
total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
@@ -257,13 +284,24 @@ class IncrementalMigrator:
progress.update(inserted)
logger.debug(f"Final flush: {inserted} rows")
+ # Update state with last key after final flush
+ if last_processed_key:
+ state_mgr.update_state(
+ last_key=last_processed_key,
+ total_rows_migrated=state_mgr.get_total_rows_migrated() + migrated_rows
+ )
+
# Get final row count
final_count = pg_conn.get_row_count(pg_table)
logger.info(f"Total PostgreSQL rows: {final_count}")
+ # Close error logger and get count
+ error_logger.close()
+
logger.info(
f"✓ Incremental migration complete: "
- f"{migrated_rows} new rows migrated to {pg_table}"
+ f"{migrated_rows} new rows migrated to {pg_table}, "
+ f"{error_logger.get_error_count()} invalid keys skipped"
)
return migrated_rows
diff --git a/src/utils/validation.py b/src/utils/validation.py
index 2b82c82..231baf3 100644
--- a/src/utils/validation.py
+++ b/src/utils/validation.py
@@ -10,21 +10,30 @@ logger = get_logger(__name__)
class ErrorLogger:
"""Log invalid migration keys to a file."""
- def __init__(self, table: str, partition: str):
+ def __init__(self, table: str, partition: str, use_timestamp: bool = False):
"""Initialize error logger.
Args:
table: Table name
- partition: Partition name
+ partition: Partition name (e.g., 'p2024' or 'incremental')
+ use_timestamp: If True, add timestamp to filename (for incremental migrations)
"""
self.table = table
self.partition = partition
- self.error_file = f"migration_errors_{table}_{partition}.log"
+
+ # Add timestamp to filename for incremental migrations to avoid overwriting
+ if use_timestamp or partition == "incremental":
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ self.error_file = f"migration_errors_{table}_{partition}_{timestamp}.log"
+ else:
+ self.error_file = f"migration_errors_{table}_{partition}.log"
+
self.error_count = 0
# Create error file with header
with open(self.error_file, "w") as f:
f.write(f"# Migration errors for {table} partition {partition}\n")
+ f.write(f"# Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("# Format: UnitName|ToolNameID|EventDate|EventTime|Reason\n\n")
logger.info(f"Error log file created: {self.error_file}")
@@ -99,6 +108,10 @@ def validate_consolidation_key(
if unit_name is None or unit_name == "":
return False, "UnitName is NULL or empty"
+ # Check for corrupted Java strings (like '[Ljava.lang.String;@...')
+ if isinstance(unit_name, str) and unit_name.startswith("[L"):
+ return False, f"UnitName is corrupted Java string: {unit_name}"
+
if tool_name_id is None or tool_name_id == "":
return False, "ToolNameID is NULL or empty"