# Refactored Scripts - Modern Async Implementation

This directory contains refactored versions of the legacy scripts from `old_scripts/`, reimplemented with modern Python best practices, async/await support, and proper error handling.

## Overview

The refactored scripts provide the same functionality as their legacy counterparts but with significant improvements:

### Key Improvements

✅ **Full Async/Await Support**
- Uses `aiomysql` for non-blocking database operations
- Compatible with asyncio event loops
- Can be integrated into existing async orchestrators

✅ **Proper Logging**
- Uses Python's `logging` module instead of `print()` statements
- Configurable log levels (DEBUG, INFO, WARNING, ERROR)
- Structured log messages with context

✅ **Type Hints & Documentation**
- Full type hints for all functions
- Comprehensive docstrings following Google style
- Self-documenting code

✅ **Error Handling**
- Proper exception handling with logging
- Retry logic available via utility functions
- Graceful degradation

✅ **Configuration Management**
- Centralized configuration via `DatabaseConfig` class
- No hardcoded values
- Environment-aware settings

✅ **Code Quality**
- Follows PEP 8 style guide
- Passes ruff linting
- Clean, maintainable code structure

## Directory Structure

```
refactory_scripts/
├── __init__.py              # Package initialization
├── README.md                # This file
├── config/                  # Configuration management
│   └── __init__.py          # DatabaseConfig class
├── utils/                   # Utility functions
│   └── __init__.py          # Database helpers, retry logic, etc.
└── loaders/                 # Data loader modules
    ├── __init__.py          # Loader exports
    ├── hirpinia_loader.py
    ├── vulink_loader.py
    └── sisgeo_loader.py
```

## Refactored Scripts

### 1. Hirpinia Loader (`hirpinia_loader.py`)

**Replaces**: `old_scripts/hirpiniaLoadScript.py`

**Purpose**: Processes Hirpinia ODS files and loads sensor data into the database.

**Features**:
- Parses ODS (OpenDocument Spreadsheet) files
- Extracts data from multiple sheets (one per node)
- Handles datetime parsing and validation
- Batch inserts with `INSERT IGNORE`
- Supports MATLAB elaboration triggering

**Usage**:
```python
from refactory_scripts.loaders import HirpiniaLoader
from refactory_scripts.config import DatabaseConfig

async def process_hirpinia_file(file_path: str):
    db_config = DatabaseConfig()
    async with HirpiniaLoader(db_config) as loader:
        success = await loader.process_file(file_path)
    return success
```

**Command Line**:
```bash
python -m refactory_scripts.loaders.hirpinia_loader /path/to/file.ods
```

---

### 2. Vulink Loader (`vulink_loader.py`)

**Replaces**: `old_scripts/vulinkScript.py`

**Purpose**: Processes Vulink CSV files with battery monitoring and pH alarm management.

**Features**:
- Serial number to unit/tool name mapping
- Node configuration loading (depth, thresholds)
- Battery level monitoring with alarm creation
- pH threshold checking with multi-level alarms
- Time-based alarm suppression (24h interval for battery)

**Alarm Types**:
- **Type 2**: Low battery alarms (<25%)
- **Type 3**: pH threshold alarms (3 levels)

**Usage**:
```python
from refactory_scripts.loaders import VulinkLoader
from refactory_scripts.config import DatabaseConfig

async def process_vulink_file(file_path: str):
    db_config = DatabaseConfig()
    async with VulinkLoader(db_config) as loader:
        success = await loader.process_file(file_path)
    return success
```

**Command Line**:
```bash
python -m refactory_scripts.loaders.vulink_loader /path/to/file.csv
```
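The alarm rules above boil down to simple threshold checks per reading. The sketch below illustrates that logic only; the function names, the comparison direction for pH, and the alarm-record shape are assumptions rather than the actual `VulinkLoader` API. Only the thresholds themselves (battery below 25%, three pH levels, 24-hour suppression for battery alarms) come from the feature list above.

```python
from datetime import datetime, timedelta
from typing import Optional

# Thresholds taken from the alarm description above; everything else is illustrative.
BATTERY_THRESHOLD_PCT = 25.0
BATTERY_ALARM_INTERVAL = timedelta(hours=24)


def battery_alarm_needed(battery_pct: float, last_alarm_at: Optional[datetime], now: datetime) -> bool:
    """Type-2 alarm: battery below 25%, suppressed if an alarm was raised within the last 24h."""
    if battery_pct >= BATTERY_THRESHOLD_PCT:
        return False
    if last_alarm_at is not None and now - last_alarm_at < BATTERY_ALARM_INTERVAL:
        return False
    return True


def ph_alarm_level(ph: float, thresholds: tuple[float, float, float]) -> int:
    """Type-3 alarm: 0 means no alarm, 1-3 is the highest configured threshold crossed.

    The ">=" comparison direction is an assumption; the real loader reads the three
    per-node thresholds during node configuration loading.
    """
    return sum(1 for t in sorted(thresholds) if ph >= t)
```

In the actual loader these checks would feed the database helpers described later in this README (an insert into the alarm table), with the per-node thresholds coming from the node configuration step.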
---

### 3. Sisgeo Loader (`sisgeo_loader.py`)

**Replaces**: `old_scripts/sisgeoLoadScript.py`

**Purpose**: Processes Sisgeo sensor data with smart duplicate handling.

**Features**:
- Handles two sensor types:
  - **Pressure sensors** (1 value): Piezometers
  - **Vibrating wire sensors** (3 values): Strain gauges, tiltmeters, etc.
- Smart duplicate detection based on time thresholds
- Conditional INSERT vs UPDATE logic
- Preserves data integrity

**Data Processing Logic**:

| Scenario | BatLevelModule | Time Diff | Action |
|----------|----------------|-----------|--------|
| No previous record | N/A | N/A | INSERT |
| Previous exists | NULL | >= 5h | INSERT |
| Previous exists | NULL | < 5h | UPDATE |
| Previous exists | NOT NULL | N/A | INSERT |

**Usage**:
```python
from refactory_scripts.loaders import SisgeoLoader
from refactory_scripts.config import DatabaseConfig

async def process_sisgeo_data(raw_data, elab_data):
    db_config = DatabaseConfig()
    async with SisgeoLoader(db_config) as loader:
        raw_count, elab_count = await loader.process_data(raw_data, elab_data)
    return raw_count, elab_count
```

---

## Configuration

### Database Configuration

Configuration is loaded from `env/config.ini`:

```ini
[mysql]
host = 10.211.114.173
port = 3306
database = ase_lar
user = root
password = ****
```

**Loading Configuration**:
```python
from refactory_scripts.config import DatabaseConfig

# Default: loads from env/config.ini, section [mysql]
db_config = DatabaseConfig()

# Custom file and section
db_config = DatabaseConfig(
    config_file="/path/to/config.ini",
    section="production_db"
)

# Access configuration
print(db_config.host)
print(db_config.database)

# Get as dict for aiomysql
conn_params = db_config.as_dict()
```

---

## Utility Functions

### Database Helpers

```python
from refactory_scripts.utils import get_db_connection, execute_query, execute_many

# Get async database connection
conn = await get_db_connection(db_config.as_dict())

# Execute query with single result
result = await execute_query(
    conn,
    "SELECT * FROM table WHERE id = %s",
    (123,),
    fetch_one=True
)

# Execute query with multiple results
results = await execute_query(
    conn,
    "SELECT * FROM table WHERE status = %s",
    ("active",),
    fetch_all=True
)

# Batch insert
rows = [(1, "a"), (2, "b"), (3, "c")]
count = await execute_many(
    conn,
    "INSERT INTO table (id, name) VALUES (%s, %s)",
    rows
)
```

### Retry Logic

```python
from refactory_scripts.utils import retry_on_failure

# Retry with exponential backoff
result = await retry_on_failure(
    some_async_function,
    max_retries=3,
    delay=1.0,
    backoff=2.0,
    arg1="value1",
    arg2="value2"
)
```

### DateTime Parsing

```python
from refactory_scripts.utils import parse_datetime

# Parse ISO format
dt = parse_datetime("2024-10-11T14:30:00")

# Parse separate date and time
dt = parse_datetime("2024-10-11", "14:30:00")

# Parse date only
dt = parse_datetime("2024-10-11")
```

---

## Logging

All loaders use Python's standard logging module:

```python
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

# Use in scripts
logger = logging.getLogger(__name__)
logger.info("Processing started")
logger.debug("Debug information")
logger.warning("Warning message")
logger.error("Error occurred", exc_info=True)
```

**Log Levels**:
- `DEBUG`: Detailed diagnostic information
- `INFO`: General informational messages
- `WARNING`: Warning messages (non-critical issues)
- `ERROR`: Error messages with stack traces
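The manual-testing and support sections later in this README set a `LOG_LEVEL` environment variable. A minimal sketch of honoring it when configuring logging is shown below; the variable name and the fallback to `INFO` are conventions assumed here, not something the loaders are guaranteed to read on their own.

```python
import logging
import os

# Assumed convention: LOG_LEVEL holds a standard level name (DEBUG, INFO, WARNING, ERROR).
log_level_name = os.getenv("LOG_LEVEL", "INFO").upper()

logging.basicConfig(
    level=getattr(logging, log_level_name, logging.INFO),  # fall back to INFO on unknown names
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
```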
---

## Integration with Orchestrators

The refactored loaders can be easily integrated into the existing orchestrator system:

```python
# In your orchestrator worker
from refactory_scripts.loaders import HirpiniaLoader
from refactory_scripts.config import DatabaseConfig

async def worker(worker_id: int, cfg: dict, pool: object) -> None:
    db_config = DatabaseConfig()

    async with HirpiniaLoader(db_config) as loader:
        # Process files from queue
        file_path = await get_next_file_from_queue()
        success = await loader.process_file(file_path)

        if success:
            await mark_file_processed(file_path)
```

---

## Migration from Legacy Scripts

### Mapping Table

| Legacy Script | Refactored Module | Class Name |
|---------------|-------------------|------------|
| `hirpiniaLoadScript.py` | `hirpinia_loader.py` | `HirpiniaLoader` |
| `vulinkScript.py` | `vulink_loader.py` | `VulinkLoader` |
| `sisgeoLoadScript.py` | `sisgeo_loader.py` | `SisgeoLoader` |
| `sorotecPini.py` | ⏳ TODO | `SorotecLoader` |
| `TS_PiniScript.py` | ⏳ TODO | `TSPiniLoader` |

### Key Differences

1. **Async/Await**:
   - Legacy: `conn = MySQLConnection(**db_config)`
   - Refactored: `conn = await get_db_connection(db_config.as_dict())`

2. **Error Handling**:
   - Legacy: `print('Error:', e)`
   - Refactored: `logger.error(f"Error: {e}", exc_info=True)`

3. **Configuration**:
   - Legacy: `read_db_config()` returns a dict
   - Refactored: `DatabaseConfig()` returns an object with validation

4. **Context Managers**:
   - Legacy: manual connection management
   - Refactored: `async with Loader(config) as loader:`

---

## Testing

### Unit Tests (TODO)

```bash
# Run tests
pytest tests/test_refactory_scripts/

# Run with coverage
pytest --cov=refactory_scripts tests/
```

### Manual Testing

```bash
# Set log level
export LOG_LEVEL=DEBUG

# Test Hirpinia loader
python -m refactory_scripts.loaders.hirpinia_loader /path/to/test.ods

# Test with Python directly
python3 << 'EOF'
import asyncio
from refactory_scripts.loaders import HirpiniaLoader
from refactory_scripts.config import DatabaseConfig

async def test():
    db_config = DatabaseConfig()
    async with HirpiniaLoader(db_config) as loader:
        result = await loader.process_file("/path/to/file.ods")
        print(f"Result: {result}")

asyncio.run(test())
EOF
```

---

## Performance Considerations

### Async Benefits

- **Non-blocking I/O**: Database operations don't block the event loop
- **Concurrent Processing**: Multiple files can be processed simultaneously
- **Better Resource Utilization**: Other tasks can make progress while a coroutine waits on I/O

### Batch Operations

- Use `execute_many()` for bulk inserts (faster than individual INSERT statements)
- Example: the Hirpinia loader processes all rows in one batch operation

### Connection Pooling

When integrating with orchestrators, reuse connections (or a shared pool) instead of opening a new one per file:

```python
# Don't create new connections in loops
# ❌ Bad
for file in files:
    async with HirpiniaLoader(db_config) as loader:
        await loader.process_file(file)

# ✅ Good - reuse loader instance
async with HirpiniaLoader(db_config) as loader:
    for file in files:
        await loader.process_file(file)
```
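The "Concurrent Processing" point above can be combined with the connection advice in this section: keep one loader (and therefore one database connection) per task so a connection is never shared between coroutines, but let several tasks run at once. The sketch below assumes `HirpiniaLoader` behaves as in the earlier usage examples; the semaphore bound is arbitrary.

```python
import asyncio

from refactory_scripts.config import DatabaseConfig
from refactory_scripts.loaders import HirpiniaLoader


async def process_files_concurrently(files: list[str], max_parallel: int = 4) -> list[bool]:
    """Process several files at once, bounding concurrency with a semaphore."""
    db_config = DatabaseConfig()
    semaphore = asyncio.Semaphore(max_parallel)

    async def process_one(path: str) -> bool:
        async with semaphore:
            # One loader per task: aiomysql connections should not be shared
            # across concurrently running coroutines.
            async with HirpiniaLoader(db_config) as loader:
                return await loader.process_file(path)

    return await asyncio.gather(*(process_one(f) for f in files))
```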
---

## Future Enhancements

### Planned Improvements

- [ ] Complete refactoring of `sorotecPini.py`
- [ ] Complete refactoring of `TS_PiniScript.py`
- [ ] Add unit tests with pytest
- [ ] Add integration tests
- [ ] Implement CSV parsing for Vulink loader
- [ ] Add metrics and monitoring (Prometheus?)
- [ ] Add data validation schemas (Pydantic?)
- [ ] Implement retry policies for transient failures
- [ ] Add dry-run mode for testing
- [ ] Create CLI tool with argparse

### Potential Features

- **Data Validation**: Use Pydantic models for input validation
- **Metrics**: Track processing times, error rates, etc.
- **Dead Letter Queue**: Handle permanently failed records
- **Idempotency**: Ensure repeated processing is safe
- **Streaming**: Process large files in chunks

---

## Contributing

When adding new loaders:

1. Follow the existing pattern (async context manager)
2. Add comprehensive docstrings
3. Include type hints
4. Use the logging module
5. Add error handling with context
6. Update this README
7. Add unit tests

---

## Support

For issues or questions:

- Check logs with `LOG_LEVEL=DEBUG`
- Review the legacy script comparison
- Consult the main project documentation

---

## Version History

### v1.0.0 (2024-10-11)

- Initial refactored implementation
- HirpiniaLoader complete
- VulinkLoader complete (pending CSV parsing)
- SisgeoLoader complete
- Base utilities and configuration management
- Comprehensive documentation

---

## License

Same as the main ASE project.