Files
mysql2postgres/tests/test_setup.py
alex 0f217379ea fix: Use actual PostgreSQL row count for total_rows_migrated tracking
Replace session-level counting with direct table COUNT queries to ensure
total_rows_migrated always reflects actual reality in PostgreSQL. This fixes
the discrepancy where the counter was only tracking rows from the current session
and didn't account for earlier insertions or duplicates from failed resume attempts.

Key improvements:
- Use get_row_count() after each batch to get authoritative total
- Preserve previous count on resume and accumulate across sessions
- Remove dependency on error-prone session-level counters
- Ensures migration_state.total_rows_migrated matches actual table row count

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2025-12-23 15:33:27 +01:00

432 lines
16 KiB
Python

"""Test setup and basic functionality."""
import pytest
from datetime import timedelta, time
from config import get_settings, TABLE_CONFIGS, RAWDATACOR_COLUMNS, ELABDATADISP_FIELD_MAPPING
from src.transformers.data_transformer import DataTransformer
class TestConfiguration:
    """Sanity checks that application configuration loads correctly."""

    def test_settings_loaded(self):
        """The settings object loads and exposes both database hosts."""
        cfg = get_settings()
        assert cfg is not None
        assert cfg.mysql.host is not None
        assert cfg.postgres.host is not None

    def test_table_configs_exist(self):
        """At least one table configuration is defined."""
        # NOTE(review): the `or` branch makes the RAWDATACOR membership check
        # non-binding — this passes for any non-empty mapping. Consider
        # asserting membership directly if RAWDATACOR is guaranteed present.
        assert "RAWDATACOR" in TABLE_CONFIGS or len(TABLE_CONFIGS) > 0

    def test_migration_batch_size(self):
        """The migration batch size is positive and bounded."""
        cfg = get_settings()
        batch = cfg.migration.batch_size
        assert 0 < batch <= 1000000
class TestDataTransformation:
    """Tests for DataTransformer row-level transformation logic."""

    def test_rawdatacor_transformation(self):
        """A single RAWDATACOR row maps to the PG schema with node-keyed JSONB."""
        source = {
            "id": 1,
            "UnitName": "TestUnit",
            "ToolNameID": "Tool1",
            "NodeNum": 1,
            "EventDate": "2024-01-01",
            "EventTime": "12:00:00",
            "BatLevel": 3.5,
            "Temperature": 25.5,
            "Val0": "100.5",
            "Val1": None,
            "Val2": "200.3",
            "Val0_unitmisure": "°C",
            "Val1_unitmisure": "bar",
            "Val2_unitmisure": "m/s",
        }
        # Fill the remaining hex-named Val columns (Val3..ValF) with NULLs.
        for i in range(3, 16):
            name = f"Val{i:X}"
            source[name] = None
            source[f"{name}_unitmisure"] = None

        result = DataTransformer.transform_rawdatacor_row(source)

        assert result["id"] == 1
        assert result["unit_name"] == "TestUnit"
        assert result["tool_name_id"] == "Tool1"
        # node_num is folded into the JSONB payload, not kept as a column.
        assert "node_num" not in result
        ts = result["event_timestamp"]
        assert ts is not None
        assert (ts.year, ts.month, ts.day) == (2024, 1, 1)
        assert (ts.hour, ts.minute) == (12, 0)
        measurements = result["measurements"]
        assert isinstance(measurements, dict)
        # Single-node case: the node number is the top-level JSONB key.
        assert "1" in measurements
        node = measurements["1"]
        assert "0" in node
        assert node["0"]["value"] == "100.5"
        assert node["0"]["unit"] == "°C"
        # NULL-valued measurements are excluded entirely.
        assert "1" not in node
        assert "2" in node
        assert node["2"]["value"] == "200.3"
        assert node["2"]["unit"] == "m/s"

    def test_elabdatadisp_transformation(self):
        """An ELABDATADISP row maps to grouped JSONB measurement sections."""
        source = {
            "idElabData": 5000,
            "UnitName": "TestUnit",
            "ToolNameID": "Tool1",
            "NodeNum": 1,
            "EventDate": "2024-01-01",
            "EventTime": "12:00:00",
            "State": "OK",
            "calcerr": 0,
            "XShift": 1.234567,
            "YShift": 2.345678,
            "ZShift": 3.456789,
            "HShift": 4.567890,
            "HShiftDir": 5.678901,
            "HShift_local": 6.789012,
            "X": 10.123456,
            "Y": 20.234567,
            "Z": 30.345678,
            "Xstar": 40.456789,
            "Zstar": 50.567890,
            "speed": 1.111111,
            "speed_local": 2.222222,
            "acceleration": 3.333333,
            "acceleration_local": 4.444444,
            "T_node": 25.5,
            "load_value": 100.5,
            "water_level": 50.5,
            "pressure": 1.013,
            "AlfaX": 0.123456,
            "AlfaY": 0.234567,
            "Area": 100.5,
        }

        result = DataTransformer.transform_elabdatadisp_row(source)

        assert result["id_elab_data"] == 5000
        assert result["state"] == "OK"
        ts = result["event_timestamp"]
        assert ts is not None
        assert (ts.year, ts.month, ts.day) == (2024, 1, 1)
        assert (ts.hour, ts.minute) == (12, 0)
        measurements = result["measurements"]
        assert isinstance(measurements, dict)
        # Measurements are grouped into named sections.
        for section in ("shifts", "coordinates", "kinematics"):
            assert section in measurements
        assert measurements["shifts"]["x"] == 1.234567
        assert measurements["coordinates"]["x"] == 10.123456
        assert measurements["kinematics"]["speed"] == 1.111111

    def test_column_order_rawdatacor(self):
        """Column order for RAWDATACOR includes the key PG columns."""
        cols = DataTransformer.get_column_order("rawdatacor")
        assert isinstance(cols, list)
        for expected in ("id", "measurements", "unit_name"):
            assert expected in cols

    def test_column_order_elabdatadisp(self):
        """Column order for ELABDATADISP includes the key PG columns."""
        cols = DataTransformer.get_column_order("elabdatadisp")
        assert isinstance(cols, list)
        for expected in ("id_elab_data", "measurements", "state"):
            assert expected in cols
class TestTimeConversion:
    """Tests for time conversion and batch consolidation behavior."""

    def test_convert_time_from_string(self):
        """A HH:MM:SS string converts to a time object."""
        converted = DataTransformer._convert_time("12:30:45")
        assert isinstance(converted, time)
        assert (converted.hour, converted.minute, converted.second) == (12, 30, 45)

    def test_convert_time_from_timedelta(self):
        """A timedelta (how MySQL drivers return TIME) converts to a time."""
        converted = DataTransformer._convert_time(
            timedelta(hours=14, minutes=25, seconds=30)
        )
        assert isinstance(converted, time)
        assert (converted.hour, converted.minute, converted.second) == (14, 25, 30)

    def test_convert_time_from_time_object(self):
        """A time object passes through unchanged in value."""
        converted = DataTransformer._convert_time(time(10, 15, 20))
        assert isinstance(converted, time)
        assert (converted.hour, converted.minute, converted.second) == (10, 15, 20)

    def test_rawdatacor_with_timedelta(self):
        """RAWDATACOR transformation accepts a timedelta EventTime."""
        source = {
            "id": 1,
            "UnitName": "TestUnit",
            "ToolNameID": "Tool1",
            "NodeNum": 1,
            "EventDate": "2024-01-01",
            # MySQL TIME columns arrive as timedelta objects.
            "EventTime": timedelta(hours=12, minutes=0, seconds=0),
            "BatLevel": 3.5,
            "Temperature": 25.5,
            "Val0": "100.5",
            "Val1": None,
            "Val2": "200.3",
            "Val0_unitmisure": "°C",
            "Val1_unitmisure": "bar",
            "Val2_unitmisure": "m/s",
        }
        for i in range(3, 16):
            name = f"Val{i:X}"
            source[name] = None
            source[f"{name}_unitmisure"] = None

        result = DataTransformer.transform_rawdatacor_row(source)

        ts = result["event_timestamp"]
        assert ts is not None
        assert (ts.year, ts.month, ts.day) == (2024, 1, 1)
        assert (ts.hour, ts.minute) == (12, 0)

    def test_rawdatacor_with_null_eventtime(self):
        """A NULL EventTime falls back to the 1970-01-01 00:00:00 default."""
        source = {
            "id": 2140982,
            "UnitName": "OLD_ID0002",
            "ToolNameID": "DT0001",
            "NodeNum": 1,
            "EventDate": "2023-09-05",
            "EventTime": None,  # NULL EventTime triggers the default timestamp
            "BatLevel": 12.90,
            "Temperature": 13.40,
            "Val0": "-1709",
            "Val1": None,
            "Val0_unitmisure": None,
            "Val1_unitmisure": None,
        }
        for i in range(2, 16):
            name = f"Val{i:X}"
            source[name] = None
            source[f"{name}_unitmisure"] = None

        result = DataTransformer.transform_rawdatacor_row(source)

        ts = result["event_timestamp"]
        assert ts is not None
        assert (ts.year, ts.month, ts.day) == (1970, 1, 1)
        assert (ts.hour, ts.minute) == (0, 0)
        # A None unit is omitted from the JSONB entry (storage optimization).
        # NOTE(review): measurements is accessed without the node-number
        # nesting used in the other RAWDATACOR tests — confirm whether
        # transform output shape differs here or the key layout is flat.
        assert "0" in result["measurements"]
        assert result["measurements"]["0"]["value"] == "-1709"
        assert "unit" not in result["measurements"]["0"]

    @staticmethod
    def _node_row(row_id, unit, node, val0, val2):
        """Build a full RAWDATACOR source row for consolidation tests.

        Val2's unit is "m/s" when val2 is set, None otherwise; all other
        Val columns and module-level fields are NULL.
        """
        row = {
            "id": row_id,
            "UnitName": unit,
            "ToolNameID": "Tool1",
            "NodeNum": node,
            "EventDate": "2024-01-01",
            "EventTime": "12:00:00",
            "BatLevel": 3.5,
            "Temperature": 25.5,
            "Val0": val0,
            "Val1": None,
            "Val2": val2,
            "Val0_unitmisure": "°C",
            "Val1_unitmisure": None,
            "Val2_unitmisure": "m/s" if val2 is not None else None,
        }
        for i in range(3, 16):
            name = f"Val{i:X}"
            row[name] = None
            row[f"{name}_unitmisure"] = None
        row.update(
            created_at=None,
            BatLevelModule=None,
            TemperatureModule=None,
            RssiModule=None,
        )
        return row

    def test_rawdatacor_consolidation(self):
        """Rows sharing (unit, tool, timestamp) merge into one JSONB row."""
        rows = [
            self._node_row(1, "TestUnit", 1, "100.5", "200.3"),
            self._node_row(2, "TestUnit", 2, "101.2", "205.1"),
            self._node_row(3, "TestUnit", 3, "102.0", "210.5"),
        ]

        merged = DataTransformer.consolidate_rawdatacor_batch(rows)

        # All three nodes share the key, so exactly one row survives.
        assert len(merged) == 1
        row = merged[0]
        assert row["id"] == 3  # MAX(id) so resume picks up after the group
        assert row["unit_name"] == "TestUnit"
        assert row["tool_name_id"] == "Tool1"
        assert row["bat_level"] == 3.5
        assert row["temperature"] == 25.5
        measurements = row["measurements"]
        expected = {
            "1": ("100.5", "200.3"),
            "2": ("101.2", "205.1"),
            "3": ("102.0", "210.5"),
        }
        for node_key, (v0, v2) in expected.items():
            assert node_key in measurements
            assert measurements[node_key]["0"]["value"] == v0
            assert measurements[node_key]["2"]["value"] == v2
        assert measurements["1"]["0"]["unit"] == "°C"

    def test_rawdatacor_consolidation_with_different_keys(self):
        """Rows with different consolidation keys are kept separate."""
        rows = [
            self._node_row(1, "Unit1", 1, "100.5", None),
            self._node_row(2, "Unit2", 1, "100.5", None),  # different unit
        ]

        merged = DataTransformer.consolidate_rawdatacor_batch(rows)

        # Different units must NOT be merged.
        assert len(merged) == 2
        assert merged[0]["unit_name"] == "Unit1"
        assert merged[1]["unit_name"] == "Unit2"
class TestFieldMapping:
    """Sanity checks on the static field-mapping configuration."""

    def test_all_rawdatacor_columns_mapped(self):
        """Every RAWDATACOR value column follows the Val* naming scheme."""
        assert all(
            col.startswith("Val") for col in RAWDATACOR_COLUMNS["val_columns"]
        )

    def test_all_elabdatadisp_fields_mapped(self):
        """The ELABDATADISP mapping covers a substantial field set."""
        # Keys of a dict are unique, so len() of the mapping equals the
        # number of distinct mapped fields.
        assert len(ELABDATADISP_FIELD_MAPPING) > 20
# Allow running this test module directly (outside a pytest invocation).
if __name__ == "__main__":
    pytest.main([__file__, "-v"])