Add: - QUICKSTART.md: 5-minute quick start guide with examples - scripts/incus_setup.sh: Automated PostgreSQL container setup - scripts/validate_migration.sql: SQL validation queries - scripts/setup_cron.sh: Cron job setup for incremental migrations - tests/test_setup.py: Unit tests for configuration and transformation - install.sh: Quick installation script Documentation includes: - Step-by-step setup instructions - Example queries for RAWDATACOR and ELABDATADISP - Troubleshooting guide - Performance optimization tips 🤖 Generated with Claude Code Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
154 lines
5.2 KiB
Python
154 lines
5.2 KiB
Python
"""Test setup and basic functionality."""
|
|
import pytest
|
|
from config import get_settings, TABLE_CONFIGS, RAWDATACOR_COLUMNS, ELABDATADISP_FIELD_MAPPING
|
|
from src.transformers.data_transformer import DataTransformer
|
|
|
|
|
|
class TestConfiguration:
|
|
"""Test configuration loading."""
|
|
|
|
def test_settings_loaded(self):
|
|
"""Test that settings can be loaded."""
|
|
settings = get_settings()
|
|
assert settings is not None
|
|
assert settings.mysql.host is not None
|
|
assert settings.postgres.host is not None
|
|
|
|
def test_table_configs_exist(self):
|
|
"""Test that table configurations exist."""
|
|
assert "RAWDATACOR" in TABLE_CONFIGS or len(TABLE_CONFIGS) > 0
|
|
|
|
def test_migration_batch_size(self):
|
|
"""Test that batch size is configured."""
|
|
settings = get_settings()
|
|
assert settings.migration.batch_size > 0
|
|
assert settings.migration.batch_size <= 1000000
|
|
|
|
|
|
class TestDataTransformation:
|
|
"""Test data transformation logic."""
|
|
|
|
def test_rawdatacor_transformation(self):
|
|
"""Test RAWDATACOR row transformation."""
|
|
# Sample MySQL row
|
|
mysql_row = {
|
|
"id": 1,
|
|
"UnitName": "TestUnit",
|
|
"ToolNameID": "Tool1",
|
|
"NodeNum": 1,
|
|
"EventDate": "2024-01-01",
|
|
"EventTime": "12:00:00",
|
|
"BatLevel": 3.5,
|
|
"Temperature": 25.5,
|
|
"Val0": "100.5",
|
|
"Val1": None,
|
|
"Val2": "200.3",
|
|
"Val0_unitmisure": "°C",
|
|
"Val1_unitmisure": "bar",
|
|
"Val2_unitmisure": "m/s",
|
|
}
|
|
|
|
# Add remaining Val columns as None
|
|
for i in range(3, 16):
|
|
col = f"Val{i:X}" # Val3-ValF
|
|
mysql_row[col] = None
|
|
mysql_row[f"{col}_unitmisure"] = None
|
|
|
|
# Transform
|
|
pg_row = DataTransformer.transform_rawdatacor_row(mysql_row)
|
|
|
|
# Verify
|
|
assert pg_row["id"] == 1
|
|
assert pg_row["unit_name"] == "TestUnit"
|
|
assert pg_row["tool_name_id"] == "Tool1"
|
|
assert isinstance(pg_row["measurements"], dict)
|
|
assert "0" in pg_row["measurements"]
|
|
assert pg_row["measurements"]["0"]["value"] == "100.5"
|
|
assert pg_row["measurements"]["0"]["unit"] == "°C"
|
|
assert "1" not in pg_row["measurements"] # NULL values excluded
|
|
assert "2" in pg_row["measurements"]
|
|
|
|
def test_elabdatadisp_transformation(self):
|
|
"""Test ELABDATADISP row transformation."""
|
|
# Sample MySQL row
|
|
mysql_row = {
|
|
"idElabData": 5000,
|
|
"UnitName": "TestUnit",
|
|
"ToolNameID": "Tool1",
|
|
"NodeNum": 1,
|
|
"EventDate": "2024-01-01",
|
|
"EventTime": "12:00:00",
|
|
"State": "OK",
|
|
"calcerr": 0,
|
|
"XShift": 1.234567,
|
|
"YShift": 2.345678,
|
|
"ZShift": 3.456789,
|
|
"HShift": 4.567890,
|
|
"HShiftDir": 5.678901,
|
|
"HShift_local": 6.789012,
|
|
"X": 10.123456,
|
|
"Y": 20.234567,
|
|
"Z": 30.345678,
|
|
"Xstar": 40.456789,
|
|
"Zstar": 50.567890,
|
|
"speed": 1.111111,
|
|
"speed_local": 2.222222,
|
|
"acceleration": 3.333333,
|
|
"acceleration_local": 4.444444,
|
|
"T_node": 25.5,
|
|
"load_value": 100.5,
|
|
"water_level": 50.5,
|
|
"pressure": 1.013,
|
|
"AlfaX": 0.123456,
|
|
"AlfaY": 0.234567,
|
|
"Area": 100.5,
|
|
}
|
|
|
|
# Transform
|
|
pg_row = DataTransformer.transform_elabdatadisp_row(mysql_row)
|
|
|
|
# Verify
|
|
assert pg_row["id_elab_data"] == 5000
|
|
assert pg_row["state"] == "OK"
|
|
assert isinstance(pg_row["measurements"], dict)
|
|
assert "shifts" in pg_row["measurements"]
|
|
assert "coordinates" in pg_row["measurements"]
|
|
assert "kinematics" in pg_row["measurements"]
|
|
assert pg_row["measurements"]["shifts"]["x"] == 1.234567
|
|
assert pg_row["measurements"]["coordinates"]["x"] == 10.123456
|
|
assert pg_row["measurements"]["kinematics"]["speed"] == 1.111111
|
|
|
|
def test_column_order_rawdatacor(self):
|
|
"""Test column order for RAWDATACOR."""
|
|
columns = DataTransformer.get_column_order("rawdatacor")
|
|
assert isinstance(columns, list)
|
|
assert "id" in columns
|
|
assert "measurements" in columns
|
|
assert "unit_name" in columns
|
|
|
|
def test_column_order_elabdatadisp(self):
|
|
"""Test column order for ELABDATADISP."""
|
|
columns = DataTransformer.get_column_order("elabdatadisp")
|
|
assert isinstance(columns, list)
|
|
assert "id_elab_data" in columns
|
|
assert "measurements" in columns
|
|
assert "state" in columns
|
|
|
|
|
|
class TestFieldMapping:
|
|
"""Test field mapping configuration."""
|
|
|
|
def test_all_rawdatacor_columns_mapped(self):
|
|
"""Test that all RAWDATACOR value columns are defined."""
|
|
for val_col in RAWDATACOR_COLUMNS["val_columns"]:
|
|
assert val_col.startswith("Val")
|
|
|
|
def test_all_elabdatadisp_fields_mapped(self):
|
|
"""Test that all ELABDATADISP fields are mapped."""
|
|
mapped_fields = set(ELABDATADISP_FIELD_MAPPING.keys())
|
|
assert len(mapped_fields) > 20 # Should have many fields
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|