"""Test setup and basic functionality."""
|
|
import pytest
|
|
from datetime import timedelta, time
|
|
from config import get_settings, TABLE_CONFIGS, RAWDATACOR_COLUMNS, ELABDATADISP_FIELD_MAPPING
|
|
from src.transformers.data_transformer import DataTransformer
|
|
|
|
|
|
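# The tests below pad the hex-named Val3-ValF columns by hand in several
# places. A helper like this sketch could replace those loops (illustrative
# only; the name and signature are assumptions, not project API):
def _pad_val_columns(row, start=3):
    """Set Val<start>..ValF and their *_unitmisure columns to None."""
    for i in range(start, 16):  # 3..15 -> "Val3".."ValF" (uppercase hex)
        col = f"Val{i:X}"
        row.setdefault(col, None)
        row.setdefault(f"{col}_unitmisure", None)
    return row
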
class TestConfiguration:
    """Test configuration loading."""

    def test_settings_loaded(self):
        """Test that settings can be loaded."""
        settings = get_settings()
        assert settings is not None
        assert settings.mysql.host is not None
        assert settings.postgres.host is not None

    def test_table_configs_exist(self):
        """Test that table configurations exist."""
        assert "RAWDATACOR" in TABLE_CONFIGS or len(TABLE_CONFIGS) > 0

    def test_migration_batch_size(self):
        """Test that batch size is configured."""
        settings = get_settings()
        assert settings.migration.batch_size > 0
        assert settings.migration.batch_size <= 1000000

class TestDataTransformation:
    """Test data transformation logic."""

    def test_rawdatacor_transformation(self):
        """Test RAWDATACOR row transformation."""
        # Sample MySQL row
        mysql_row = {
            "id": 1,
            "UnitName": "TestUnit",
            "ToolNameID": "Tool1",
            "NodeNum": 1,
            "EventDate": "2024-01-01",
            "EventTime": "12:00:00",
            "BatLevel": 3.5,
            "Temperature": 25.5,
            "Val0": "100.5",
            "Val1": None,
            "Val2": "200.3",
            "Val0_unitmisure": "°C",
            "Val1_unitmisure": "bar",
            "Val2_unitmisure": "m/s",
        }

        # Add remaining Val columns as None
        for i in range(3, 16):
            col = f"Val{i:X}"  # Val3-ValF
            mysql_row[col] = None
            mysql_row[f"{col}_unitmisure"] = None

        # Transform
        pg_row = DataTransformer.transform_rawdatacor_row(mysql_row)

        # Verify
        assert pg_row["id"] == 1
        assert pg_row["unit_name"] == "TestUnit"
        assert pg_row["tool_name_id"] == "Tool1"
        assert "node_num" not in pg_row  # node_num should NOT be a column anymore
        assert pg_row["event_timestamp"] is not None
        assert pg_row["event_timestamp"].year == 2024
        assert pg_row["event_timestamp"].month == 1
        assert pg_row["event_timestamp"].day == 1
        assert pg_row["event_timestamp"].hour == 12
        assert pg_row["event_timestamp"].minute == 0
        assert isinstance(pg_row["measurements"], dict)
        # Verify node is a key in measurements JSONB (single node case)
        assert "1" in pg_row["measurements"]  # node number as key
        assert "0" in pg_row["measurements"]["1"]
        assert pg_row["measurements"]["1"]["0"]["value"] == "100.5"
        assert pg_row["measurements"]["1"]["0"]["unit"] == "°C"
        assert "1" not in pg_row["measurements"]["1"]  # NULL values excluded
        assert "2" in pg_row["measurements"]["1"]
        assert pg_row["measurements"]["1"]["2"]["value"] == "200.3"
        assert pg_row["measurements"]["1"]["2"]["unit"] == "m/s"

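    # For reference, the nested shape asserted above (a sketch of the expected
    # transformer output: node number -> value index -> {"value", "unit"}):
    #
    #   {"1": {"0": {"value": "100.5", "unit": "°C"},
    #          "2": {"value": "200.3", "unit": "m/s"}}}
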
    def test_elabdatadisp_transformation(self):
        """Test ELABDATADISP row transformation."""
        # Sample MySQL row
        mysql_row = {
            "idElabData": 5000,
            "UnitName": "TestUnit",
            "ToolNameID": "Tool1",
            "NodeNum": 1,
            "EventDate": "2024-01-01",
            "EventTime": "12:00:00",
            "State": "OK",
            "calcerr": 0,
            "XShift": 1.234567,
            "YShift": 2.345678,
            "ZShift": 3.456789,
            "HShift": 4.567890,
            "HShiftDir": 5.678901,
            "HShift_local": 6.789012,
            "X": 10.123456,
            "Y": 20.234567,
            "Z": 30.345678,
            "Xstar": 40.456789,
            "Zstar": 50.567890,
            "speed": 1.111111,
            "speed_local": 2.222222,
            "acceleration": 3.333333,
            "acceleration_local": 4.444444,
            "T_node": 25.5,
            "load_value": 100.5,
            "water_level": 50.5,
            "pressure": 1.013,
            "AlfaX": 0.123456,
            "AlfaY": 0.234567,
            "Area": 100.5,
        }

        # Transform
        pg_row = DataTransformer.transform_elabdatadisp_row(mysql_row)

        # Verify
        assert pg_row["id_elab_data"] == 5000
        assert pg_row["state"] == "OK"
        assert pg_row["event_timestamp"] is not None
        assert pg_row["event_timestamp"].year == 2024
        assert pg_row["event_timestamp"].month == 1
        assert pg_row["event_timestamp"].day == 1
        assert pg_row["event_timestamp"].hour == 12
        assert pg_row["event_timestamp"].minute == 0
        assert isinstance(pg_row["measurements"], dict)
        assert "shifts" in pg_row["measurements"]
        assert "coordinates" in pg_row["measurements"]
        assert "kinematics" in pg_row["measurements"]
        assert pg_row["measurements"]["shifts"]["x"] == 1.234567
        assert pg_row["measurements"]["coordinates"]["x"] == 10.123456
        assert pg_row["measurements"]["kinematics"]["speed"] == 1.111111

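    # Sketch of the grouped shape asserted above (the grouping is assumed to
    # be driven by ELABDATADISP_FIELD_MAPPING):
    #
    #   {"shifts": {"x": 1.234567, ...},
    #    "coordinates": {"x": 10.123456, ...},
    #    "kinematics": {"speed": 1.111111, ...}}
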
    def test_column_order_rawdatacor(self):
        """Test column order for RAWDATACOR."""
        columns = DataTransformer.get_column_order("rawdatacor")
        assert isinstance(columns, list)
        assert "id" in columns
        assert "measurements" in columns
        assert "unit_name" in columns

    def test_column_order_elabdatadisp(self):
        """Test column order for ELABDATADISP."""
        columns = DataTransformer.get_column_order("elabdatadisp")
        assert isinstance(columns, list)
        assert "id_elab_data" in columns
        assert "measurements" in columns
        assert "state" in columns


class TestTimeConversion:
    """Test time conversion utilities."""

    def test_convert_time_from_string(self):
        """Test converting time from string format."""
        event_time = "12:30:45"
        result = DataTransformer._convert_time(event_time)
        assert isinstance(result, time)
        assert result.hour == 12
        assert result.minute == 30
        assert result.second == 45

    def test_convert_time_from_timedelta(self):
        """Test converting time from timedelta (MySQL TIME format)."""
        # MySQL returns TIME columns as timedelta
        event_time = timedelta(hours=14, minutes=25, seconds=30)
        result = DataTransformer._convert_time(event_time)
        assert isinstance(result, time)
        assert result.hour == 14
        assert result.minute == 25
        assert result.second == 30

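    # A minimal sketch of the arithmetic this conversion presumably performs
    # (an assumption; the authoritative logic is DataTransformer._convert_time):
    #
    #   total = int(td.total_seconds()) % 86400  # wrap at 24h
    #   time(total // 3600, (total % 3600) // 60, total % 60)
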
    def test_convert_time_from_time_object(self):
        """Test converting time from time object."""
        event_time = time(10, 15, 20)
        result = DataTransformer._convert_time(event_time)
        assert isinstance(result, time)
        assert result.hour == 10
        assert result.minute == 15
        assert result.second == 20

    def test_rawdatacor_with_timedelta(self):
        """Test RAWDATACOR transformation with timedelta event_time."""
        mysql_row = {
            "id": 1,
            "UnitName": "TestUnit",
            "ToolNameID": "Tool1",
            "NodeNum": 1,
            "EventDate": "2024-01-01",
            "EventTime": timedelta(hours=12, minutes=0, seconds=0),  # MySQL TIME format
            "BatLevel": 3.5,
            "Temperature": 25.5,
            "Val0": "100.5",
            "Val1": None,
            "Val2": "200.3",
            "Val0_unitmisure": "°C",
            "Val1_unitmisure": "bar",
            "Val2_unitmisure": "m/s",
        }

        # Add remaining Val columns as None
        for i in range(3, 16):
            col = f"Val{i:X}"
            mysql_row[col] = None
            mysql_row[f"{col}_unitmisure"] = None

        pg_row = DataTransformer.transform_rawdatacor_row(mysql_row)

        assert pg_row["event_timestamp"] is not None
        assert pg_row["event_timestamp"].year == 2024
        assert pg_row["event_timestamp"].month == 1
        assert pg_row["event_timestamp"].day == 1
        assert pg_row["event_timestamp"].hour == 12
        assert pg_row["event_timestamp"].minute == 0

    def test_rawdatacor_with_null_eventtime(self):
        """Test RAWDATACOR transformation with NULL EventTime uses default timestamp."""
        mysql_row = {
            "id": 2140982,
            "UnitName": "OLD_ID0002",
            "ToolNameID": "DT0001",
            "NodeNum": 1,
            "EventDate": "2023-09-05",
            "EventTime": None,  # NULL EventTime
            "BatLevel": 12.90,
            "Temperature": 13.40,
            "Val0": "-1709",
            "Val1": None,
            "Val0_unitmisure": None,
            "Val1_unitmisure": None,
        }

        # Add remaining Val columns as None
        for i in range(2, 16):
            col = f"Val{i:X}"
            mysql_row[col] = None
            mysql_row[f"{col}_unitmisure"] = None

        pg_row = DataTransformer.transform_rawdatacor_row(mysql_row)

        # Should use default timestamp 1970-01-01 00:00:00
        assert pg_row["event_timestamp"] is not None
        assert pg_row["event_timestamp"].year == 1970
        assert pg_row["event_timestamp"].month == 1
        assert pg_row["event_timestamp"].day == 1
        assert pg_row["event_timestamp"].hour == 0
        assert pg_row["event_timestamp"].minute == 0

        # Verify that unit is NOT included when it's None (optimization).
        # Measurements are keyed by node number first, consistent with the
        # single-node test above (NodeNum is 1 here).
        assert "0" in pg_row["measurements"]["1"]
        assert pg_row["measurements"]["1"]["0"]["value"] == "-1709"
        assert "unit" not in pg_row["measurements"]["1"]["0"]  # unit should not exist when None


class TestConsolidation:
    """Test consolidation of RAWDATACOR node rows into single JSONB rows."""

    def test_rawdatacor_consolidation(self):
        """Test consolidation of multiple nodes into a single JSONB row."""
        # Create three rows with same (unit, tool, timestamp) but different nodes
        rows = [
            {
                "id": 1,
                "UnitName": "TestUnit",
                "ToolNameID": "Tool1",
                "NodeNum": 1,
                "EventDate": "2024-01-01",
                "EventTime": "12:00:00",
                "BatLevel": 3.5,
                "Temperature": 25.5,
                "Val0": "100.5",
                "Val1": None,
                "Val2": "200.3",
                "Val0_unitmisure": "°C",
                "Val1_unitmisure": None,
                "Val2_unitmisure": "m/s",
            },
            {
                "id": 2,
                "UnitName": "TestUnit",
                "ToolNameID": "Tool1",
                "NodeNum": 2,
                "EventDate": "2024-01-01",
                "EventTime": "12:00:00",
                "BatLevel": 3.5,
                "Temperature": 25.5,
                "Val0": "101.2",
                "Val1": None,
                "Val2": "205.1",
                "Val0_unitmisure": "°C",
                "Val1_unitmisure": None,
                "Val2_unitmisure": "m/s",
            },
            {
                "id": 3,
                "UnitName": "TestUnit",
                "ToolNameID": "Tool1",
                "NodeNum": 3,
                "EventDate": "2024-01-01",
                "EventTime": "12:00:00",
                "BatLevel": 3.5,
                "Temperature": 25.5,
                "Val0": "102.0",
                "Val1": None,
                "Val2": "210.5",
                "Val0_unitmisure": "°C",
                "Val1_unitmisure": None,
                "Val2_unitmisure": "m/s",
            },
        ]

        # Add remaining Val columns as None for all rows
        for row in rows:
            for i in range(3, 16):
                col = f"Val{i:X}"
                row[col] = None
                row[f"{col}_unitmisure"] = None
            row["created_at"] = None
            row["BatLevelModule"] = None
            row["TemperatureModule"] = None
            row["RssiModule"] = None

        # Consolidate
        consolidated = DataTransformer.consolidate_rawdatacor_batch(rows)

        # Should have 1 consolidated row (all three nodes share unit/tool/timestamp)
        assert len(consolidated) == 1

        consolidated_row = consolidated[0]

        # Verify consolidated row properties
        assert consolidated_row["id"] == 3  # MAX(id) for proper resume
        assert consolidated_row["unit_name"] == "TestUnit"
        assert consolidated_row["tool_name_id"] == "Tool1"
        assert consolidated_row["bat_level"] == 3.5
        assert consolidated_row["temperature"] == 25.5

        # Verify all three nodes are in measurements
        measurements = consolidated_row["measurements"]
        assert "1" in measurements
        assert "2" in measurements
        assert "3" in measurements

        # Verify node 1 measurements
        assert measurements["1"]["0"]["value"] == "100.5"
        assert measurements["1"]["0"]["unit"] == "°C"
        assert measurements["1"]["2"]["value"] == "200.3"

        # Verify node 2 measurements
        assert measurements["2"]["0"]["value"] == "101.2"
        assert measurements["2"]["2"]["value"] == "205.1"

        # Verify node 3 measurements
        assert measurements["3"]["0"]["value"] == "102.0"
        assert measurements["3"]["2"]["value"] == "210.5"

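    # Assumed consolidation contract exercised above: rows sharing
    # (UnitName, ToolNameID, event timestamp) collapse into one row whose id
    # is the group's MAX(id), so resuming from the stored id cannot re-read a
    # group that was already fully consolidated.
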
    def test_rawdatacor_consolidation_with_different_keys(self):
        """Test that rows with different keys are NOT consolidated."""
        # Two rows with different units
        rows = [
            {
                "id": 1,
                "UnitName": "Unit1",
                "ToolNameID": "Tool1",
                "NodeNum": 1,
                "EventDate": "2024-01-01",
                "EventTime": "12:00:00",
                "BatLevel": 3.5,
                "Temperature": 25.5,
                "Val0": "100.5",
                "Val1": None,
                "Val2": None,
                "Val0_unitmisure": "°C",
                "Val1_unitmisure": None,
                "Val2_unitmisure": None,
            },
            {
                "id": 2,
                "UnitName": "Unit2",  # Different unit
                "ToolNameID": "Tool1",
                "NodeNum": 1,
                "EventDate": "2024-01-01",
                "EventTime": "12:00:00",
                "BatLevel": 3.5,
                "Temperature": 25.5,
                "Val0": "100.5",
                "Val1": None,
                "Val2": None,
                "Val0_unitmisure": "°C",
                "Val1_unitmisure": None,
                "Val2_unitmisure": None,
            },
        ]

        # Add remaining Val columns as None for all rows
        for row in rows:
            for i in range(3, 16):
                col = f"Val{i:X}"
                row[col] = None
                row[f"{col}_unitmisure"] = None
            row["created_at"] = None
            row["BatLevelModule"] = None
            row["TemperatureModule"] = None
            row["RssiModule"] = None

        # Consolidate
        consolidated = DataTransformer.consolidate_rawdatacor_batch(rows)

        # Should have 2 rows (different units)
        assert len(consolidated) == 2
        assert consolidated[0]["unit_name"] == "Unit1"
        assert consolidated[1]["unit_name"] == "Unit2"


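# Hedged, runnable sketch of the grouping the consolidation tests above rely
# on (an assumption about consolidate_rawdatacor_batch, not its actual code):
def _sketch_consolidation_groups(rows):
    """Group raw rows by the assumed consolidation key."""
    groups = {}
    for row in rows:
        key = (row["UnitName"], row["ToolNameID"], row["EventDate"], str(row["EventTime"]))
        groups.setdefault(key, []).append(row)
    return groups

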
class TestFieldMapping:
    """Test field mapping configuration."""

    def test_all_rawdatacor_columns_mapped(self):
        """Test that all RAWDATACOR value columns are defined."""
        for val_col in RAWDATACOR_COLUMNS["val_columns"]:
            assert val_col.startswith("Val")

    def test_all_elabdatadisp_fields_mapped(self):
        """Test that all ELABDATADISP fields are mapped."""
        mapped_fields = set(ELABDATADISP_FIELD_MAPPING.keys())
        assert len(mapped_fields) > 20  # Should have many fields


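# Running this module directly delegates to pytest (equivalent to running
# pytest with -v scoped to this file):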
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|