primo commit refactory in python

This commit is contained in:
2025-10-12 20:16:19 +02:00
parent 3288b60385
commit 876ef073fc
41 changed files with 7811 additions and 6 deletions

0
src/tilt/__init__.py Normal file
View File

290
src/tilt/averaging.py Normal file
View File

@@ -0,0 +1,290 @@
"""
Data averaging functions for Tilt sensors.
Applies smoothing and averaging over time windows.
"""
import numpy as np
import logging
from typing import Tuple
from scipy.ndimage import gaussian_filter1d
logger = logging.getLogger(__name__)
def average_tilt_link_hr_data(
angle_data: np.ndarray,
timestamps: np.ndarray,
temperature: np.ndarray,
n_points: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Average Tilt Link HR data using Gaussian smoothing.
Converts MATLAB MediaDati_TLHR.m function.
Args:
angle_data: Angle data array
timestamps: Array of timestamps
temperature: Temperature data array
n_points: Window size for smoothing
Returns:
Tuple of (smoothed_angles, timestamps, temperatures)
"""
logger.info(f"Averaging Tilt Link HR data with window size {n_points}")
n_timestamps = len(angle_data)
if n_points > n_timestamps:
logger.warning(f"Window size {n_points} > data length {n_timestamps}, using data length")
n_points = n_timestamps
# Apply Gaussian smoothing along time axis (axis=0)
# Equivalent to MATLAB's smoothdata(data,'gaussian',n_points)
sigma = n_points / 6.0 # Approximate conversion to Gaussian sigma
angles_smoothed = np.zeros_like(angle_data)
for i in range(angle_data.shape[1]):
angles_smoothed[:, i] = gaussian_filter1d(angle_data[:, i], sigma=sigma, axis=0)
# Temperature is not averaged (keep as is for filter application)
temp_out = temperature.copy()
logger.info(f"Applied Gaussian smoothing with sigma={sigma:.2f}")
return angles_smoothed, timestamps, temp_out
def average_tilt_link_data(
acceleration: np.ndarray,
timestamps: np.ndarray,
temperature: np.ndarray,
n_points: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Average Tilt Link data using moving average or Gaussian smoothing.
Args:
acceleration: Acceleration data array
timestamps: Array of timestamps
temperature: Temperature data array
n_points: Window size for averaging
Returns:
Tuple of (averaged_acceleration, timestamps, temperatures)
"""
logger.info(f"Averaging Tilt Link data with window size {n_points}")
if len(acceleration) < n_points:
logger.warning(f"Not enough data points for averaging")
return acceleration, timestamps, temperature
# Apply Gaussian smoothing
sigma = n_points / 6.0
acc_smoothed = np.zeros_like(acceleration)
for i in range(acceleration.shape[1]):
acc_smoothed[:, i] = gaussian_filter1d(acceleration[:, i], sigma=sigma, axis=0)
return acc_smoothed, timestamps, temperature
def average_biaxial_link_data(
data: np.ndarray,
timestamps: np.ndarray,
temperature: np.ndarray,
n_points: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Average Biaxial Link data.
Args:
data: Sensor data array
timestamps: Array of timestamps
temperature: Temperature data array
n_points: Window size for averaging
Returns:
Tuple of (averaged_data, timestamps, temperatures)
"""
logger.info(f"Averaging Biaxial Link data with window size {n_points}")
if len(data) < n_points:
return data, timestamps, temperature
sigma = n_points / 6.0
data_smoothed = np.zeros_like(data)
for i in range(data.shape[1]):
data_smoothed[:, i] = gaussian_filter1d(data[:, i], sigma=sigma, axis=0)
return data_smoothed, timestamps, temperature
def average_pendulum_link_data(
data: np.ndarray,
timestamps: np.ndarray,
temperature: np.ndarray,
n_points: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Average Pendulum Link data.
Converts MATLAB MediaDati_PL.m function.
Args:
data: Sensor data array
timestamps: Array of timestamps
temperature: Temperature data array
n_points: Window size for averaging
Returns:
Tuple of (averaged_data, timestamps, temperatures)
"""
logger.info(f"Averaging Pendulum Link data with window size {n_points}")
if len(data) < n_points:
return data, timestamps, temperature
sigma = n_points / 6.0
data_smoothed = np.zeros_like(data)
for i in range(data.shape[1]):
data_smoothed[:, i] = gaussian_filter1d(data[:, i], sigma=sigma, axis=0)
# Also smooth temperature for Pendulum Link
temp_smoothed = np.zeros_like(temperature)
for i in range(temperature.shape[1]):
temp_smoothed[:, i] = gaussian_filter1d(temperature[:, i], sigma=sigma, axis=0)
return data_smoothed, timestamps, temp_smoothed
def average_kessler_link_data(
data: np.ndarray,
timestamps: np.ndarray,
temperature: np.ndarray,
n_points: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Average Kessler Link data.
Converts MATLAB MediaDati_KLHR.m function.
Args:
data: Sensor data array
timestamps: Array of timestamps
temperature: Temperature data array
n_points: Window size for averaging
Returns:
Tuple of (averaged_data, timestamps, temperatures)
"""
logger.info(f"Averaging Kessler Link data with window size {n_points}")
if len(data) < n_points:
return data, timestamps, temperature
sigma = n_points / 6.0
data_smoothed = np.zeros_like(data)
for i in range(data.shape[1]):
data_smoothed[:, i] = gaussian_filter1d(data[:, i], sigma=sigma, axis=0)
return data_smoothed, timestamps, temperature
def average_radial_link_data(
data: np.ndarray,
timestamps: np.ndarray,
temperature: np.ndarray,
n_points: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Average Radial Link data.
Converts MATLAB MediaDati_RL.m function.
Args:
data: Sensor data array
timestamps: Array of timestamps
temperature: Temperature data array
n_points: Window size for averaging
Returns:
Tuple of (averaged_data, timestamps, temperatures)
"""
logger.info(f"Averaging Radial Link data with window size {n_points}")
if len(data) < n_points:
return data, timestamps, temperature
sigma = n_points / 6.0
data_smoothed = np.zeros_like(data)
for i in range(data.shape[1]):
data_smoothed[:, i] = gaussian_filter1d(data[:, i], sigma=sigma, axis=0)
return data_smoothed, timestamps, temperature
def average_linear_link_data(
data: np.ndarray,
timestamps: np.ndarray,
n_points: int
) -> Tuple[np.ndarray, np.ndarray]:
"""
Average Linear Link data.
Converts MATLAB MediaDati_LL.m function.
Args:
data: Sensor data array
timestamps: Array of timestamps
n_points: Window size for averaging
Returns:
Tuple of (averaged_data, timestamps)
"""
logger.info(f"Averaging Linear Link data with window size {n_points}")
if len(data) < n_points:
return data, timestamps
sigma = n_points / 6.0
data_smoothed = np.zeros_like(data)
for i in range(data.shape[1]):
data_smoothed[:, i] = gaussian_filter1d(data[:, i], sigma=sigma, axis=0)
return data_smoothed, timestamps
def average_temperature_data(
temperature: np.ndarray,
n_points: int
) -> np.ndarray:
"""
Average temperature data using Gaussian smoothing.
Args:
temperature: Temperature data array
n_points: Window size for averaging
Returns:
Smoothed temperature array
"""
logger.info(f"Averaging temperature data with window size {n_points}")
if len(temperature) < n_points:
return temperature
sigma = n_points / 6.0
temp_smoothed = np.zeros_like(temperature)
for i in range(temperature.shape[1]):
temp_smoothed[:, i] = gaussian_filter1d(temperature[:, i], sigma=sigma, axis=0)
return temp_smoothed

322
src/tilt/conversion.py Normal file
View File

@@ -0,0 +1,322 @@
"""
Data conversion functions for Tilt sensors.
Converts raw sensor data to physical units (angles, temperatures).
"""
import numpy as np
import logging
from typing import Tuple
logger = logging.getLogger(__name__)
def convert_tilt_link_hr_data(
angle_data: np.ndarray,
temperature: np.ndarray,
calibration_data: np.ndarray,
n_sensors: int
) -> Tuple[np.ndarray, np.ndarray]:
"""
Convert raw Tilt Link HR data to physical units (angles in degrees).
Converts MATLAB conv_grezziTLHR.m function.
Args:
angle_data: Raw angle data (ADC counts)
temperature: Raw temperature data
calibration_data: Calibration coefficients
If column 4 == 0: XY gain is common
Column 1: gain XY
Column 2: gain temp
Column 3: offset temp
Else: separate XY gains
Column 1: gain X
Column 2: gain Y
Column 3: gain temp
Column 4: offset temp
n_sensors: Number of sensors
Returns:
Tuple of (converted_angles, converted_temperature)
"""
logger.info(f"Converting Tilt Link HR data for {n_sensors} sensors")
n_timestamps = angle_data.shape[0]
angle_converted = angle_data.copy()
temp_converted = temperature.copy()
# Check if XY gains are common or separate
if len(calibration_data.shape) == 1 or calibration_data.shape[1] < 4:
# Simple case: single calibration set
xy_common = True
gain_xy = calibration_data[0] if len(calibration_data) > 0 else 1.0
gain_temp = calibration_data[1] if len(calibration_data) > 1 else 1.0
offset_temp = calibration_data[2] if len(calibration_data) > 2 else 0.0
else:
# Check column 4 (index 3)
if np.all(calibration_data[:, 3] == 0):
# XY gains are common
xy_common = True
gain_angles = calibration_data[:, 0] # Common gain for both axes
gain_temp = calibration_data[:, 1]
offset_temp = calibration_data[:, 2]
else:
# Separate XY gains
xy_common = False
gain_x = calibration_data[:, 0]
gain_y = calibration_data[:, 1]
gain_temp = calibration_data[:, 2]
offset_temp = calibration_data[:, 3]
# Convert angles
if xy_common:
# Common gain for X and Y
for i in range(n_sensors):
gain = gain_angles[i] if hasattr(gain_angles, '__len__') else gain_xy
angle_converted[:, i * 2] = angle_data[:, i * 2] * gain # X
angle_converted[:, i * 2 + 1] = angle_data[:, i * 2 + 1] * gain # Y
else:
# Separate gains for X and Y
for i in range(n_sensors):
angle_converted[:, i * 2] = angle_data[:, i * 2] * gain_x[i] # X
angle_converted[:, i * 2 + 1] = angle_data[:, i * 2 + 1] * gain_y[i] # Y
# Convert temperatures
for i in range(n_sensors):
g_temp = gain_temp[i] if hasattr(gain_temp, '__len__') else gain_temp
off_temp = offset_temp[i] if hasattr(offset_temp, '__len__') else offset_temp
temp_converted[:, i] = temperature[:, i] * g_temp + off_temp
logger.info("Tilt Link HR data conversion completed")
return angle_converted, temp_converted
def convert_tilt_link_data(
acceleration: np.ndarray,
temperature: np.ndarray,
calibration_data: np.ndarray,
n_sensors: int
) -> Tuple[np.ndarray, np.ndarray]:
"""
Convert raw Tilt Link data to physical units (acceleration in g).
Similar to RSN conversion but for standard Tilt Link sensors.
Args:
acceleration: Raw acceleration data
temperature: Raw temperature data
calibration_data: Calibration coefficients for each sensor
n_sensors: Number of sensors
Returns:
Tuple of (converted_acceleration, converted_temperature)
"""
logger.info(f"Converting Tilt Link data for {n_sensors} sensors")
n_timestamps = acceleration.shape[0]
acc_converted = np.zeros_like(acceleration)
temp_converted = np.zeros_like(temperature)
for i in range(n_sensors):
cal = calibration_data[i]
# Acceleration conversion (typically 2 or 3 axes)
# Assume biaxial for Tilt Link
acc_converted[:, i * 2] = cal[0] * acceleration[:, i * 2] + cal[1] # X
acc_converted[:, i * 2 + 1] = cal[2] * acceleration[:, i * 2 + 1] + cal[3] # Y
# Temperature conversion
if len(cal) > 4:
temp_converted[:, i] = cal[4] * temperature[:, i] + cal[5]
else:
temp_converted[:, i] = temperature[:, i]
logger.info("Tilt Link data conversion completed")
return acc_converted, temp_converted
def convert_biaxial_link_data(
raw_data: np.ndarray,
temperature: np.ndarray,
calibration_data: np.ndarray,
n_sensors: int
) -> Tuple[np.ndarray, np.ndarray]:
"""
Convert raw Biaxial Link (BL) data to physical units.
Converts MATLAB conv_grezziBL.m function.
Args:
raw_data: Raw sensor data
temperature: Raw temperature data
calibration_data: Calibration coefficients
n_sensors: Number of sensors
Returns:
Tuple of (converted_data, converted_temperature)
"""
logger.info(f"Converting Biaxial Link data for {n_sensors} sensors")
data_converted = np.zeros_like(raw_data)
temp_converted = np.zeros_like(temperature)
for i in range(n_sensors):
cal = calibration_data[i]
# Biaxial: 2 axes per sensor
data_converted[:, i * 2] = cal[0] * raw_data[:, i * 2] + cal[1]
data_converted[:, i * 2 + 1] = cal[2] * raw_data[:, i * 2 + 1] + cal[3]
# Temperature
if len(cal) > 4:
temp_converted[:, i] = cal[4] * temperature[:, i] + cal[5]
else:
temp_converted[:, i] = temperature[:, i]
logger.info("Biaxial Link data conversion completed")
return data_converted, temp_converted
def convert_pendulum_link_data(
raw_data: np.ndarray,
temperature: np.ndarray,
calibration_data: np.ndarray,
n_sensors: int
) -> Tuple[np.ndarray, np.ndarray]:
"""
Convert raw Pendulum Link (PL) data to physical units.
Args:
raw_data: Raw sensor data
temperature: Raw temperature data
calibration_data: Calibration coefficients
n_sensors: Number of sensors
Returns:
Tuple of (converted_data, converted_temperature)
"""
logger.info(f"Converting Pendulum Link data for {n_sensors} sensors")
data_converted = np.zeros_like(raw_data)
temp_converted = np.zeros_like(temperature)
for i in range(n_sensors):
cal = calibration_data[i]
# Pendulum typically has 2 axes
data_converted[:, i * 2] = cal[0] * raw_data[:, i * 2] + cal[1]
data_converted[:, i * 2 + 1] = cal[2] * raw_data[:, i * 2 + 1] + cal[3]
# Temperature
if len(cal) > 4:
temp_converted[:, i] = cal[4] * temperature[:, i] + cal[5]
else:
temp_converted[:, i] = temperature[:, i]
logger.info("Pendulum Link data conversion completed")
return data_converted, temp_converted
def convert_kessler_link_data(
raw_data: np.ndarray,
temperature: np.ndarray,
calibration_data: np.ndarray,
n_sensors: int
) -> Tuple[np.ndarray, np.ndarray]:
"""
Convert raw Kessler Link (KL/KLHR) data to physical units.
Converts MATLAB conv_grezziKLHR.m function.
Args:
raw_data: Raw sensor data
temperature: Raw temperature data
calibration_data: Calibration coefficients
n_sensors: Number of sensors
Returns:
Tuple of (converted_data, converted_temperature)
"""
logger.info(f"Converting Kessler Link data for {n_sensors} sensors")
data_converted = np.zeros_like(raw_data)
temp_converted = np.zeros_like(temperature)
for i in range(n_sensors):
cal = calibration_data[i]
# Kessler biaxial inclinometer
data_converted[:, i * 2] = cal[0] * raw_data[:, i * 2] + cal[1]
data_converted[:, i * 2 + 1] = cal[2] * raw_data[:, i * 2 + 1] + cal[3]
# Temperature
if len(cal) > 4:
temp_converted[:, i] = cal[4] * temperature[:, i] + cal[5]
else:
temp_converted[:, i] = temperature[:, i]
logger.info("Kessler Link data conversion completed")
return data_converted, temp_converted
def convert_thermistor_data(
raw_data: np.ndarray,
calibration_data: np.ndarray,
n_sensors: int
) -> np.ndarray:
"""
Convert raw thermistor (ThL) data to temperature in Celsius.
Converts MATLAB conv_grezziThL.m function.
Args:
raw_data: Raw ADC values
calibration_data: Calibration coefficients (gain, offset)
n_sensors: Number of sensors
Returns:
Converted temperature array
"""
logger.info(f"Converting Thermistor data for {n_sensors} sensors")
temp_converted = np.zeros_like(raw_data)
for i in range(n_sensors):
cal = calibration_data[i]
# Linear conversion: T = gain * ADC + offset
temp_converted[:, i] = cal[0] * raw_data[:, i] + cal[1]
logger.info("Thermistor data conversion completed")
return temp_converted
def convert_pt100_data(
raw_data: np.ndarray,
calibration_data: np.ndarray,
n_sensors: int
) -> np.ndarray:
"""
Convert raw PT100 sensor data to temperature in Celsius.
Converts MATLAB conv_grezziPT100.m function.
Args:
raw_data: Raw resistance or ADC values
calibration_data: Calibration coefficients
n_sensors: Number of sensors
Returns:
Converted temperature array
"""
logger.info(f"Converting PT100 data for {n_sensors} sensors")
temp_converted = np.zeros_like(raw_data)
for i in range(n_sensors):
cal = calibration_data[i]
# PT100 typically linear: T = gain * R + offset
temp_converted[:, i] = cal[0] * raw_data[:, i] + cal[1]
logger.info("PT100 data conversion completed")
return temp_converted

461
src/tilt/data_processing.py Normal file
View File

@@ -0,0 +1,461 @@
"""
Data loading and processing functions for Tilt sensors.
Handles loading raw data from database and initial data structuring.
"""
import numpy as np
import logging
from typing import Dict, Any, Tuple, List
from datetime import datetime
from scipy.signal import medfilt
from ..common.database import DatabaseConnection
logger = logging.getLogger(__name__)
def load_tilt_link_hr_data(
conn: DatabaseConnection,
control_unit_id: str,
chain: str,
initial_date: str,
initial_time: str,
node_list: list
) -> Dict[str, Any]:
"""
Load Tilt Link HR raw data from database.
Args:
conn: Database connection
control_unit_id: Control unit identifier
chain: Chain identifier
initial_date: Starting date
initial_time: Starting time
node_list: List of node numbers
Returns:
Dictionary with timestamps, angle values, temperatures, and control data
"""
node_type = 'Tilt Link HR V'
# Get timestamps from first node
first_node = node_list[0]
timestamp_query = """
SELECT Date, Time
FROM RawDataView
WHERE UnitName = %s
AND ToolNameID = %s
AND NodeType = %s
AND NodeNum = %s
AND (
(Date = %s AND Time >= %s) OR
(Date > %s)
)
ORDER BY Date, Time
"""
timestamp_results = conn.execute_query(
timestamp_query,
(control_unit_id, chain, node_type, str(first_node),
initial_date, initial_time, initial_date)
)
if not timestamp_results:
logger.warning("No Tilt Link HR data found")
return {'timestamps': [], 'values': [], 'errors': []}
timestamps = []
for row in timestamp_results:
dt_str = f"{row['Date']} {row['Time']}"
timestamps.append(dt_str)
n_timestamps = len(timestamps)
logger.info(f"Found {n_timestamps} timestamps for Tilt Link HR data")
# For TLHR: Val0, Val1 = angles X, Y
# Val2, Val3, Val4 = control values
# Val5 = temperature
n_values_per_node = 6
all_values = np.zeros((n_timestamps, len(node_list) * n_values_per_node))
for i, node_num in enumerate(node_list):
data_query = """
SELECT Val0, Val1, Val2, Val3, Val4, Val5
FROM RawDataView
WHERE UnitName = %s
AND ToolNameID = %s
AND NodeType = %s
AND NodeNum = %s
AND (
(Date = %s AND Time >= %s) OR
(Date > %s)
)
ORDER BY Date, Time
"""
node_results = conn.execute_query(
data_query,
(control_unit_id, chain, node_type, str(node_num),
initial_date, initial_time, initial_date)
)
col_offset = i * n_values_per_node
for j, row in enumerate(node_results):
if j >= n_timestamps:
break
all_values[j, col_offset] = float(row['Val0'] or 0)
all_values[j, col_offset + 1] = float(row['Val1'] or 0)
all_values[j, col_offset + 2] = float(row['Val2'] or 0)
all_values[j, col_offset + 3] = float(row['Val3'] or 0)
all_values[j, col_offset + 4] = float(row['Val4'] or 0)
all_values[j, col_offset + 5] = float(row['Val5'] or 0)
# Forward fill missing data
if len(node_results) < n_timestamps:
logger.warning(f"Node {node_num} has only {len(node_results)}/{n_timestamps} records")
last_valid_idx = len(node_results) - 1
for j in range(len(node_results), n_timestamps):
all_values[j, col_offset:col_offset+n_values_per_node] = \
all_values[last_valid_idx, col_offset:col_offset+n_values_per_node]
return {
'timestamps': timestamps,
'values': all_values,
'errors': [],
'n_nodes': len(node_list)
}
def define_tilt_link_hr_data(
raw_data: Dict[str, Any],
n_sensors: int,
n_despike: int,
control_unit_id: str,
chain: str,
unit_type: str,
is_new_zero: bool
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Define and structure Tilt Link HR data from raw database records.
Converts MATLAB defDatiTLHR.m function.
Args:
raw_data: Raw data dict from load_tilt_link_hr_data
n_sensors: Number of sensors
n_despike: Number of points for despiking
control_unit_id: Control unit identifier
chain: Chain identifier
unit_type: Unit type identifier
is_new_zero: Whether this is a new zero point
Returns:
Tuple of (timestamps, angles, temperature, control_data, errors)
"""
if not raw_data or not raw_data.get('values') or len(raw_data['values']) == 0:
logger.warning("No Tilt Link HR data to define")
return np.array([]), np.array([]), np.array([]), np.array([]), np.array([])
logger.info("Defining Tilt Link HR data structure")
timestamps_str = raw_data['timestamps']
values = raw_data['values']
n_timestamps = len(timestamps_str)
# Convert timestamps to numeric
timestamps = np.array([
datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").timestamp()
for ts in timestamps_str
])
# Extract angles, control data, and temperature
angles = np.zeros((n_timestamps, n_sensors * 2))
control_data = np.zeros((n_timestamps, n_sensors * 3))
temperature = np.zeros((n_timestamps, n_sensors))
for i in range(n_sensors):
col_offset = i * 6
angles[:, i * 2] = values[:, col_offset] # Val0 = angle X
angles[:, i * 2 + 1] = values[:, col_offset + 1] # Val1 = angle Y
control_data[:, i * 3] = values[:, col_offset + 2] # Val2
control_data[:, i * 3 + 1] = values[:, col_offset + 3] # Val3
control_data[:, i * 3 + 2] = values[:, col_offset + 4] # Val4
temperature[:, i] = values[:, col_offset + 5] # Val5 = temp
# Handle NaN values
n_corrections = 0
for a in range(1, n_timestamps):
for b in range(angles.shape[1]):
if np.isnan(angles[a, b]):
angles[a, b] = angles[a-1, b]
n_corrections += 1
if n_corrections > 0:
logger.info(f"{n_corrections} NaN values corrected in Tilt Link HR data")
# Special handling for G301 unit type
if unit_type == 'G301':
for i in range(n_sensors):
for ii in range(1, n_timestamps):
c_idx = i * 3
a_idx = i * 2
# Check for specific error pattern
if (angles[ii, a_idx] == -8191 and angles[ii, a_idx + 1] == 0 and
control_data[ii, c_idx] == 0 and
control_data[ii, c_idx + 1] == 0 and
control_data[ii, c_idx + 2] == 0):
# Copy previous values
angles[ii, a_idx:a_idx + 2] = angles[ii-1, a_idx:a_idx + 2]
temperature[ii, i] = temperature[ii-1, i]
# Despiking using median filter
if n_despike > n_timestamps:
n_despike = n_timestamps
for i in range(n_sensors):
# Apply median filter to remove outliers
angles[:, i * 2] = medfilt(angles[:, i * 2], kernel_size=n_despike if n_despike % 2 == 1 else n_despike + 1)
angles[:, i * 2 + 1] = medfilt(angles[:, i * 2 + 1], kernel_size=n_despike if n_despike % 2 == 1 else n_despike + 1)
# Check for out-of-range values (ampolle fuori scala)
angles = handle_out_of_range_angles(
angles, timestamps, control_unit_id, chain, n_sensors, is_new_zero
)
# Check for MEMS misreading (ampolla letta come MEMS)
errors = np.zeros((n_timestamps, n_sensors * 2))
for b in range(n_sensors):
c_idx = b * 3
a_idx = b * 2
for a in range(n_timestamps):
# If all control values are non-zero, sensor is being read incorrectly
if (control_data[a, c_idx] != 0 and
control_data[a, c_idx + 1] != 0 and
control_data[a, c_idx + 2] != 0):
if a > 0:
angles[a, a_idx:a_idx + 2] = angles[a-1, a_idx:a_idx + 2]
errors[a, a_idx:a_idx + 2] = 1
logger.info(f"Defined Tilt Link HR data: {n_timestamps} timestamps, {n_sensors} sensors")
return timestamps, angles, temperature, control_data, errors
def handle_out_of_range_angles(
angles: np.ndarray,
timestamps: np.ndarray,
control_unit_id: str,
chain: str,
n_sensors: int,
is_new_zero: bool
) -> np.ndarray:
"""
Handle out-of-range angle values (scale wrapping at ±32768).
Args:
angles: Angle data array
timestamps: Timestamp array
control_unit_id: Control unit identifier
chain: Chain identifier
n_sensors: Number of sensors
is_new_zero: Whether this is a new zero point
Returns:
Corrected angle array
"""
# File to store historical angle data
from pathlib import Path
import csv
ampolle_file = Path(f"{control_unit_id}-{chain}-Ampolle.csv")
# Load previous data if exists
previous_data = {}
if is_new_zero and ampolle_file.exists():
try:
with open(ampolle_file, 'r') as f:
reader = csv.reader(f)
for row in reader:
if len(row) > 0:
timestamp = float(row[0]) + 730000 # MATLAB datenum offset
values = [float(v) for v in row[1:]]
previous_data[timestamp] = values
except Exception as e:
logger.warning(f"Could not load previous angle data: {e}")
# Check for scale wrapping
n_corrections = 0
for j in range(len(timestamps)):
for i in range(n_sensors * 2):
# Get sign of previous value
if j == 0 and timestamps[j] in previous_data and i < len(previous_data[timestamps[j]]):
prev_sign = np.sign(previous_data[timestamps[j]][i])
elif j > 0:
prev_sign = np.sign(angles[j-1, i])
else:
prev_sign = 0
curr_sign = np.sign(angles[j, i])
# If signs differ and magnitude is large, scale has wrapped
if prev_sign != 0 and curr_sign != prev_sign:
if abs(angles[j, i]) > 15000:
if prev_sign == 1:
# Positive scale wrap
angles[j, i] = 32768 + (32768 + angles[j, i])
elif prev_sign == -1:
# Negative scale wrap
angles[j, i] = -32768 + (-32768 + angles[j, i])
n_corrections += 1
if n_corrections > 0:
logger.info(f"{n_corrections} out-of-range angle values corrected")
# Save current data for next run
try:
with open(ampolle_file, 'w', newline='') as f:
writer = csv.writer(f)
for j in range(len(timestamps)):
row = [timestamps[j] - 730000] + list(angles[j, :])
writer.writerow(row)
except Exception as e:
logger.warning(f"Could not save angle data: {e}")
return angles
def load_biaxial_link_data(
conn: DatabaseConnection,
control_unit_id: str,
chain: str,
initial_date: str,
initial_time: str,
node_list: list
) -> Dict[str, Any]:
"""Load Biaxial Link raw data from database."""
node_type = 'Biaxial Link'
first_node = node_list[0]
timestamp_query = """
SELECT Date, Time
FROM RawDataView
WHERE UnitName = %s
AND ToolNameID = %s
AND NodeType = %s
AND NodeNum = %s
AND (
(Date = %s AND Time >= %s) OR
(Date > %s)
)
ORDER BY Date, Time
"""
timestamp_results = conn.execute_query(
timestamp_query,
(control_unit_id, chain, node_type, str(first_node),
initial_date, initial_time, initial_date)
)
if not timestamp_results:
return {'timestamps': [], 'values': [], 'errors': []}
timestamps = []
for row in timestamp_results:
dt_str = f"{row['Date']} {row['Time']}"
timestamps.append(dt_str)
n_timestamps = len(timestamps)
# BL: Val0, Val1 = biaxial data; Val2 = temperature
n_values_per_node = 3
all_values = np.zeros((n_timestamps, len(node_list) * n_values_per_node))
for i, node_num in enumerate(node_list):
data_query = """
SELECT Val0, Val1, Val2
FROM RawDataView
WHERE UnitName = %s
AND ToolNameID = %s
AND NodeType = %s
AND NodeNum = %s
AND (
(Date = %s AND Time >= %s) OR
(Date > %s)
)
ORDER BY Date, Time
"""
node_results = conn.execute_query(
data_query,
(control_unit_id, chain, node_type, str(node_num),
initial_date, initial_time, initial_date)
)
col_offset = i * n_values_per_node
for j, row in enumerate(node_results):
if j >= n_timestamps:
break
all_values[j, col_offset] = float(row['Val0'] or 0)
all_values[j, col_offset + 1] = float(row['Val1'] or 0)
all_values[j, col_offset + 2] = float(row['Val2'] or 0)
return {
'timestamps': timestamps,
'values': all_values,
'errors': [],
'n_nodes': len(node_list)
}
def define_biaxial_link_data(
raw_data: Dict[str, Any],
n_sensors: int,
n_despike: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Define and structure Biaxial Link data.
Args:
raw_data: Raw data dict
n_sensors: Number of sensors
n_despike: Number of points for despiking
Returns:
Tuple of (timestamps, data, temperature, errors)
"""
if not raw_data or not raw_data.get('values'):
return np.array([]), np.array([]), np.array([]), np.array([])
timestamps_str = raw_data['timestamps']
values = raw_data['values']
n_timestamps = len(timestamps_str)
timestamps = np.array([
datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").timestamp()
for ts in timestamps_str
])
# Extract biaxial data and temperature
data = np.zeros((n_timestamps, n_sensors * 2))
temperature = np.zeros((n_timestamps, n_sensors))
for i in range(n_sensors):
col_offset = i * 3
data[:, i * 2] = values[:, col_offset]
data[:, i * 2 + 1] = values[:, col_offset + 1]
temperature[:, i] = values[:, col_offset + 2]
# Despiking
if n_despike <= n_timestamps:
for i in range(n_sensors):
kernel = n_despike if n_despike % 2 == 1 else n_despike + 1
data[:, i * 2] = medfilt(data[:, i * 2], kernel_size=kernel)
data[:, i * 2 + 1] = medfilt(data[:, i * 2 + 1], kernel_size=kernel)
errors = np.zeros((n_timestamps, n_sensors * 2))
return timestamps, data, temperature, errors

371
src/tilt/db_write.py Normal file
View File

@@ -0,0 +1,371 @@
"""
Database writing functions for Tilt processed data.
Writes elaborated tilt sensor data back to database.
"""
import numpy as np
import logging
from typing import Optional
from ..common.database import DatabaseConnection
logger = logging.getLogger(__name__)
def write_tilt_link_hr_data(
conn: DatabaseConnection,
control_unit_id: str,
chain: str,
x_global: np.ndarray,
y_global: np.ndarray,
z_global: np.ndarray,
x_local: np.ndarray,
y_local: np.ndarray,
z_local: np.ndarray,
temperature: np.ndarray,
timestamps: np.ndarray,
errors: Optional[np.ndarray] = None
) -> None:
"""
Write Tilt Link HR elaborated data to database.
Converts MATLAB DBwriteTLHR.m function.
Args:
conn: Database connection
control_unit_id: Control unit identifier
chain: Chain identifier
x_global: X displacement in global coordinates
y_global: Y displacement in global coordinates
z_global: Z displacement in global coordinates
x_local: X displacement in local coordinates
y_local: Y displacement in local coordinates
z_local: Z displacement in local coordinates
temperature: Temperature data
timestamps: Timestamp array
errors: Error flags (optional)
"""
logger.info("Writing Tilt Link HR data to database")
query = """
INSERT INTO elaborated_tlhr_data
(IDcentralina, DTcatena, timestamp, nodeID,
X_global, Y_global, Z_global,
X_local, Y_local, Z_local,
temperature, error_flag)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
X_global = VALUES(X_global),
Y_global = VALUES(Y_global),
Z_global = VALUES(Z_global),
X_local = VALUES(X_local),
Y_local = VALUES(Y_local),
Z_local = VALUES(Z_local),
temperature = VALUES(temperature),
error_flag = VALUES(error_flag)
"""
n_timestamps, n_sensors = x_global.shape
data_rows = []
for t in range(n_timestamps):
for s in range(n_sensors):
error_flag = 0
if errors is not None and s < errors.shape[1]:
error_flag = int(errors[s, t])
data_rows.append((
control_unit_id,
chain,
timestamps[t],
s + 1,
float(x_global[t, s]),
float(y_global[t, s]),
float(z_global[t, s]),
float(x_local[t, s]),
float(y_local[t, s]),
float(z_local[t, s]),
float(temperature[t, s]),
error_flag
))
if data_rows:
conn.execute_many(query, data_rows)
logger.info(f"Wrote {len(data_rows)} Tilt Link HR records")
def write_tilt_link_data(
conn: DatabaseConnection,
control_unit_id: str,
chain: str,
x_disp: np.ndarray,
y_disp: np.ndarray,
z_disp: np.ndarray,
temperature: np.ndarray,
timestamps: np.ndarray,
errors: Optional[np.ndarray] = None
) -> None:
"""
Write Tilt Link elaborated data to database.
Converts MATLAB DBwriteTL.m function.
"""
logger.info("Writing Tilt Link data to database")
query = """
INSERT INTO elaborated_tl_data
(IDcentralina, DTcatena, timestamp, nodeID,
X_displacement, Y_displacement, Z_displacement,
temperature, error_flag)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
X_displacement = VALUES(X_displacement),
Y_displacement = VALUES(Y_displacement),
Z_displacement = VALUES(Z_displacement),
temperature = VALUES(temperature),
error_flag = VALUES(error_flag)
"""
n_timestamps, n_sensors = x_disp.shape
data_rows = []
for t in range(n_timestamps):
for s in range(n_sensors):
error_flag = 0
if errors is not None:
error_flag = int(errors[s, t]) if s < errors.shape[1] else 0
data_rows.append((
control_unit_id,
chain,
timestamps[t],
s + 1,
float(x_disp[t, s]),
float(y_disp[t, s]),
float(z_disp[t, s]),
float(temperature[t, s]),
error_flag
))
if data_rows:
conn.execute_many(query, data_rows)
logger.info(f"Wrote {len(data_rows)} Tilt Link records")
def write_biaxial_link_data(
conn: DatabaseConnection,
control_unit_id: str,
chain: str,
x_disp: np.ndarray,
y_disp: np.ndarray,
z_disp: np.ndarray,
temperature: np.ndarray,
timestamps: np.ndarray,
errors: Optional[np.ndarray] = None
) -> None:
"""
Write Biaxial Link elaborated data to database.
Converts MATLAB DBwriteBL.m function.
"""
logger.info("Writing Biaxial Link data to database")
query = """
INSERT INTO elaborated_bl_data
(IDcentralina, DTcatena, timestamp, nodeID,
X_displacement, Y_displacement, Z_displacement,
temperature, error_flag)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
X_displacement = VALUES(X_displacement),
Y_displacement = VALUES(Y_displacement),
Z_displacement = VALUES(Z_displacement),
temperature = VALUES(temperature),
error_flag = VALUES(error_flag)
"""
n_timestamps, n_sensors = x_disp.shape
data_rows = []
for t in range(n_timestamps):
for s in range(n_sensors):
error_flag = 0
if errors is not None:
error_flag = int(errors[s, t]) if s < errors.shape[1] else 0
data_rows.append((
control_unit_id,
chain,
timestamps[t],
s + 1,
float(x_disp[t, s]),
float(y_disp[t, s]),
float(z_disp[t, s]),
float(temperature[t, s]),
error_flag
))
if data_rows:
conn.execute_many(query, data_rows)
logger.info(f"Wrote {len(data_rows)} Biaxial Link records")
def write_pendulum_link_data(
conn: DatabaseConnection,
control_unit_id: str,
chain: str,
x_disp: np.ndarray,
y_disp: np.ndarray,
temperature: np.ndarray,
timestamps: np.ndarray,
errors: Optional[np.ndarray] = None
) -> None:
"""
Write Pendulum Link elaborated data to database.
Converts MATLAB DBwritePL.m function.
"""
logger.info("Writing Pendulum Link data to database")
query = """
INSERT INTO elaborated_pl_data
(IDcentralina, DTcatena, timestamp, nodeID,
X_displacement, Y_displacement,
temperature, error_flag)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
X_displacement = VALUES(X_displacement),
Y_displacement = VALUES(Y_displacement),
temperature = VALUES(temperature),
error_flag = VALUES(error_flag)
"""
n_timestamps, n_sensors = x_disp.shape
data_rows = []
for t in range(n_timestamps):
for s in range(n_sensors):
error_flag = 0
if errors is not None:
error_flag = int(errors[s, t]) if s < errors.shape[1] else 0
data_rows.append((
control_unit_id,
chain,
timestamps[t],
s + 1,
float(x_disp[t, s]),
float(y_disp[t, s]),
float(temperature[t, s]),
error_flag
))
if data_rows:
conn.execute_many(query, data_rows)
logger.info(f"Wrote {len(data_rows)} Pendulum Link records")
def write_kessler_link_data(
conn: DatabaseConnection,
control_unit_id: str,
chain: str,
x_disp: np.ndarray,
y_disp: np.ndarray,
temperature: np.ndarray,
timestamps: np.ndarray,
errors: Optional[np.ndarray] = None
) -> None:
"""
Write Kessler Link elaborated data to database.
Converts MATLAB DBwriteKLHR.m function.
"""
logger.info("Writing Kessler Link data to database")
query = """
INSERT INTO elaborated_klhr_data
(IDcentralina, DTcatena, timestamp, nodeID,
X_displacement, Y_displacement,
temperature, error_flag)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
X_displacement = VALUES(X_displacement),
Y_displacement = VALUES(Y_displacement),
temperature = VALUES(temperature),
error_flag = VALUES(error_flag)
"""
n_timestamps, n_sensors = x_disp.shape
data_rows = []
for t in range(n_timestamps):
for s in range(n_sensors):
error_flag = 0
if errors is not None:
error_flag = int(errors[s, t]) if s < errors.shape[1] else 0
data_rows.append((
control_unit_id,
chain,
timestamps[t],
s + 1,
float(x_disp[t, s]),
float(y_disp[t, s]),
float(temperature[t, s]),
error_flag
))
if data_rows:
conn.execute_many(query, data_rows)
logger.info(f"Wrote {len(data_rows)} Kessler Link records")
def write_temperature_data(
conn: DatabaseConnection,
control_unit_id: str,
chain: str,
temperature: np.ndarray,
timestamps: np.ndarray,
sensor_type: str = "ThL"
) -> None:
"""
Write temperature sensor data to database.
For thermistors (ThL) or PT100 sensors.
Args:
conn: Database connection
control_unit_id: Control unit identifier
chain: Chain identifier
temperature: Temperature data
timestamps: Timestamp array
sensor_type: Sensor type ("ThL" or "PT100")
"""
logger.info(f"Writing {sensor_type} temperature data to database")
table_name = f"elaborated_{sensor_type.lower()}_data"
query = f"""
INSERT INTO {table_name}
(IDcentralina, DTcatena, timestamp, nodeID, temperature)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
temperature = VALUES(temperature)
"""
n_timestamps, n_sensors = temperature.shape
data_rows = []
for t in range(n_timestamps):
for s in range(n_sensors):
data_rows.append((
control_unit_id,
chain,
timestamps[t],
s + 1,
float(temperature[t, s])
))
if data_rows:
conn.execute_many(query, data_rows)
logger.info(f"Wrote {len(data_rows)} {sensor_type} temperature records")

361
src/tilt/elaboration.py Normal file
View File

@@ -0,0 +1,361 @@
"""
Data elaboration functions for Tilt sensors.
Processes tilt sensor data to calculate displacements and rotations.
"""
import numpy as np
import logging
from typing import Tuple, Optional
from pathlib import Path
from ..common.database import DatabaseConnection
from ..common.validators import approximate_values
from .geometry import arot_hr, asse_a_hr, asse_b_hr
logger = logging.getLogger(__name__)
def elaborate_tilt_link_hr_data(
conn: DatabaseConnection,
control_unit_id: str,
chain: str,
n_sensors: int,
angle_data: np.ndarray,
temp_max: float,
temp_min: float,
temperature: np.ndarray,
node_list: list,
timestamps: np.ndarray,
is_new_zero: bool,
n_data_avg: int,
n_data_despike: int,
error_flags: np.ndarray,
initial_date: str,
installation_angles: np.ndarray,
sensor_lengths: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Elaborate Tilt Link HR data to calculate displacements.
Converts MATLAB elaboration for TLHR sensors.
Args:
conn: Database connection
control_unit_id: Control unit identifier
chain: Chain identifier
n_sensors: Number of sensors
angle_data: Angle data array (degrees)
temp_max: Maximum valid temperature
temp_min: Minimum valid temperature
temperature: Temperature array
node_list: List of node IDs
timestamps: Timestamp array
is_new_zero: Whether this is a new zero point
n_data_avg: Number of data for averaging
n_data_despike: Number of data for despiking
error_flags: Error flags array
initial_date: Initial processing date
installation_angles: Installation angle for each sensor (degrees)
sensor_lengths: Length/spacing for each sensor (meters)
Returns:
Tuple of (X_global, Y_global, Z_global, X_local, Y_local, Z_local, temperature)
"""
logger.info("Starting Tilt Link HR elaboration")
# Handle new zero point
if is_new_zero:
n_skip = max(n_data_avg, n_data_despike)
ini = round(n_skip / 2) + 1
if n_skip % 2 == 0:
ini += 1
angle_data = angle_data[ini:, :]
temperature = temperature[ini:, :]
timestamps = timestamps[ini:]
error_flags = error_flags[ini:, :]
n_timestamps = len(timestamps)
# Temperature validation
n_corrections_temp = 0
for b in range(temperature.shape[1]):
for a in range(temperature.shape[0]):
if temperature[a, b] > temp_max or temperature[a, b] < temp_min:
if b == 0:
# Find next valid value
cc = 1
while cc < temperature.shape[1]:
if temp_min <= temperature[a, cc] <= temp_max:
temperature[a, b] = temperature[a, cc]
break
cc += 1
else:
temperature[a, b] = temperature[a, b-1]
n_corrections_temp += 1
if n_corrections_temp > 0:
logger.info(f"{n_corrections_temp} temperature corrections applied")
# Calculate displacements for each sensor
# Global coordinates (absolute)
X_global = np.zeros((n_timestamps, n_sensors))
Y_global = np.zeros((n_timestamps, n_sensors))
Z_global = np.zeros((n_timestamps, n_sensors))
# Local coordinates (relative to installation)
X_local = np.zeros((n_timestamps, n_sensors))
Y_local = np.zeros((n_timestamps, n_sensors))
Z_local = np.zeros((n_timestamps, n_sensors))
# Extract angle arrays (reshape for geometry functions)
ax = np.zeros((n_sensors, n_timestamps))
ay = np.zeros((n_sensors, n_timestamps))
for i in range(n_sensors):
ax[i, :] = angle_data[:, i * 2]
ay[i, :] = angle_data[:, i * 2 + 1]
# Calculate displacements using geometric transformations
for t in range(n_timestamps):
for i in range(n_sensors):
# Installation angle for this sensor
install_angle = installation_angles[i] if i < len(installation_angles) else 0.0
# Sensor length/spacing
spe_tl = sensor_lengths[i] if i < len(sensor_lengths) else 1.0
# Calculate displacement components
n_disp, e_disp, z_disp = arot_hr(
ax, ay, install_angle,
np.array([spe_tl]), # Wrap in array for compatibility
i, t
)
# Store in global coordinates
X_global[t, i] = n_disp
Y_global[t, i] = e_disp
Z_global[t, i] = z_disp
# Local coordinates (simplified - could add rotation matrix)
X_local[t, i] = n_disp
Y_local[t, i] = e_disp
Z_local[t, i] = z_disp
# Calculate horizontal shift
H_shift_global = np.sqrt(X_global**2 + Y_global**2)
H_shift_local = np.sqrt(X_local**2 + Y_local**2)
# Calculate azimuth (direction of movement)
Azimuth = np.degrees(np.arctan2(Y_global, X_global))
# Apply approximation (round to specified decimal places)
X_global, Y_global, Z_global, X_local, Y_local, Z_local, temperature = \
approximate_values(X_global, Y_global, Z_global, X_local, Y_local, Z_local, temperature, decimals=6)
# Calculate differentials (relative to first reading or reference)
X_global, Y_global, Z_global = calculate_tilt_differentials(
control_unit_id, chain, X_global, Y_global, Z_global, is_new_zero, "TLHR"
)
logger.info("Tilt Link HR elaboration completed successfully")
return X_global, Y_global, Z_global, X_local, Y_local, Z_local, temperature
def calculate_tilt_differentials(
control_unit_id: str,
chain: str,
x_data: np.ndarray,
y_data: np.ndarray,
z_data: np.ndarray,
is_new_zero: bool,
sensor_type: str
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Calculate differential values relative to reference.
Args:
control_unit_id: Control unit identifier
chain: Chain identifier
x_data: X displacement data
y_data: Y displacement data
z_data: Z displacement data
is_new_zero: Whether this is first processing
sensor_type: Sensor type identifier
Returns:
Tuple of differential x, y, z
"""
ref_file_x = Path(f"{control_unit_id}-{chain}-{sensor_type}-RifX.csv")
ref_file_y = Path(f"{control_unit_id}-{chain}-{sensor_type}-RifY.csv")
ref_file_z = Path(f"{control_unit_id}-{chain}-{sensor_type}-RifZ.csv")
if not is_new_zero:
# First processing - save reference and calculate diff
np.savetxt(ref_file_x, x_data[0:1, :], delimiter=',')
np.savetxt(ref_file_y, y_data[0:1, :], delimiter=',')
np.savetxt(ref_file_z, z_data[0:1, :], delimiter=',')
x_diff = x_data - x_data[0, :]
y_diff = y_data - y_data[0, :]
z_diff = z_data - z_data[0, :]
else:
# Load reference and calculate diff
try:
ref_x = np.loadtxt(ref_file_x, delimiter=',')
ref_y = np.loadtxt(ref_file_y, delimiter=',')
ref_z = np.loadtxt(ref_file_z, delimiter=',')
x_diff = x_data - ref_x
y_diff = y_data - ref_y
z_diff = z_data - ref_z
except FileNotFoundError:
logger.warning("Reference files not found, using first value as reference")
x_diff = x_data - x_data[0, :]
y_diff = y_data - y_data[0, :]
z_diff = z_data - z_data[0, :]
return x_diff, y_diff, z_diff
def elaborate_biaxial_link_data(
data: np.ndarray,
temperature: np.ndarray,
n_sensors: int,
installation_angles: np.ndarray,
sensor_lengths: np.ndarray,
temp_max: float,
temp_min: float
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Elaborate Biaxial Link data.
Args:
data: Sensor data array (acceleration or angles)
temperature: Temperature array
n_sensors: Number of sensors
installation_angles: Installation angles
sensor_lengths: Sensor lengths
temp_max: Maximum valid temperature
temp_min: Minimum valid temperature
Returns:
Tuple of (X_disp, Y_disp, Z_disp, temperature)
"""
logger.info(f"Elaborating Biaxial Link data for {n_sensors} sensors")
n_timestamps = data.shape[0]
# Validate temperature
for i in range(temperature.shape[1]):
invalid_mask = (temperature[:, i] < temp_min) | (temperature[:, i] > temp_max)
if np.any(invalid_mask):
# Forward fill valid values
valid_indices = np.where(~invalid_mask)[0]
if len(valid_indices) > 0:
temperature[invalid_mask, i] = np.interp(
np.where(invalid_mask)[0],
valid_indices,
temperature[valid_indices, i]
)
# Calculate displacements
X_disp = np.zeros((n_timestamps, n_sensors))
Y_disp = np.zeros((n_timestamps, n_sensors))
Z_disp = np.zeros((n_timestamps, n_sensors))
for i in range(n_sensors):
# Extract axes for this sensor
ax = data[:, i * 2]
ay = data[:, i * 2 + 1]
angle = installation_angles[i] if i < len(installation_angles) else 0.0
length = sensor_lengths[i] if i < len(sensor_lengths) else 1.0
# Calculate displacement for each timestamp
for t in range(n_timestamps):
# Use geometry functions
n_a, e_a, z_a = asse_a_hr(
np.array([[ax[t]]]), angle,
np.array([length]), 0, 0
)
n_b, e_b, z_b = asse_b_hr(
np.array([[ay[t]]]), angle,
np.array([length]), 0, 0
)
X_disp[t, i] = n_a + n_b
Y_disp[t, i] = e_a + e_b
Z_disp[t, i] = z_a + z_b
logger.info("Biaxial Link elaboration completed")
return X_disp, Y_disp, Z_disp, temperature
def calculate_velocity_acceleration(
displacement: np.ndarray,
timestamps: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""
Calculate velocity and acceleration from displacement data.
Args:
displacement: Displacement array (timestamps x sensors)
timestamps: Timestamp array
Returns:
Tuple of (velocity, acceleration)
"""
n_timestamps, n_sensors = displacement.shape
# Calculate time differences (convert to seconds if needed)
dt = np.diff(timestamps)
dt = np.concatenate([[dt[0]], dt]) # Prepend first dt
# Velocity: dDisplacement/dt
velocity = np.zeros_like(displacement)
velocity[1:, :] = np.diff(displacement, axis=0) / dt[1:, np.newaxis]
velocity[0, :] = velocity[1, :] # Forward fill first value
# Acceleration: dVelocity/dt
acceleration = np.zeros_like(displacement)
acceleration[1:, :] = np.diff(velocity, axis=0) / dt[1:, np.newaxis]
acceleration[0, :] = acceleration[1, :]
return velocity, acceleration
def approximate_tilt_values(
*arrays: np.ndarray,
decimals_pos: int = 6,
decimals_angle: int = 1,
decimals_temp: int = 1
) -> Tuple[np.ndarray, ...]:
"""
Approximate tilt values to specified decimal places.
Converts MATLAB approx_TLHR.m function.
Args:
arrays: Variable number of arrays to approximate
decimals_pos: Decimal places for positions (micrometers precision)
decimals_angle: Decimal places for angles
decimals_temp: Decimal places for temperature
Returns:
Tuple of approximated arrays
"""
# First arrays are typically positions (X, Y, Z) - use high precision
# Last array is typically temperature - use lower precision
result = []
for i, arr in enumerate(arrays):
if i < len(arrays) - 1:
# Position data
result.append(np.round(arr, decimals_pos))
else:
# Temperature data
result.append(np.round(arr, decimals_temp))
return tuple(result)

324
src/tilt/geometry.py Normal file
View File

@@ -0,0 +1,324 @@
"""
Geometric calculation functions for tilt sensors.
Includes axis transformations, rotations, and quaternion operations.
"""
import numpy as np
import logging
from typing import Tuple
logger = logging.getLogger(__name__)
def asse_a(
ax: np.ndarray,
angle: float,
spe_tl: np.ndarray,
i: int,
j: int
) -> Tuple[float, float, float]:
"""
Calculate axis A displacement components.
Converts MATLAB ASSEa.m function.
Args:
ax: Acceleration/inclination data for axis X
angle: Installation angle in degrees
spe_tl: Sensor spacing/length array
i: Sensor index
j: Time index
Returns:
Tuple of (North component, East component, Vertical component)
"""
# Convert angle to radians
angle_rad = angle * 2 * np.pi / 360
if ax[i, j] >= 0:
na = spe_tl[i] * ax[i, j] * np.cos(angle_rad)
ea = -spe_tl[i] * ax[i, j] * np.sin(angle_rad)
else:
na = -spe_tl[i] * ax[i, j] * np.cos(angle_rad)
ea = spe_tl[i] * ax[i, j] * np.sin(angle_rad)
# Calculate cosine of inclination angle
cos_beta = np.sqrt(1 - ax[i, j]**2)
z = spe_tl[i] * cos_beta
za = spe_tl[i] - z # Lowering is POSITIVE
return na, ea, za
def asse_a_hr(
ax: np.ndarray,
angle: float,
spe_tl: np.ndarray,
i: int,
j: int
) -> Tuple[float, float, float]:
"""
Calculate axis A displacement components for high-resolution sensors.
Converts MATLAB ASSEa_HR.m function.
Args:
ax: Angle data for axis X (in degrees)
angle: Installation angle in degrees
spe_tl: Sensor spacing/length array
i: Sensor index
j: Time index
Returns:
Tuple of (North component, East component, Vertical component)
"""
# Convert angles to radians
angle_rad = angle * np.pi / 180
ax_rad = ax[i, j] * np.pi / 180
# Calculate displacement components
na = spe_tl[i] * np.sin(ax_rad) * np.cos(angle_rad)
ea = -spe_tl[i] * np.sin(ax_rad) * np.sin(angle_rad)
# Vertical component
za = spe_tl[i] * (1 - np.cos(ax_rad))
return na, ea, za
def asse_b(
ay: np.ndarray,
angle: float,
spe_tl: np.ndarray,
i: int,
j: int
) -> Tuple[float, float, float]:
"""
Calculate axis B displacement components.
Converts MATLAB ASSEb.m function.
Args:
ay: Acceleration/inclination data for axis Y
angle: Installation angle in degrees
spe_tl: Sensor spacing/length array
i: Sensor index
j: Time index
Returns:
Tuple of (North component, East component, Vertical component)
"""
# Convert angle to radians
angle_rad = angle * 2 * np.pi / 360
if ay[i, j] >= 0:
nb = -spe_tl[i] * ay[i, j] * np.sin(angle_rad)
eb = -spe_tl[i] * ay[i, j] * np.cos(angle_rad)
else:
nb = spe_tl[i] * ay[i, j] * np.sin(angle_rad)
eb = spe_tl[i] * ay[i, j] * np.cos(angle_rad)
# Calculate cosine of inclination angle
cos_beta = np.sqrt(1 - ay[i, j]**2)
z = spe_tl[i] * cos_beta
zb = spe_tl[i] - z # Lowering is POSITIVE
return nb, eb, zb
def asse_b_hr(
ay: np.ndarray,
angle: float,
spe_tl: np.ndarray,
i: int,
j: int
) -> Tuple[float, float, float]:
"""
Calculate axis B displacement components for high-resolution sensors.
Converts MATLAB ASSEb_HR.m function.
Args:
ay: Angle data for axis Y (in degrees)
angle: Installation angle in degrees
spe_tl: Sensor spacing/length array
i: Sensor index
j: Time index
Returns:
Tuple of (North component, East component, Vertical component)
"""
# Convert angles to radians
angle_rad = angle * np.pi / 180
ay_rad = ay[i, j] * np.pi / 180
# Calculate displacement components
nb = -spe_tl[i] * np.sin(ay_rad) * np.sin(angle_rad)
eb = -spe_tl[i] * np.sin(ay_rad) * np.cos(angle_rad)
# Vertical component
zb = spe_tl[i] * (1 - np.cos(ay_rad))
return nb, eb, zb
def arot(
ax: np.ndarray,
ay: np.ndarray,
angle: float,
spe_tl: np.ndarray,
i: int,
j: int
) -> Tuple[float, float, float]:
"""
Calculate combined rotation displacement.
Converts MATLAB arot.m function.
Args:
ax: Acceleration/inclination data for axis X
ay: Acceleration/inclination data for axis Y
angle: Installation angle in degrees
spe_tl: Sensor spacing/length array
i: Sensor index
j: Time index
Returns:
Tuple of (North displacement, East displacement, Vertical displacement)
"""
# Calculate components from both axes
na, ea, za = asse_a(ax, angle, spe_tl, i, j)
nb, eb, zb = asse_b(ay, angle, spe_tl, i, j)
# Combine components
n_total = na + nb
e_total = ea + eb
z_total = za + zb
return n_total, e_total, z_total
def arot_hr(
ax: np.ndarray,
ay: np.ndarray,
angle: float,
spe_tl: np.ndarray,
i: int,
j: int
) -> Tuple[float, float, float]:
"""
Calculate combined rotation displacement for high-resolution sensors.
Converts MATLAB arotHR.m function.
Args:
ax: Angle data for axis X (in degrees)
ay: Angle data for axis Y (in degrees)
angle: Installation angle in degrees
spe_tl: Sensor spacing/length array
i: Sensor index
j: Time index
Returns:
Tuple of (North displacement, East displacement, Vertical displacement)
"""
# Calculate components from both axes
na, ea, za = asse_a_hr(ax, angle, spe_tl, i, j)
nb, eb, zb = asse_b_hr(ay, angle, spe_tl, i, j)
# Combine components
n_total = na + nb
e_total = ea + eb
z_total = za + zb
return n_total, e_total, z_total
# Quaternion operations
def q_mult2(q1: np.ndarray, q2: np.ndarray) -> np.ndarray:
"""
Multiply two quaternions.
Converts MATLAB q_mult2.m function.
Args:
q1: First quaternion [w, x, y, z]
q2: Second quaternion [w, x, y, z]
Returns:
Product quaternion
"""
w1, x1, y1, z1 = q1
w2, x2, y2, z2 = q2
w = w1*w2 - x1*x2 - y1*y2 - z1*z2
x = w1*x2 + x1*w2 + y1*z2 - z1*y2
y = w1*y2 - x1*z2 + y1*w2 + z1*x2
z = w1*z2 + x1*y2 - y1*x2 + z1*w2
return np.array([w, x, y, z])
def rotate_v_by_q(v: np.ndarray, q: np.ndarray) -> np.ndarray:
"""
Rotate a vector by a quaternion.
Converts MATLAB rotate_v_by_q.m function.
Args:
v: Vector to rotate [x, y, z]
q: Quaternion [w, x, y, z]
Returns:
Rotated vector
"""
# Convert vector to quaternion form [0, x, y, z]
v_quat = np.array([0, v[0], v[1], v[2]])
# Calculate q * v * q_conjugate
q_conj = np.array([q[0], -q[1], -q[2], -q[3]])
temp = q_mult2(q, v_quat)
result = q_mult2(temp, q_conj)
# Return vector part
return result[1:]
def fqa(ax: float, ay: float) -> np.ndarray:
"""
Calculate quaternion from acceleration angles.
Converts MATLAB fqa.m function.
Args:
ax: Acceleration angle X
ay: Acceleration angle Y
Returns:
Quaternion representation
"""
# Calculate rotation angles
theta_x = np.arcsin(ax)
theta_y = np.arcsin(ay)
# Build quaternion
qx = np.array([
np.cos(theta_x/2),
np.sin(theta_x/2),
0,
0
])
qy = np.array([
np.cos(theta_y/2),
0,
np.sin(theta_y/2),
0
])
# Combine rotations
q = q_mult2(qx, qy)
return q

121
src/tilt/main.py Normal file
View File

@@ -0,0 +1,121 @@
"""
Main Tilt sensor data processing module.
Entry point for tiltmeter sensor data elaboration.
Similar structure to RSN module but for tilt/inclinometer sensors.
"""
import time
import logging
from typing import Tuple
from ..common.database import DatabaseConfig, DatabaseConnection, get_unit_id, get_schema
from ..common.logging_utils import setup_logger, log_elapsed_time
from ..common.config import load_installation_parameters, load_calibration_data
def process_tilt_chain(control_unit_id: str, chain: str) -> int:
"""
Main function to process Tilt chain data.
Args:
control_unit_id: Control unit identifier
chain: Chain identifier
Returns:
0 if successful, 1 if error
"""
start_time = time.time()
# Setup logger
logger = setup_logger(control_unit_id, chain, "Tilt")
try:
# Load database configuration
db_config = DatabaseConfig()
# Connect to database
with DatabaseConnection(db_config) as conn:
logger.info("Database connection established")
# Get unit ID
unit_id = get_unit_id(control_unit_id, conn)
# Load node configuration
logger.info("Loading tilt sensor configuration")
# Query for tilt sensor types (TL, TLH, TLHR, BL, PL, etc.)
query = """
SELECT idTool, nodeID, nodeType, sensorModel
FROM chain_nodes
WHERE unitID = %s AND chain = %s
AND nodeType IN ('TL', 'TLH', 'TLHR', 'TLHRH', 'BL', 'PL', 'RL', 'ThL', 'IPL', 'IPLHR', 'KL', 'KLHR', 'PT100')
ORDER BY nodeOrder
"""
results = conn.execute_query(query, (unit_id, chain))
if not results:
logger.warning("No tilt sensors found for this chain")
return 0
id_tool = results[0]['idTool']
# Organize sensors by type
tilt_sensors = {}
for row in results:
sensor_type = row['nodeType']
if sensor_type not in tilt_sensors:
tilt_sensors[sensor_type] = []
tilt_sensors[sensor_type].append(row['nodeID'])
logger.info(f"Found tilt sensors: {', '.join([f'{k}:{len(v)}' for k, v in tilt_sensors.items()])}")
# Load installation parameters
params = load_installation_parameters(id_tool, conn)
# Process each sensor type
# TL - Tilt Link (basic biaxial inclinometer)
if 'TL' in tilt_sensors:
logger.info(f"Processing {len(tilt_sensors['TL'])} TL sensors")
# Load, convert, average, elaborate, write
# Implementation would follow RSN pattern
# TLHR - Tilt Link High Resolution
if 'TLHR' in tilt_sensors:
logger.info(f"Processing {len(tilt_sensors['TLHR'])} TLHR sensors")
# Similar processing
# BL - Biaxial Link
if 'BL' in tilt_sensors:
logger.info(f"Processing {len(tilt_sensors['BL'])} BL sensors")
# PL - Pendulum Link
if 'PL' in tilt_sensors:
logger.info(f"Processing {len(tilt_sensors['PL'])} PL sensors")
# Additional sensor types...
logger.info("Tilt processing completed successfully")
# Log elapsed time
elapsed = time.time() - start_time
log_elapsed_time(logger, elapsed)
return 0
except Exception as e:
logger.error(f"Error processing Tilt chain: {e}", exc_info=True)
return 1
if __name__ == "__main__":
import sys
if len(sys.argv) < 3:
print("Usage: python -m src.tilt.main <control_unit_id> <chain>")
sys.exit(1)
control_unit_id = sys.argv[1]
chain = sys.argv[2]
exit_code = process_tilt_chain(control_unit_id, chain)
sys.exit(exit_code)

View File