Files
matlab-python/example_usage.py
2025-10-12 20:16:19 +02:00

282 lines
7.7 KiB
Python

"""
Example usage of the sensor data processing system.
This file demonstrates how to use the converted Python modules.
"""
import sys
import logging
from pathlib import Path
# Add this script's directory to sys.path so the `src` package can be imported
sys.path.insert(0, str(Path(__file__).parent))
from src.rsn.main import process_rsn_chain
from src.tilt.main import process_tilt_chain
from src.atd.main import process_atd_chain
def example_rsn_processing():
    """
    Example: run the RSN processing chain for one control unit.

    Calls ``process_rsn_chain`` with a sample unit/chain pair and reports the
    outcome on stdout. A return code of 0 is treated as success.

    Returns:
        True if the chain returned 0; False if it returned anything else or
        if an exception was raised while processing or reporting.
    """
    separator = "=" * 60
    print(separator)
    print("Example 1: RSN Chain Processing")
    print(separator)

    unit_id = "CU001"  # Control unit identifier
    chain_id = "A"  # Chain identifier
    print(f"\nProcessing RSN data for unit {unit_id}, chain {chain_id}")

    try:
        status = process_rsn_chain(unit_id, chain_id)
        if status == 0:
            print("✓ RSN processing completed successfully")
            print(f"Check log file: LogFile_RSN-{unit_id}-{chain_id}-*.txt")
            return True
        print("✗ RSN processing failed")
        return False
    except Exception as exc:
        # Example-level boundary: report the problem instead of crashing.
        print(f"✗ Error: {exc}")
        return False
def example_tilt_processing():
    """
    Example: run the Tilt sensor processing chain for one control unit.

    Calls ``process_tilt_chain`` with a sample unit/chain pair and prints
    whether it succeeded. A return code of 0 is treated as success.

    Returns:
        True if the chain returned 0; False if it returned anything else or
        if an exception was raised while processing or reporting.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print("Example 2: Tilt Sensor Processing")
    print(rule)

    unit_id, chain_id = "CU002", "B"
    print(f"\nProcessing Tilt data for unit {unit_id}, chain {chain_id}")

    try:
        if process_tilt_chain(unit_id, chain_id) == 0:
            print("✓ Tilt processing completed successfully")
            return True
        print("✗ Tilt processing failed")
        return False
    except Exception as exc:
        # Example-level boundary: report the problem instead of crashing.
        print(f"✗ Error: {exc}")
        return False
def example_atd_processing():
    """
    Example: run the ATD sensor processing chain (extensometers,
    crackmeters, etc.) for one control unit.

    Calls ``process_atd_chain`` with a sample unit/chain pair and prints
    whether it succeeded. A return code of 0 is treated as success.

    Returns:
        True if the chain returned 0; False if it returned anything else or
        if an exception was raised while processing or reporting.
    """
    divider = "=" * 60
    print("\n" + divider)
    print("Example 3: ATD Sensor Processing")
    print(divider)

    unit_id = "CU003"
    chain_id = "C"
    print(f"\nProcessing ATD data for unit {unit_id}, chain {chain_id}")

    try:
        return_code = process_atd_chain(unit_id, chain_id)
        if return_code == 0:
            print("✓ ATD processing completed successfully")
            return True
        print("✗ ATD processing failed")
        return False
    except Exception as exc:
        # Example-level boundary: report the problem instead of crashing.
        print(f"✗ Error: {exc}")
        return False
def example_database_connection():
    """
    Example: direct database connection and query.

    Loads the connection settings from ``DB.txt``, opens a
    ``DatabaseConnection`` as a context manager, runs one aggregate query
    against ``raw_rsn_data`` and prints each returned row.

    Returns:
        True when every step completed; False if any exception was raised
        after the database helpers were imported.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("Example 4: Direct Database Access")
    print(rule)

    from src.common.database import DatabaseConfig, DatabaseConnection

    # Row counts per unit/chain over the last 7 days, capped at 5 groups.
    recent_counts_sql = (
        "\n"
        "SELECT IDcentralina, DTcatena, COUNT(*) as sensor_count\n"
        "FROM raw_rsn_data\n"
        "WHERE timestamp >= DATE_SUB(NOW(), INTERVAL 7 DAY)\n"
        "GROUP BY IDcentralina, DTcatena\n"
        "LIMIT 5\n"
    )

    try:
        # Load configuration
        db_config = DatabaseConfig("DB.txt")
        db_name = db_config.config["database"]
        print(f"✓ Configuration loaded for database: {db_name}")

        # Connect to database
        with DatabaseConnection(db_config) as connection:
            print("✓ Database connection established")
            print("\nRecent sensor data (last 7 days):")
            for record in connection.execute_query(recent_counts_sql):
                unit = record["IDcentralina"]
                chain = record["DTcatena"]
                count = record["sensor_count"]
                print(f" Unit: {unit}, Chain: {chain}, Records: {count}")

        print("\n✓ Database operations completed")
        return True
    except Exception as exc:
        # Example-level boundary: report the problem instead of crashing.
        print(f"✗ Error: {exc}")
        return False
def example_data_validation():
    """
    Example: Data validation and filtering.

    Builds two small in-memory arrays (temperatures and accelerations),
    passes them to ``validate_temperature`` and ``despike_data`` from
    ``src.common.validators`` and prints the inputs and results.

    Returns:
        True once both demonstrations have run. Exceptions raised by the
        imports or by the validators are not caught here.
    """
    print("\n" + "=" * 60)
    print("Example 5: Data Validation")
    print("=" * 60)

    # Imported lazily, after the banner, so the heading is printed even when
    # numpy or the validators module is unavailable.
    import numpy as np
    from src.common.validators import despike_data, validate_temperature

    # Simulate sensor data
    temperature = np.array([
        [20.5, 21.3, 22.1],
        [20.8, 21.5, 95.0],  # Invalid: > 80°C
        [20.9, 21.7, 22.3],
        [21.0, -35.0, 22.5],  # Invalid: < -30°C
    ])
    print("\nOriginal temperature data:")
    print(temperature)

    # Validate temperature; only the count is reported, so the first
    # returned value is deliberately discarded.
    _, n_corrections = validate_temperature(temperature)
    print(f"\n✓ Temperature validation: {n_corrections} invalid values found")

    # Simulate acceleration data with spikes
    acc_data = np.array([
        [1.0, 1.0, 1.0],
        [1.02, 1.01, 1.03],
        [1.04, 5.0, 1.02],  # Spike in second sensor
        [1.03, 1.01, 1.04],
        [1.05, 1.02, 1.03],
    ])
    print("\nOriginal acceleration data:")
    print(acc_data)

    # Remove spikes
    despiked, n_spikes = despike_data(acc_data, n_points=3, threshold=3.0)
    print(f"\n✓ Despiking: {n_spikes} spikes removed")
    print("Despiked data:")
    print(despiked)
    return True
def example_logging_setup():
    """
    Example: logging configuration.

    Creates a logger through ``setup_logger`` (log directory ``"."``), writes
    a start marker, an info message, a warning and an end marker, then
    confirms on stdout.

    Returns:
        True once the messages have been logged. Exceptions raised by the
        logging helpers are not caught here.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("Example 6: Logging Setup")
    print(rule)

    from src.common.logging_utils import (
        log_function_end,
        log_function_start,
        setup_logger,
    )

    # Setup logger
    demo_logger = setup_logger("TEST001", "A", "Example", log_dir=".")

    # Log messages, bracketed by the start/end markers
    function_name = "example_function"
    log_function_start(demo_logger, function_name)
    demo_logger.info("Processing sensor data...")
    demo_logger.warning("This is a warning message")
    log_function_end(demo_logger, function_name)

    print("\n✓ Logging examples written to log file")
    return True
def main():
    """
    Run the examples and print a pass/fail summary.

    The database example is scheduled only when ``DB.txt`` exists in the
    current working directory; otherwise a warning is printed and only the
    examples that need no database are run. Every scheduled example goes
    through the same guarded runner, so an exception raised by one example
    is reported as a failure instead of aborting the remaining ones, and a
    summary is printed in both cases.
    """
    print("\n")
    print("=" * 58)
    print("║ Sensor Data Processing System - Python Examples ║")
    print("=" * 58)

    # Examples that work without a database are always scheduled.
    examples = [
        ("Data Validation", example_data_validation),
        ("Logging Setup", example_logging_setup),
    ]

    if Path("DB.txt").exists():
        examples.append(("Database Connection", example_database_connection))
        # Uncomment when you have actual data:
        # examples.append(("RSN Processing", example_rsn_processing))
        # examples.append(("Tilt Processing", example_tilt_processing))
        # examples.append(("ATD Processing", example_atd_processing))
    else:
        print("\n⚠ Warning: DB.txt not found")
        print("Create DB.txt with database credentials to run examples")
        print("See DB.txt.example for format")
        print("\nRunning examples that don't require database...")

    results = []
    for name, func in examples:
        try:
            success = func()
        except Exception as e:
            # Keep going: one broken example must not hide the others.
            print(f"\n{name} failed with exception: {e}")
            success = False
        results.append((name, success))

    # Summary
    print("\n" + "=" * 60)
    print("Summary")
    print("=" * 60)
    for name, success in results:
        status = "✓ PASS" if success else "✗ FAIL"
        print(f"{status}: {name}")

    n_passed = sum(1 for _, success in results if success)
    n_total = len(results)
    print(f"\nTotal: {n_passed}/{n_total} examples passed")
if __name__ == "__main__":
    # Run the examples only when executed as a script, not when imported.
    main()