Files
matlab-python/src/validation/db_extractor.py
alex 23c53cf747 Add comprehensive validation system and migrate to .env configuration
This commit includes:

1. Database Configuration Migration:
   - Migrated from DB.txt (Java JDBC) to .env (python-dotenv)
   - Added .env.example template with clear variable names
   - Updated database.py to use environment variables
   - Added python-dotenv>=1.0.0 to dependencies
   - Updated .gitignore to exclude sensitive files

2. Validation System (1,294 lines):
   - comparator.py: Statistical comparison with RMSE, correlation, tolerances
   - db_extractor.py: Database queries for all sensor types
   - validator.py: High-level validation orchestration
   - cli.py: Command-line interface for validation
   - README.md: Comprehensive validation documentation

3. Validation Features:
   - Compare Python vs MATLAB outputs from database
   - Support for all sensor types (RSN, Tilt, ATD)
   - Statistical metrics: max abs/rel diff, RMSE, correlation
   - Configurable tolerances (abs, rel, max)
   - Detailed validation reports
   - CLI and programmatic APIs

4. Examples and Documentation:
   - validate_example.sh: Bash script example
   - validate_example.py: Python programmatic example
   - Updated main README with validation section
   - Added validation workflow and troubleshooting guide

Benefits:
- No Java driver needed (native Python connectors)
- Secure .env configuration (excluded from git)
- Comprehensive validation against MATLAB
- Statistical confidence in migration accuracy
- Automated validation reports

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-13 15:34:13 +02:00

418 lines
15 KiB
Python

"""
Database extraction utilities for validation.
Extracts processed data from database tables for Python vs MATLAB comparison.
"""
import numpy as np
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime
import logging
from ..common.database import DatabaseConnection
logger = logging.getLogger(__name__)


class DataExtractor:
    """Extract processed (elaborated) data from the database for validation.

    Each ``extract_*`` method runs one parameterized SELECT against a single
    ``ELABDATA*`` table, filtered by control unit and chain (plus sensor type
    and an optional date range), ordered by ``EventDate, EventTime, NodeNum``,
    and returns the rows exactly as ``conn.execute_query`` produced them.
    """

    # Identification columns selected first by every extraction query.
    _KEY_COLUMNS = ("UnitName", "ToolNameID", "NodeNum", "EventDate", "EventTime")

    # Sort order shared by every extraction query.
    _ORDER_BY = "EventDate, EventTime, NodeNum"

    def __init__(self, conn: "DatabaseConnection"):
        """
        Initialize extractor with database connection.

        Args:
            conn: DatabaseConnection instance. Only its
                ``execute_query(query, params)`` method is used by this class.
        """
        self.conn = conn

    @staticmethod
    def _validate_table_name(table: str) -> str:
        """Return ``table`` unchanged if it is a plain SQL identifier.

        Table names cannot be sent as bound ``%s`` parameters, so a name that
        comes from a caller has to be checked before it is placed in the SQL
        text. Accepted: ASCII letters, digits, ``_`` and ``$``, optionally
        qualified as ``schema.table``.

        Raises:
            ValueError: If ``table`` is not a string or contains any other
                character (whitespace, quotes, ``;``, comment markers, ...).
        """
        import re  # local import: the module header does not import ``re``

        if not isinstance(table, str) or not re.fullmatch(
            r"[A-Za-z0-9_$]+(?:\.[A-Za-z0-9_$]+)?", table
        ):
            raise ValueError(f"Invalid table name: {table!r}")
        return table

    def _extract(
        self,
        table: str,
        value_columns: Tuple[str, ...],
        label: str,
        control_unit_id: str,
        chain: str,
        start_date: Optional[str],
        end_date: Optional[str],
        extra_filters: Tuple[Tuple[str, Any], ...] = (),
    ) -> List[Dict[str, Any]]:
        """Run the shared extraction query and log how many rows came back.

        Args:
            table: Table to read. Always one of the constants hard-coded in
                this class, never caller input.
            value_columns: Columns selected after ``_KEY_COLUMNS``.
            label: Text placed between the row count and "records" in the
                log line.
            control_unit_id: Value bound to the ``UnitName`` filter.
            chain: Value bound to the ``ToolNameID`` filter.
            start_date: If truthy, adds ``EventDate >= %s``.
            end_date: If truthy, adds ``EventDate <= %s``.
            extra_filters: ``(column, value)`` pairs added as equality
                filters after the unit/chain filters, whatever the value is.

        Returns:
            Whatever ``conn.execute_query`` returns for the query.
        """
        clauses = ["UnitName = %s", "ToolNameID = %s"]
        params: List[Any] = [control_unit_id, chain]

        for column, value in extra_filters:
            clauses.append(f"{column} = %s")
            params.append(value)

        # Date bounds are optional; None and "" both mean "no bound".
        if start_date:
            clauses.append("EventDate >= %s")
            params.append(start_date)
        if end_date:
            clauses.append("EventDate <= %s")
            params.append(end_date)

        columns = self._KEY_COLUMNS + tuple(value_columns)
        query = (
            f"SELECT {', '.join(columns)} "
            f"FROM {table} "
            f"WHERE {' AND '.join(clauses)} "
            f"ORDER BY {self._ORDER_BY}"
        )

        results = self.conn.execute_query(query, tuple(params))
        logger.info(
            "Extracted %d %s records for %s/%s",
            len(results), label, control_unit_id, chain,
        )
        return results

    def extract_rsn_data(self,
                         control_unit_id: str,
                         chain: str,
                         start_date: Optional[str] = None,
                         end_date: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Extract RSN elaborated data from ``ELABDATARSN``.

        Args:
            control_unit_id: Control unit identifier
            chain: Chain identifier
            start_date: Optional start date filter (YYYY-MM-DD), inclusive
            end_date: Optional end date filter (YYYY-MM-DD), inclusive

        Returns:
            List of dictionaries with RSN data
        """
        return self._extract(
            "ELABDATARSN",
            (
                "SensorType", "RollAngle", "InclinAngle", "AzimuthAngle",
                "RollAngleDiff", "InclinAngleDiff", "AzimuthAngleDiff",
                "T_node", "calcerr",
            ),
            "RSN",
            control_unit_id, chain, start_date, end_date,
        )

    def extract_tilt_data(self,
                          control_unit_id: str,
                          chain: str,
                          sensor_type: str,
                          start_date: Optional[str] = None,
                          end_date: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Extract Tilt elaborated data from ``ELABDATATILT``.

        Args:
            control_unit_id: Control unit identifier
            chain: Chain identifier
            sensor_type: Sensor type (TLHR, BL, PL, KLHR)
            start_date: Optional start date filter, inclusive
            end_date: Optional end date filter, inclusive

        Returns:
            List of dictionaries with Tilt data
        """
        return self._extract(
            "ELABDATATILT",
            (
                "SensorType", "X", "Y", "Z", "X_local", "Y_local", "Z_local",
                "XShift", "YShift", "ZShift", "T_node", "calcerr",
            ),
            f"Tilt {sensor_type}",
            control_unit_id, chain, start_date, end_date,
            extra_filters=(("SensorType", sensor_type),),
        )

    def extract_atd_radial_link_data(self,
                                     control_unit_id: str,
                                     chain: str,
                                     start_date: Optional[str] = None,
                                     end_date: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Extract ATD Radial Link (RL) elaborated data from ``ELABDATARL``.

        Args:
            control_unit_id: Control unit identifier
            chain: Chain identifier
            start_date: Optional start date filter, inclusive
            end_date: Optional end date filter, inclusive

        Returns:
            List of dictionaries with RL data
        """
        return self._extract(
            "ELABDATARL",
            (
                "X", "Y", "Z", "X_local", "Y_local", "Z_local",
                "XShift", "YShift", "ZShift", "T_node", "calcerr",
            ),
            "RL",
            control_unit_id, chain, start_date, end_date,
        )

    def extract_atd_load_link_data(self,
                                   control_unit_id: str,
                                   chain: str,
                                   start_date: Optional[str] = None,
                                   end_date: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Extract ATD Load Link (LL) elaborated data from ``ELABDATALL``.

        Args:
            control_unit_id: Control unit identifier
            chain: Chain identifier
            start_date: Optional start date filter, inclusive
            end_date: Optional end date filter, inclusive

        Returns:
            List of dictionaries with LL data
        """
        return self._extract(
            "ELABDATALL",
            ("Load", "LoadDiff", "T_node", "calcerr"),
            "LL",
            control_unit_id, chain, start_date, end_date,
        )

    def extract_atd_pressure_link_data(self,
                                       control_unit_id: str,
                                       chain: str,
                                       start_date: Optional[str] = None,
                                       end_date: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Extract ATD Pressure Link (PL) elaborated data from ``ELABDATAPL``.

        Args:
            control_unit_id: Control unit identifier
            chain: Chain identifier
            start_date: Optional start date filter, inclusive
            end_date: Optional end date filter, inclusive

        Returns:
            List of dictionaries with PL data
        """
        return self._extract(
            "ELABDATAPL",
            ("Pressure", "PressureDiff", "T_node", "calcerr"),
            "PL",
            control_unit_id, chain, start_date, end_date,
        )

    def extract_atd_extensometer_3d_data(self,
                                         control_unit_id: str,
                                         chain: str,
                                         start_date: Optional[str] = None,
                                         end_date: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Extract ATD 3D Extensometer (3DEL) elaborated data from ``ELABDATA3DEL``.

        Args:
            control_unit_id: Control unit identifier
            chain: Chain identifier
            start_date: Optional start date filter, inclusive
            end_date: Optional end date filter, inclusive

        Returns:
            List of dictionaries with 3DEL data
        """
        return self._extract(
            "ELABDATA3DEL",
            ("X", "Y", "Z", "XShift", "YShift", "ZShift", "T_node", "calcerr"),
            "3DEL",
            control_unit_id, chain, start_date, end_date,
        )

    def extract_atd_crackmeter_data(self,
                                    control_unit_id: str,
                                    chain: str,
                                    sensor_type: str,
                                    start_date: Optional[str] = None,
                                    end_date: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Extract ATD Crackmeter (CrL/2DCrL/3DCrL) elaborated data from ``ELABDATACRL``.

        Args:
            control_unit_id: Control unit identifier
            chain: Chain identifier
            sensor_type: Sensor type (CrL, 2DCrL, 3DCrL)
            start_date: Optional start date filter, inclusive
            end_date: Optional end date filter, inclusive

        Returns:
            List of dictionaries with crackmeter data
        """
        return self._extract(
            "ELABDATACRL",
            (
                "SensorType", "X", "Y", "Z",
                "XShift", "YShift", "ZShift", "T_node", "calcerr",
            ),
            sensor_type,
            control_unit_id, chain, start_date, end_date,
            extra_filters=(("SensorType", sensor_type),),
        )

    def extract_atd_pcl_data(self,
                             control_unit_id: str,
                             chain: str,
                             sensor_type: str,
                             start_date: Optional[str] = None,
                             end_date: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Extract ATD Perimeter Cable Link (PCL/PCLHR) elaborated data from ``ELABDATAPCL``.

        Args:
            control_unit_id: Control unit identifier
            chain: Chain identifier
            sensor_type: Sensor type (PCL, PCLHR)
            start_date: Optional start date filter, inclusive
            end_date: Optional end date filter, inclusive

        Returns:
            List of dictionaries with PCL data
        """
        return self._extract(
            "ELABDATAPCL",
            (
                "SensorType", "Y", "Z", "Y_local", "Z_local",
                "AlphaX", "AlphaY", "YShift", "ZShift", "T_node", "calcerr",
            ),
            sensor_type,
            control_unit_id, chain, start_date, end_date,
            extra_filters=(("SensorType", sensor_type),),
        )

    def extract_atd_tube_link_data(self,
                                   control_unit_id: str,
                                   chain: str,
                                   start_date: Optional[str] = None,
                                   end_date: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Extract ATD Tube Link (TuL) elaborated data from ``ELABDATATUBE``.

        Args:
            control_unit_id: Control unit identifier
            chain: Chain identifier
            start_date: Optional start date filter, inclusive
            end_date: Optional end date filter, inclusive

        Returns:
            List of dictionaries with TuL data
        """
        return self._extract(
            "ELABDATATUBE",
            (
                "X", "Y", "Z", "X_Star", "Y_Star", "Z_Star",
                "XShift", "YShift", "ZShift", "T_node", "calcerr",
            ),
            "TuL",
            control_unit_id, chain, start_date, end_date,
        )

    def get_latest_timestamp(self,
                             table: str,
                             control_unit_id: str,
                             chain: str) -> Optional[Tuple[str, str]]:
        """
        Get the latest timestamp (date, time) for a given table and chain.

        Args:
            table: Table name (e.g., 'ELABDATARSN'). Must be a plain,
                optionally schema-qualified identifier because it is placed
                directly in the SQL text.
            control_unit_id: Control unit identifier
            chain: Chain identifier

        Returns:
            Tuple of (EventDate, EventTime) taken from the newest row, or
            None if no data. The element types are whatever the database
            driver returns for those columns.

        Raises:
            ValueError: If ``table`` is not a valid identifier; no query is
                executed in that case.
        """
        # Identifiers cannot be bound as parameters, so validate before
        # interpolating to keep caller-supplied text from altering the query.
        safe_table = self._validate_table_name(table)
        query = f"""
            SELECT EventDate, EventTime
            FROM {safe_table}
            WHERE UnitName = %s AND ToolNameID = %s
            ORDER BY EventDate DESC, EventTime DESC
            LIMIT 1
        """
        results = self.conn.execute_query(query, (control_unit_id, chain))
        if not results:
            return None
        latest = results[0]
        return (latest['EventDate'], latest['EventTime'])