"""Unit tests for the ``ftp_csv_receiver`` module.

External dependencies (``mysql.connector`` and the project's ``utils.time`` /
``utils.config`` modules) are replaced with mocks in ``sys.modules`` *before*
``ftp_csv_receiver`` is imported, so no database or project configuration is
needed to run this file.

All ``patch()`` targets name ``ftp_csv_receiver`` -- the module that is
actually imported below -- so each patch replaces the attribute in the
namespace the code under test looks it up in.
"""
import unittest
import os
import sys
from unittest.mock import patch, MagicMock, mock_open, call, ANY
from hashlib import sha256
from pathlib import Path
from types import SimpleNamespace  # Used to create mock config objects

# Add the parent directory to sys.path to allow importing ftp_csv_receiver
# Adjust this path if your test file is located differently
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(script_dir)
# If ftp_csv_receiver.py is in the same directory as the test file, you might not need this
# If it's in the parent directory (like /home/alex/devel/ASE/), use this:
sys.path.insert(0, parent_dir)

# Now import the components to test
# We need to import AFTER modifying sys.path if necessary
# Also, mock dependencies BEFORE importing the module that uses them

# Mock mysql.connector BEFORE importing ftp_csv_receiver
mock_mysql_connector = MagicMock()
sys.modules['mysql.connector'] = mock_mysql_connector

# Mock the custom utils modules as well if they aren't available in the test environment
mock_utils_time = MagicMock()
mock_utils_config = MagicMock()
sys.modules['utils.time'] = mock_utils_time
sys.modules['utils.config'] = mock_utils_config

# Mock the setting.config() call specifically
mock_config_instance = MagicMock()
mock_utils_config.set_config.config.return_value = mock_config_instance

# Mock pyftpdlib classes if needed for specific tests, but often mocking methods is enough
# sys.modules['pyftpdlib.handlers'] = MagicMock()
# sys.modules['pyftpdlib.authorizers'] = MagicMock()
# sys.modules['pyftpdlib.servers'] = MagicMock()

# Import the module AFTER mocking dependencies
import ftp_csv_receiver
from ftp_csv_receiver import (
    extract_value,
    DummySha256Authorizer,
    ASEHandler,
    conn_db,  # Import even though we mock mysql.connector
)


# --- Test Configuration Setup ---
def create_mock_cfg():
    """Creates a mock configuration object for testing."""
    cfg = SimpleNamespace()
    cfg.adminuser = ['admin', sha256(b'adminpass').hexdigest(), '/fake/admin/path', 'elradfmwMT']
    cfg.dbhost = 'mockhost'
    cfg.dbport = 3306
    cfg.dbuser = 'mockuser'
    cfg.dbpass = 'mockpass'
    cfg.dbname = 'mockdb'
    cfg.dbusertable = 'mock_virtusers'
    cfg.dbrectable = 'mock_received'
    cfg.virtpath = '/fake/ftp/root/'
    cfg.defperm = 'elmw'
    cfg.fileext = ['.CSV', '.TXT']
    # Add patterns as lists of strings
    cfg.units_name = [r'ID\d{4}', r'IX\d{4}']
    cfg.units_type = [r'G801', r'G201']
    cfg.tools_name = [r'LOC\d{4}', r'DT\d{4}']
    cfg.tools_type = [r'MUX', r'MUMS']
    # Add other necessary config values
    cfg.logfilename = 'test_ftp.log'
    cfg.proxyaddr = '0.0.0.0'
    cfg.firstport = 40000
    cfg.portrangewidth = 10
    return cfg


# --- Test Cases ---
class TestExtractValue(unittest.TestCase):
    """Tests for extract_value(patterns, primary, secondary, default=...)."""

    def test_extract_from_primary(self):
        patterns = [r'ID(\d+)']
        primary = "File_ID1234_data.csv"
        secondary = "Some other text"
        self.assertEqual(extract_value(patterns, primary, secondary), "ID1234")

    def test_extract_from_secondary(self):
        patterns = [r'Type(A|B)']
        primary = "Filename_without_type.txt"
        secondary = "Log data: TypeB found"
        self.assertEqual(extract_value(patterns, primary, secondary), "TypeB")

    def test_no_match(self):
        patterns = [r'XYZ\d+']
        primary = "File_ID1234_data.csv"
        secondary = "Log data: TypeB found"
        self.assertEqual(extract_value(patterns, primary, secondary, default="NotFound"), "NotFound")

    def test_case_insensitive(self):
        patterns = [r'id(\d+)']
        primary = "File_ID1234_data.csv"
        secondary = "Some other text"
        self.assertEqual(extract_value(patterns, primary, secondary), "ID1234")  # Note: re.findall captures original case

    def test_multiple_patterns(self):
        patterns = [r'Type(A|B)', r'ID(\d+)']
        primary = "File_ID1234_data.csv"
        secondary = "Log data: TypeB found"
        # Should match the first pattern found in the primary source
        self.assertEqual(extract_value(patterns, primary, secondary), "ID1234")

    def test_multiple_patterns_secondary_match(self):
        patterns = [r'XYZ\d+', r'Type(A|B)']
        primary = "File_ID1234_data.csv"
        secondary = "Log data: TypeB found"
        # Should match the second pattern in the secondary source
        self.assertEqual(extract_value(patterns, primary, secondary), "TypeB")


class TestDummySha256Authorizer(unittest.TestCase):
    """Tests for DummySha256Authorizer construction and authentication."""

    def setUp(self):
        self.mock_cfg = create_mock_cfg()
        # The mysql.connector mock is module-level and shared by every test;
        # clear its recorded calls so assert_called_once_with() below does not
        # depend on which tests happened to run earlier.
        mock_mysql_connector.reset_mock()
        # Mock the database connection and cursor
        self.mock_conn = MagicMock()
        self.mock_cursor = MagicMock()
        mock_mysql_connector.connect.return_value = self.mock_conn
        self.mock_conn.cursor.return_value = self.mock_cursor

    @patch('ftp_csv_receiver.Path')  # Mock Path object
    def test_init_loads_users(self, mock_path_constructor):
        # Mock Path instance methods
        mock_path_instance = MagicMock()
        mock_path_constructor.return_value = mock_path_instance

        # Simulate database result
        db_users = [
            ('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr'),
            ('user2', sha256(b'pass2').hexdigest(), '/fake/ftp/root/user2', 'elmw'),
        ]
        self.mock_cursor.fetchall.return_value = db_users

        authorizer = DummySha256Authorizer(self.mock_cfg)

        # Verify DB connection
        mock_mysql_connector.connect.assert_called_once_with(
            user=self.mock_cfg.dbuser,
            password=self.mock_cfg.dbpass,
            host=self.mock_cfg.dbhost,
            port=self.mock_cfg.dbport
        )
        # Verify query
        self.mock_cursor.execute.assert_called_once_with(
            f'SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL'
        )
        # Verify admin user added
        self.assertIn('admin', authorizer.user_table)
        self.assertEqual(authorizer.user_table['admin']['pwd'], self.mock_cfg.adminuser[1])
        # Verify DB users added
        self.assertIn('user1', authorizer.user_table)
        self.assertEqual(authorizer.user_table['user1']['pwd'], db_users[0][1])
        self.assertEqual(authorizer.user_table['user1']['home'], db_users[0][2])
        self.assertEqual(authorizer.user_table['user1']['perm'], db_users[0][3])
        self.assertIn('user2', authorizer.user_table)
        # Verify directories were "created"
        expected_path_calls = [
            call(self.mock_cfg.virtpath + 'user1'),
            call(self.mock_cfg.virtpath + 'user2'),
        ]
        mock_path_constructor.assert_has_calls(expected_path_calls, any_order=True)
        self.assertEqual(mock_path_instance.mkdir.call_count, 2)
        mock_path_instance.mkdir.assert_called_with(parents=True, exist_ok=True)

    @patch('ftp_csv_receiver.Path')
    def test_init_mkdir_exception(self, mock_path_constructor):
        # Simulate database result
        db_users = [('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr')]
        self.mock_cursor.fetchall.return_value = db_users

        # Mock Path to raise an exception
        mock_path_instance = MagicMock()
        mock_path_constructor.return_value = mock_path_instance
        mock_path_instance.mkdir.side_effect = OSError("Permission denied")

        # We expect initialization to continue, but maybe log an error (though the code uses self.responde which isn't available here)
        # For a unit test, we just check that the user is still added
        authorizer = DummySha256Authorizer(self.mock_cfg)

        self.assertIn('user1', authorizer.user_table)
        mock_path_instance.mkdir.assert_called_once()

    def test_validate_authentication_success(self):
        self.mock_cursor.fetchall.return_value = []  # No DB users for simplicity
        authorizer = DummySha256Authorizer(self.mock_cfg)
        # Test admin user; the test passes as long as no exception is raised
        authorizer.validate_authentication('admin', 'adminpass', None)  # Handler not used in this method

    def test_validate_authentication_wrong_password(self):
        self.mock_cursor.fetchall.return_value = []
        authorizer = DummySha256Authorizer(self.mock_cfg)
        with self.assertRaises(ftp_csv_receiver.AuthenticationFailed):
            authorizer.validate_authentication('admin', 'wrongpass', None)

    def test_validate_authentication_unknown_user(self):
        self.mock_cursor.fetchall.return_value = []
        authorizer = DummySha256Authorizer(self.mock_cfg)
        with self.assertRaises(ftp_csv_receiver.AuthenticationFailed):
            authorizer.validate_authentication('unknown', 'somepass', None)


class TestASEHandler(unittest.TestCase):
    """Tests for ASEHandler file callbacks and the SITE ADDU/DELU/RESU/LSTU commands."""

    def setUp(self):
        self.mock_cfg = create_mock_cfg()
        self.mock_conn = MagicMock()  # Mock FTP connection object
        self.mock_server = MagicMock()  # Mock FTP server object
        self.mock_authorizer = MagicMock(spec=DummySha256Authorizer)  # Mock authorizer

        # Instantiate the handler
        # We need to manually set cfg and authorizer as done in main()
        self.handler = ASEHandler(self.mock_conn, self.mock_server)
        self.handler.cfg = self.mock_cfg
        self.handler.authorizer = self.mock_authorizer
        self.handler.respond = MagicMock()  # Mock the respond method
        self.handler.push = MagicMock()  # Mock the push method

        # Mock database for handler methods
        self.mock_db_conn = MagicMock()
        self.mock_db_cursor = MagicMock()

        # Patch conn_db globally for this test class.
        # addCleanup() guarantees the patch is undone even if a later step of
        # setUp raises (tearDown is not run when setUp fails).
        self.patcher_conn_db = patch('ftp_csv_receiver.conn_db', return_value=self.mock_db_conn)
        self.mock_conn_db = self.patcher_conn_db.start()
        self.addCleanup(self.patcher_conn_db.stop)
        self.mock_db_conn.cursor.return_value = self.mock_db_cursor

        # Mock logging
        self.patcher_logging = patch('ftp_csv_receiver.logging')
        self.mock_logging = self.patcher_logging.start()
        self.addCleanup(self.patcher_logging.stop)

    def tearDown(self):
        # Patchers are stopped through addCleanup() in setUp.
        # Reset mocks if needed between tests (though setUp does this)
        mock_mysql_connector.reset_mock()

    @patch('ftp_csv_receiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV'))
    @patch('ftp_csv_receiver.os.path.splitext', return_value=('ID1234_data', '.CSV'))
    @patch('ftp_csv_receiver.os.stat')
    @patch('ftp_csv_receiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3')
    @patch('ftp_csv_receiver.os.remove')
    @patch('ftp_csv_receiver.extract_value')  # Mock extract_value for focused testing
    def test_on_file_received_success(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split):
        mock_os_stat.return_value.st_size = 100  # Non-empty file
        test_file_path = '/fake/ftp/root/user1/ID1234_data.CSV'

        # Setup mock return values for extract_value
        mock_extract.side_effect = ['ID1234', 'G801', 'LOC5678', 'MUX']

        self.handler.on_file_received(test_file_path)

        # Verify file stats checked
        mock_os_stat.assert_called_once_with(test_file_path)
        # Verify file opened
        mock_file_open.assert_called_once_with(test_file_path, 'r')
        # Verify path splitting
        mock_split.assert_called_once_with(test_file_path)
        mock_splitext.assert_called_once_with('ID1234_data.CSV')
        # Verify extract_value calls
        expected_extract_calls = [
            call(self.mock_cfg.units_name, 'ID1234_data', ANY),  # ANY for the lines string
            call(self.mock_cfg.units_type, 'ID1234_data', ANY),
            call(self.mock_cfg.tools_name, 'ID1234_data', ANY),
            call(self.mock_cfg.tools_type, 'ID1234_data', ANY),
        ]
        mock_extract.assert_has_calls(expected_extract_calls)
        # Verify DB connection
        self.mock_conn_db.assert_called_once_with(self.mock_cfg)
        # Verify DB insert
        expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbrectable} (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s, %s, %s, %s, %s, %s)"
        expected_data = ('ID1234_data', 'ID1234', 'G801', 'LOC5678', 'MUX', 'G801,col2,col3\nval1,val2,val3')
        self.mock_db_cursor.execute.assert_called_once_with(expected_sql, expected_data)
        self.mock_db_conn.commit.assert_called_once()
        self.mock_db_conn.close.assert_called_once()
        # Verify file removed
        mock_os_remove.assert_called_once_with(test_file_path)
        # Verify logging
        self.mock_logging.info.assert_called_with(f'File {test_file_path} loaded: removed.')

    @patch('ftp_csv_receiver.os.path.split', return_value=('/fake/ftp/root/user1', 'data.WRONGEXT'))
    @patch('ftp_csv_receiver.os.path.splitext', return_value=('data', '.WRONGEXT'))
    @patch('ftp_csv_receiver.os.stat')
    @patch('ftp_csv_receiver.os.remove')
    def test_on_file_received_wrong_extension(self, mock_os_remove, mock_os_stat, mock_splitext, mock_split):
        mock_os_stat.return_value.st_size = 100
        test_file_path = '/fake/ftp/root/user1/data.WRONGEXT'

        self.handler.on_file_received(test_file_path)

        # Verify only stat, split, and splitext were called
        mock_os_stat.assert_called_once_with(test_file_path)
        mock_split.assert_called_once_with(test_file_path)
        mock_splitext.assert_called_once_with('data.WRONGEXT')
        # Verify DB, open, remove were NOT called
        self.mock_conn_db.assert_not_called()
        mock_os_remove.assert_not_called()
        self.mock_logging.info.assert_not_called()  # No logging in this path

    @patch('ftp_csv_receiver.os.stat')
    @patch('ftp_csv_receiver.os.remove')
    def test_on_file_received_empty_file(self, mock_os_remove, mock_os_stat):
        mock_os_stat.return_value.st_size = 0  # Empty file
        test_file_path = '/fake/ftp/root/user1/empty.CSV'

        self.handler.on_file_received(test_file_path)

        # Verify stat called
        mock_os_stat.assert_called_once_with(test_file_path)
        # Verify file removed
        mock_os_remove.assert_called_once_with(test_file_path)
        # Verify logging
        self.mock_logging.info.assert_called_with(f'File {test_file_path} was empty: removed.')
        # Verify DB not called
        self.mock_conn_db.assert_not_called()

    @patch('ftp_csv_receiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV'))
    @patch('ftp_csv_receiver.os.path.splitext', return_value=('ID1234_data', '.CSV'))
    @patch('ftp_csv_receiver.os.stat')
    @patch('ftp_csv_receiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3')
    @patch('ftp_csv_receiver.os.remove')
    @patch('ftp_csv_receiver.extract_value', side_effect=['ID1234', 'G801', 'LOC5678', 'MUX'])
    def test_on_file_received_db_error(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split):
        mock_os_stat.return_value.st_size = 100
        test_file_path = '/fake/ftp/root/user1/ID1234_data.CSV'
        db_error = Exception("DB connection failed")
        self.mock_db_cursor.execute.side_effect = db_error  # Simulate DB error

        self.handler.on_file_received(test_file_path)

        # Verify DB interaction attempted
        self.mock_conn_db.assert_called_once_with(self.mock_cfg)
        self.mock_db_cursor.execute.assert_called_once()
        # Verify commit/close not called after error
        self.mock_db_conn.commit.assert_not_called()
        self.mock_db_conn.close.assert_not_called()  # Should close be called in finally? Original code doesn't.
        # Verify file was NOT removed
        mock_os_remove.assert_not_called()
        # Verify error logging
        self.mock_logging.error.assert_any_call(f'File {test_file_path} not loaded. Held in user path.')
        self.mock_logging.error.assert_any_call(f'{db_error}')

    @patch('ftp_csv_receiver.os.remove')
    def test_on_incomplete_file_received(self, mock_os_remove):
        test_file_path = '/fake/ftp/root/user1/incomplete.part'
        self.handler.on_incomplete_file_received(test_file_path)
        mock_os_remove.assert_called_once_with(test_file_path)

    @patch('ftp_csv_receiver.Path')
    @patch('ftp_csv_receiver.os.path.basename', return_value='newuser')
    def test_ftp_SITE_ADDU_success(self, mock_basename, mock_path_constructor):
        mock_path_instance = MagicMock()
        mock_path_constructor.return_value = mock_path_instance
        password = 'newpassword'
        expected_hash = sha256(password.encode("UTF-8")).hexdigest()
        expected_home = self.mock_cfg.virtpath + 'newuser'

        self.handler.ftp_SITE_ADDU(f'newuser {password}')

        # Verify path creation
        mock_path_constructor.assert_called_once_with(expected_home)
        mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True)
        # Verify authorizer call
        self.handler.authorizer.add_user.assert_called_once_with(
            'newuser', expected_hash, expected_home + '/', perm=self.mock_cfg.defperm  # Note: Original code adds trailing slash here
        )
        # Verify DB interaction
        self.mock_conn_db.assert_called_once_with(self.mock_cfg)
        expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('newuser', '{expected_hash}', '{expected_home}', '{self.mock_cfg.defperm}')"
        self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
        self.mock_db_conn.commit.assert_called_once()
        self.mock_db_conn.close.assert_called_once()
        # Verify response
        self.handler.respond.assert_called_once_with('200 SITE ADDU successful.')
        # Verify logging
        self.mock_logging.info.assert_called_with('User newuser created.')

    def test_ftp_SITE_ADDU_missing_args(self):
        self.handler.ftp_SITE_ADDU('newuser')  # Missing password
        self.handler.respond.assert_called_once_with('501 SITE ADDU failed. Command needs 2 arguments')
        self.handler.authorizer.add_user.assert_not_called()
        self.mock_conn_db.assert_not_called()

    @patch('ftp_csv_receiver.Path')
    @patch('ftp_csv_receiver.os.path.basename', return_value='newuser')
    def test_ftp_SITE_ADDU_mkdir_error(self, mock_basename, mock_path_constructor):
        mock_path_instance = MagicMock()
        mock_path_constructor.return_value = mock_path_instance
        error = OSError("Cannot create dir")
        mock_path_instance.mkdir.side_effect = error

        self.handler.ftp_SITE_ADDU('newuser newpassword')

        self.handler.respond.assert_called_once_with(f'551 Error in create virtual user path: {error}')
        self.handler.authorizer.add_user.assert_not_called()
        self.mock_conn_db.assert_not_called()

    @patch('ftp_csv_receiver.Path')
    @patch('ftp_csv_receiver.os.path.basename', return_value='newuser')
    def test_ftp_SITE_ADDU_db_error(self, mock_basename, mock_path_constructor):
        mock_path_instance = MagicMock()
        mock_path_constructor.return_value = mock_path_instance
        error = Exception("DB insert failed")
        self.mock_db_cursor.execute.side_effect = error

        self.handler.ftp_SITE_ADDU('newuser newpassword')

        # Verify mkdir called
        mock_path_instance.mkdir.assert_called_once()
        # Verify authorizer called (happens before DB)
        self.handler.authorizer.add_user.assert_called_once()
        # Verify DB interaction attempted
        self.mock_conn_db.assert_called_once()
        self.mock_db_cursor.execute.assert_called_once()
        # Verify response
        self.handler.respond.assert_called_once_with(f'501 SITE ADDU failed: {error}.')

    @patch('ftp_csv_receiver.os.path.basename', return_value='olduser')
    def test_ftp_SITE_DELU_success(self, mock_basename):
        self.handler.ftp_SITE_DELU('olduser')

        # Verify authorizer call
        self.handler.authorizer.remove_user.assert_called_once_with('olduser')
        # Verify DB interaction
        self.mock_conn_db.assert_called_once_with(self.mock_cfg)
        expected_sql = f"UPDATE {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = now() WHERE ftpuser = 'olduser'"
        self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
        self.mock_db_conn.commit.assert_called_once()
        self.mock_db_conn.close.assert_called_once()
        # Verify response
        self.handler.respond.assert_called_once_with('200 SITE DELU successful.')
        # Verify logging
        self.mock_logging.info.assert_called_with('User olduser deleted.')

    @patch('ftp_csv_receiver.os.path.basename', return_value='olduser')
    def test_ftp_SITE_DELU_error(self, mock_basename):
        error = Exception("DB update failed")
        self.mock_db_cursor.execute.side_effect = error

        self.handler.ftp_SITE_DELU('olduser')

        # Verify authorizer call (happens first)
        self.handler.authorizer.remove_user.assert_called_once_with('olduser')
        # Verify DB interaction attempted
        self.mock_conn_db.assert_called_once()
        self.mock_db_cursor.execute.assert_called_once()
        # Verify response
        self.handler.respond.assert_called_once_with('501 SITE DELU failed.')

    @patch('ftp_csv_receiver.Path')
    @patch('ftp_csv_receiver.os.path.basename', return_value='restoreme')
    def test_ftp_SITE_RESU_success(self, mock_basename, mock_path_constructor):
        mock_path_instance = MagicMock()
        mock_path_constructor.return_value = mock_path_instance
        user_data = ('restoreme', 'somehash', '/fake/ftp/root/restoreme', 'elmw')
        self.mock_db_cursor.fetchone.return_value = user_data

        self.handler.ftp_SITE_RESU('restoreme')

        # Verify DB interaction
        self.mock_conn_db.assert_called_once_with(self.mock_cfg)
        expected_update_sql = f"UPDATE {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = null WHERE ftpuser = 'restoreme'"
        expected_select_sql = f"SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE ftpuser = 'restoreme'"
        expected_db_calls = [
            call(expected_update_sql),
            call(expected_select_sql)
        ]
        self.mock_db_cursor.execute.assert_has_calls(expected_db_calls)
        self.mock_db_conn.commit.assert_called_once()  # For the update
        self.mock_db_cursor.fetchone.assert_called_once()
        # Verify authorizer call
        self.handler.authorizer.add_user.assert_called_once_with(*user_data)
        # Verify path creation
        mock_path_constructor.assert_called_once_with(self.mock_cfg.virtpath + 'restoreme')
        mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True)
        # Verify DB close
        self.mock_db_conn.close.assert_called_once()
        # Verify response
        self.handler.respond.assert_called_once_with('200 SITE RESU successful.')
        # Verify logging
        self.mock_logging.info.assert_called_with('User restoreme restored.')

    @patch('ftp_csv_receiver.os.path.basename', return_value='restoreme')
    def test_ftp_SITE_RESU_db_error(self, mock_basename):
        error = Exception("DB fetch failed")
        # Simulate error on the SELECT statement
        self.mock_db_cursor.execute.side_effect = [None, error]  # First call (UPDATE) ok, second (SELECT) fails

        self.handler.ftp_SITE_RESU('restoreme')

        # Verify DB interaction attempted
        self.mock_conn_db.assert_called_once()
        self.assertEqual(self.mock_db_cursor.execute.call_count, 2)  # Both UPDATE and SELECT attempted
        self.mock_db_conn.commit.assert_called_once()  # Commit for UPDATE happened
        # Verify response
        self.handler.respond.assert_called_once_with('501 SITE RESU failed.')
        # Verify authorizer not called, mkdir not called
        self.handler.authorizer.add_user.assert_not_called()

    def test_ftp_SITE_LSTU_success(self):
        user_list_data = [
            ('userA', 'elr'),
            ('userB', 'elmw'),
        ]
        self.mock_db_cursor.fetchall.return_value = user_list_data

        self.handler.ftp_SITE_LSTU('')  # No argument needed

        # Verify DB interaction
        self.mock_conn_db.assert_called_once_with(self.mock_cfg)
        expected_sql = f'SELECT ftpuser, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL '
        self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
        self.mock_db_cursor.fetchall.assert_called_once()
        # Verify push calls
        expected_push_calls = [
            call("214-The following virtual users are defined:\r\n"),
            call('Username: userA\tPerms: elr\r\nUsername: userB\tPerms: elmw\r\n')
        ]
        self.handler.push.assert_has_calls(expected_push_calls)
        # Verify final response
        self.handler.respond.assert_called_once_with("214 LSTU SITE command successful.")

    def test_ftp_SITE_LSTU_db_error(self):
        error = Exception("DB select failed")
        self.mock_db_cursor.execute.side_effect = error

        self.handler.ftp_SITE_LSTU('')

        # Verify DB interaction attempted
        self.mock_conn_db.assert_called_once()
        self.mock_db_cursor.execute.assert_called_once()
        # Verify response
        self.handler.respond.assert_called_once_with(f'501 list users failed: {error}')
        # Verify push not called
        self.handler.push.assert_not_called()


if __name__ == '__main__':
    # Honour command-line arguments (-v, test selection) and exit with a
    # non-zero status on failure so scripts and CI can detect broken tests.
    unittest.main()