Files
ASE/src/refactory_scripts/loaders/sisgeo_loader.py
2025-10-11 22:31:54 +02:00

414 lines
14 KiB
Python

"""
Sisgeo data loader - Refactored version with async support.
This script processes Sisgeo sensor data and loads it into the database.
Handles different node types with different data formats.
Replaces the legacy sisgeoLoadScript.py with modern async/await patterns.
"""
import asyncio
import logging
from datetime import datetime, timedelta
from decimal import Decimal
from refactory_scripts.config import DatabaseConfig
from refactory_scripts.utils import execute_query, get_db_connection
logger = logging.getLogger(__name__)
class SisgeoLoader:
"""Loads Sisgeo sensor data into the database with smart duplicate handling."""
# Node configuration constants
NODE_TYPE_PRESSURE = 1 # Node type 1: Pressure sensor (single value)
NODE_TYPE_VIBRATING_WIRE = 2 # Node type 2-5: Vibrating wire sensors (three values)
# Time threshold for duplicate detection (hours)
DUPLICATE_TIME_THRESHOLD_HOURS = 5
# Default values for missing data
DEFAULT_BAT_LEVEL = -1
DEFAULT_TEMPERATURE = -273
def __init__(self, db_config: DatabaseConfig):
"""
Initialize the Sisgeo loader.
Args:
db_config: Database configuration object
"""
self.db_config = db_config
self.conn = None
async def __aenter__(self):
"""Async context manager entry."""
self.conn = await get_db_connection(self.db_config.as_dict())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
if self.conn:
self.conn.close()
async def _get_latest_record(
self, unit_name: str, tool_name: str, node_num: int
) -> dict | None:
"""
Get the latest record for a specific node.
Args:
unit_name: Unit name
tool_name: Tool name
node_num: Node number
Returns:
Latest record dict or None if not found
"""
query = """
SELECT *
FROM RAWDATACOR
WHERE UnitName = %s AND ToolNameID = %s AND NodeNum = %s
ORDER BY EventDate DESC, EventTime DESC
LIMIT 1
"""
result = await execute_query(
self.conn, query, (unit_name, tool_name, node_num), fetch_one=True
)
return result
async def _insert_pressure_data(
self,
unit_name: str,
tool_name: str,
node_num: int,
date: str,
time: str,
pressure: Decimal,
) -> bool:
"""
Insert or update pressure sensor data (Node type 1).
Logic:
- If no previous record exists, insert new record
- If previous record has NULL BatLevelModule:
- Check time difference
- If >= 5 hours: insert new record
- If < 5 hours: update existing record
- If previous record has non-NULL BatLevelModule: insert new record
Args:
unit_name: Unit name
tool_name: Tool name
node_num: Node number
date: Date string (YYYY-MM-DD)
time: Time string (HH:MM:SS)
pressure: Pressure value (in Pa, will be converted to hPa)
Returns:
True if operation was successful
"""
# Get latest record
latest = await self._get_latest_record(unit_name, tool_name, node_num)
# Convert pressure from Pa to hPa (*100)
pressure_hpa = pressure * 100
if not latest:
# No previous record, insert new
query = """
INSERT INTO RAWDATACOR
(UnitName, ToolNameID, NodeNum, EventDate, EventTime, BatLevel, Temperature, val0, BatLevelModule, TemperatureModule)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params = (
unit_name,
tool_name,
node_num,
date,
time,
self.DEFAULT_BAT_LEVEL,
self.DEFAULT_TEMPERATURE,
pressure_hpa,
self.DEFAULT_BAT_LEVEL,
self.DEFAULT_TEMPERATURE,
)
await execute_query(self.conn, query, params)
logger.debug(
f"Inserted new pressure record: {unit_name}/{tool_name}/node{node_num}"
)
return True
# Check BatLevelModule status
if latest["BatLevelModule"] is None:
# Calculate time difference
old_datetime = datetime.strptime(
f"{latest['EventDate']} {latest['EventTime']}", "%Y-%m-%d %H:%M:%S"
)
new_datetime = datetime.strptime(f"{date} {time}", "%Y-%m-%d %H:%M:%S")
time_diff = new_datetime - old_datetime
if time_diff >= timedelta(hours=self.DUPLICATE_TIME_THRESHOLD_HOURS):
# Time difference >= 5 hours, insert new record
query = """
INSERT INTO RAWDATACOR
(UnitName, ToolNameID, NodeNum, EventDate, EventTime, BatLevel, Temperature, val0, BatLevelModule, TemperatureModule)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params = (
unit_name,
tool_name,
node_num,
date,
time,
self.DEFAULT_BAT_LEVEL,
self.DEFAULT_TEMPERATURE,
pressure_hpa,
self.DEFAULT_BAT_LEVEL,
self.DEFAULT_TEMPERATURE,
)
await execute_query(self.conn, query, params)
logger.debug(
f"Inserted new pressure record (time diff: {time_diff}): {unit_name}/{tool_name}/node{node_num}"
)
else:
# Time difference < 5 hours, update existing record
query = """
UPDATE RAWDATACOR
SET val0 = %s, EventDate = %s, EventTime = %s
WHERE UnitName = %s AND ToolNameID = %s AND NodeNum = %s AND val0 IS NULL
ORDER BY EventDate DESC, EventTime DESC
LIMIT 1
"""
params = (pressure_hpa, date, time, unit_name, tool_name, node_num)
await execute_query(self.conn, query, params)
logger.debug(
f"Updated existing pressure record (time diff: {time_diff}): {unit_name}/{tool_name}/node{node_num}"
)
else:
# BatLevelModule is not NULL, insert new record
query = """
INSERT INTO RAWDATACOR
(UnitName, ToolNameID, NodeNum, EventDate, EventTime, BatLevel, Temperature, val0, BatLevelModule, TemperatureModule)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params = (
unit_name,
tool_name,
node_num,
date,
time,
self.DEFAULT_BAT_LEVEL,
self.DEFAULT_TEMPERATURE,
pressure_hpa,
self.DEFAULT_BAT_LEVEL,
self.DEFAULT_TEMPERATURE,
)
await execute_query(self.conn, query, params)
logger.debug(
f"Inserted new pressure record (BatLevelModule not NULL): {unit_name}/{tool_name}/node{node_num}"
)
return True
async def _insert_vibrating_wire_data(
self,
unit_name: str,
tool_name: str,
node_num: int,
date: str,
time: str,
freq_hz: float,
therm_ohms: float,
freq_digit: float,
) -> bool:
"""
Insert or update vibrating wire sensor data (Node types 2-5).
Logic:
- If no previous record exists, insert new record
- If previous record has NULL BatLevelModule: update existing record
- If previous record has non-NULL BatLevelModule: insert new record
Args:
unit_name: Unit name
tool_name: Tool name
node_num: Node number
date: Date string (YYYY-MM-DD)
time: Time string (HH:MM:SS)
freq_hz: Frequency in Hz
therm_ohms: Thermistor in Ohms
freq_digit: Frequency in digits
Returns:
True if operation was successful
"""
# Get latest record
latest = await self._get_latest_record(unit_name, tool_name, node_num)
if not latest:
# No previous record, insert new
query = """
INSERT INTO RAWDATACOR
(UnitName, ToolNameID, NodeNum, EventDate, EventTime, BatLevel, Temperature, val0, val1, val2, BatLevelModule, TemperatureModule)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params = (
unit_name,
tool_name,
node_num,
date,
time,
self.DEFAULT_BAT_LEVEL,
self.DEFAULT_TEMPERATURE,
freq_hz,
therm_ohms,
freq_digit,
self.DEFAULT_BAT_LEVEL,
self.DEFAULT_TEMPERATURE,
)
await execute_query(self.conn, query, params)
logger.debug(
f"Inserted new vibrating wire record: {unit_name}/{tool_name}/node{node_num}"
)
return True
# Check BatLevelModule status
if latest["BatLevelModule"] is None:
# Update existing record
query = """
UPDATE RAWDATACOR
SET val0 = %s, val1 = %s, val2 = %s, EventDate = %s, EventTime = %s
WHERE UnitName = %s AND ToolNameID = %s AND NodeNum = %s AND val0 IS NULL
ORDER BY EventDate DESC, EventTime DESC
LIMIT 1
"""
params = (freq_hz, therm_ohms, freq_digit, date, time, unit_name, tool_name, node_num)
await execute_query(self.conn, query, params)
logger.debug(
f"Updated existing vibrating wire record: {unit_name}/{tool_name}/node{node_num}"
)
else:
# BatLevelModule is not NULL, insert new record
query = """
INSERT INTO RAWDATACOR
(UnitName, ToolNameID, NodeNum, EventDate, EventTime, BatLevel, Temperature, val0, val1, val2, BatLevelModule, TemperatureModule)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params = (
unit_name,
tool_name,
node_num,
date,
time,
self.DEFAULT_BAT_LEVEL,
self.DEFAULT_TEMPERATURE,
freq_hz,
therm_ohms,
freq_digit,
self.DEFAULT_BAT_LEVEL,
self.DEFAULT_TEMPERATURE,
)
await execute_query(self.conn, query, params)
logger.debug(
f"Inserted new vibrating wire record (BatLevelModule not NULL): {unit_name}/{tool_name}/node{node_num}"
)
return True
async def process_data(
self, raw_data: list[tuple], elab_data: list[tuple]
) -> tuple[int, int]:
"""
Process raw and elaborated data from Sisgeo sensors.
Args:
raw_data: List of raw data tuples
elab_data: List of elaborated data tuples
Returns:
Tuple of (raw_records_processed, elab_records_processed)
"""
raw_count = 0
elab_count = 0
# Process raw data
for record in raw_data:
try:
if len(record) == 6:
# Pressure sensor data (node type 1)
unit_name, tool_name, node_num, pressure, date, time = record
success = await self._insert_pressure_data(
unit_name, tool_name, node_num, date, time, Decimal(pressure)
)
if success:
raw_count += 1
elif len(record) == 8:
# Vibrating wire sensor data (node types 2-5)
(
unit_name,
tool_name,
node_num,
freq_hz,
therm_ohms,
freq_digit,
date,
time,
) = record
success = await self._insert_vibrating_wire_data(
unit_name,
tool_name,
node_num,
date,
time,
freq_hz,
therm_ohms,
freq_digit,
)
if success:
raw_count += 1
else:
logger.warning(f"Unknown record format: {len(record)} fields")
except Exception as e:
logger.error(f"Failed to process raw record: {e}", exc_info=True)
logger.debug(f"Record: {record}")
# Process elaborated data (if needed)
# Note: The legacy script had elab_data parameter but didn't use it
# This can be implemented if elaborated data processing is needed
logger.info(f"Processed {raw_count} raw records, {elab_count} elaborated records")
return raw_count, elab_count
async def main():
"""
Main entry point for the Sisgeo loader.
Note: This is a library module, typically called by other scripts.
Direct execution is provided for testing purposes.
"""
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger.info("Sisgeo Loader module loaded")
logger.info("This is a library module. Use SisgeoLoader class in your scripts.")
if __name__ == "__main__":
asyncio.run(main())