#!/usr/bin/env python3
"""
Test script to verify the migration from mysql-connector-python to aiomysql.

This script tests:
1. Async database connection with connetti_db_async()
2. Simple SELECT queries
3. Parameterized queries
4. Autocommit mode reporting
5. Cleanup of multiple connections
6. Error handling and connection reuse after a failed query

Usage:
    python test_db_connection.py
"""

import asyncio
import logging
import sys
from datetime import datetime
from pathlib import Path

# Add src directory to Python path
src_path = Path(__file__).parent / "src"
sys.path.insert(0, str(src_path))

# Setup logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Import custom modules
try:
    from utils.config import loader_send_data as setting
    from utils.database.connection import connetti_db_async
except ImportError as e:
    logger.error(f"Import error: {e}")
    logger.error("Make sure you're running from the project root directory")
    logger.error(f"Current directory: {Path.cwd()}")
    logger.error(f"Script directory: {Path(__file__).parent}")
    sys.exit(1)


async def test_connection() -> bool:
    """Open a connection, run ``SELECT 1`` and close the connection.

    Returns:
        True when every step succeeds, False when any step raises.
    """
    logger.info("=" * 60)
    logger.info("TEST 1: Basic Async Connection")
    logger.info("=" * 60)

    try:
        cfg = setting.Config()
        logger.info(f"Connecting to {cfg.dbhost}:{cfg.dbport} database={cfg.dbname}")

        conn = await connetti_db_async(cfg)
        try:
            logger.info("✅ Connection established successfully")

            # Test connection is valid
            async with conn.cursor() as cur:
                await cur.execute("SELECT 1 as test")
                result = await cur.fetchone()
                logger.info(f"✅ Test query result: {result}")
        finally:
            # Close even when the query fails so the connection is not leaked.
            conn.close()
        logger.info("✅ Connection closed successfully")

        return True

    except Exception as e:
        logger.error(f"❌ Connection test failed: {e}", exc_info=True)
        return False


async def test_select_query() -> bool:
    """Run a COUNT(*) and a LIMIT 5 query against the table named by ``cfg.dbrectable``.

    Returns:
        True when both queries succeed, False when anything raises.
    """
    logger.info("\n" + "=" * 60)
    logger.info("TEST 2: SELECT Query Test")
    logger.info("=" * 60)

    try:
        cfg = setting.Config()
        conn = await connetti_db_async(cfg)
        try:
            async with conn.cursor() as cur:
                # Test query on received table.
                # The table name comes from configuration and cannot be bound
                # as a query parameter, hence the f-string.
                await cur.execute(f"SELECT COUNT(*) as count FROM {cfg.dbrectable}")
                result = await cur.fetchone()
                count = result[0] if result else 0
                logger.info(f"✅ Found {count} records in {cfg.dbrectable}")

                # Test query with LIMIT
                await cur.execute(f"SELECT id, filename, unit_name, tool_name FROM {cfg.dbrectable} LIMIT 5")
                results = await cur.fetchall()
                logger.info(f"✅ Retrieved {len(results)} sample records")

                for row in results[:3]:  # Show first 3
                    logger.info(f" Record: id={row[0]}, file={row[1]}, unit={row[2]}, tool={row[3]}")
        finally:
            # Close even when a query fails so the connection is not leaked.
            conn.close()

        logger.info("✅ SELECT query test passed")
        return True

    except Exception as e:
        logger.error(f"❌ SELECT query test failed: {e}", exc_info=True)
        return False


async def test_parameterized_query() -> bool:
    """Run queries whose values are passed as bound parameters.

    A benign integer id is queried first, then an injection-style string
    (``"1 OR 1=1"``) is passed as a parameter and the resulting row count is
    logged.

    Returns:
        True when both queries succeed, False when anything raises.
    """
    logger.info("\n" + "=" * 60)
    logger.info("TEST 3: Parameterized Query Test")
    logger.info("=" * 60)

    try:
        cfg = setting.Config()
        conn = await connetti_db_async(cfg)
        try:
            async with conn.cursor() as cur:
                # Test with safe parameters
                test_id = 1
                await cur.execute(f"SELECT id, filename FROM {cfg.dbrectable} WHERE id = %s", (test_id,))
                result = await cur.fetchone()

                if result:
                    logger.info(f"✅ Parameterized query returned: id={result[0]}, file={result[1]}")
                else:
                    logger.info(f"✅ Parameterized query executed (no record with id={test_id})")

                # Test with potentially dangerous input (should be safe with parameters)
                dangerous_input = "1 OR 1=1"
                await cur.execute(f"SELECT COUNT(*) FROM {cfg.dbrectable} WHERE id = %s", (dangerous_input,))
                result = await cur.fetchone()
                logger.info(f"✅ SQL injection test: query returned {result[0]} records (should be 0 or 1)")
        finally:
            # Close even when a query fails so the connection is not leaked.
            conn.close()

        logger.info("✅ Parameterized query test passed")
        return True

    except Exception as e:
        logger.error(f"❌ Parameterized query test failed: {e}", exc_info=True)
        return False


async def test_autocommit() -> bool:
    """Log the autocommit mode reported by a fresh connection.

    The mode is only reported, not asserted: the test passes as long as the
    connection can be opened and queried for its autocommit flag.

    Returns:
        True when the mode could be read, False when anything raises.
    """
    logger.info("\n" + "=" * 60)
    logger.info("TEST 4: Autocommit Test")
    logger.info("=" * 60)

    try:
        cfg = setting.Config()
        conn = await connetti_db_async(cfg)
        try:
            # Report the autocommit mode of the connection
            logger.info(f"✅ Connection autocommit mode: {conn.get_autocommit()}")
        finally:
            conn.close()

        logger.info("✅ Autocommit test passed")
        return True

    except Exception as e:
        logger.error(f"❌ Autocommit test failed: {e}", exc_info=True)
        return False


async def test_connection_cleanup() -> bool:
    """Open five connections and then close all of them.

    Returns:
        True when all connections were opened and closed, False when
        anything raises.
    """
    logger.info("\n" + "=" * 60)
    logger.info("TEST 5: Connection Cleanup Test")
    logger.info("=" * 60)

    try:
        cfg = setting.Config()
        connections = []

        try:
            # Create multiple connections
            for i in range(5):
                conn = await connetti_db_async(cfg)
                connections.append(conn)
                logger.info(f" Created connection {i + 1}/5")
        finally:
            # Close all connections that were opened, including the partial
            # set left behind when one of the connection attempts fails.
            for i, conn in enumerate(connections):
                conn.close()
                logger.info(f" Closed connection {i + 1}/5")

        logger.info("✅ Connection cleanup test passed")
        return True

    except Exception as e:
        logger.error(f"❌ Connection cleanup test failed: {e}", exc_info=True)
        return False


async def test_error_handling() -> bool:
    """Check that an invalid query raises and the connection stays usable.

    Returns:
        True when the invalid query raised and a follow-up ``SELECT 1``
        succeeded; False when the invalid query did not raise or when any
        other step fails.
    """
    logger.info("\n" + "=" * 60)
    logger.info("TEST 6: Error Handling Test")
    logger.info("=" * 60)

    try:
        cfg = setting.Config()
        conn = await connetti_db_async(cfg)

        try:
            try:
                async with conn.cursor() as cur:
                    # Try to execute invalid query
                    await cur.execute("SELECT * FROM nonexistent_table_xyz")
            except Exception as e:
                logger.info(f"✅ Invalid query correctly raised exception: {type(e).__name__}")
            else:
                logger.error("❌ Invalid query should have raised an exception")
                return False

            # Verify connection is still usable after error
            async with conn.cursor() as cur:
                await cur.execute("SELECT 1")
                result = await cur.fetchone()
                logger.info(f"✅ Connection still usable after error: {result}")
        finally:
            # Runs on the early ``return False`` path as well, so the
            # connection is closed on every exit.
            conn.close()

        logger.info("✅ Error handling test passed")
        return True

    except Exception as e:
        logger.error(f"❌ Error handling test failed: {e}", exc_info=True)
        return False


async def main() -> int:
    """Run all tests sequentially and log a summary.

    Returns:
        0 when every test passed, 1 otherwise (used as the process exit code).
    """
    logger.info("\n" + "=" * 60)
    logger.info("AIOMYSQL MIGRATION TEST SUITE")
    logger.info("=" * 60)
    logger.info(f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

    tests = [
        ("Connection Test", test_connection),
        ("SELECT Query Test", test_select_query),
        ("Parameterized Query Test", test_parameterized_query),
        ("Autocommit Test", test_autocommit),
        ("Connection Cleanup Test", test_connection_cleanup),
        ("Error Handling Test", test_error_handling),
    ]

    results = []
    for test_name, test_func in tests:
        try:
            result = await test_func()
            results.append((test_name, result))
        except Exception as e:
            # Each test handles its own errors; this is a last-resort guard so
            # one crashing test does not abort the rest of the suite.
            logger.error(f"❌ {test_name} crashed: {e}", exc_info=True)
            results.append((test_name, False))

    # Summary
    logger.info("\n" + "=" * 60)
    logger.info("TEST SUMMARY")
    logger.info("=" * 60)

    passed = sum(1 for _, result in results if result)
    total = len(results)

    for test_name, result in results:
        status = "✅ PASS" if result else "❌ FAIL"
        logger.info(f"{status:10} | {test_name}")

    logger.info("=" * 60)
    logger.info(f"Results: {passed}/{total} tests passed")
    logger.info(f"End time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info("=" * 60)

    if passed == total:
        logger.info("\n🎉 All tests PASSED! Migration successful!")
        return 0
    else:
        logger.error(f"\n⚠️ {total - passed} test(s) FAILED. Please review errors above.")
        return 1


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)