prove
This commit is contained in:
546
test/test_ftp_csv_receicer.py
Normal file
546
test/test_ftp_csv_receicer.py
Normal file
@@ -0,0 +1,546 @@
|
||||
import unittest
|
||||
import os
|
||||
import sys
|
||||
from unittest.mock import patch, MagicMock, mock_open, call, ANY
|
||||
from hashlib import sha256
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace # Used to create mock config objects
|
||||
|
||||
# Add the parent directory to sys.path to allow importing FtpCsvReceiver
|
||||
# Adjust this path if your test file is located differently
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
parent_dir = os.path.dirname(script_dir)
|
||||
# If FtpCsvReceiver.py is in the same directory as the test file, you might not need this
|
||||
# If it's in the parent directory (like /home/alex/devel/ASE/), use this:
|
||||
sys.path.insert(0, parent_dir)
|
||||
|
||||
# Now import the components to test
|
||||
# We need to import AFTER modifying sys.path if necessary
|
||||
# Also, mock dependencies BEFORE importing the module that uses them
|
||||
# Mock mysql.connector BEFORE importing FtpCsvReceiver
|
||||
mock_mysql_connector = MagicMock()
|
||||
sys.modules['mysql.connector'] = mock_mysql_connector
|
||||
|
||||
# Mock the custom utils modules as well if they aren't available in the test environment
|
||||
mock_utils_time = MagicMock()
|
||||
mock_utils_config = MagicMock()
|
||||
sys.modules['utils.time'] = mock_utils_time
|
||||
sys.modules['utils.config'] = mock_utils_config
|
||||
# Mock the setting.config() call specifically
|
||||
mock_config_instance = MagicMock()
|
||||
mock_utils_config.set_config.config.return_value = mock_config_instance
|
||||
|
||||
# Mock pyftpdlib classes if needed for specific tests, but often mocking methods is enough
|
||||
# sys.modules['pyftpdlib.handlers'] = MagicMock()
|
||||
# sys.modules['pyftpdlib.authorizers'] = MagicMock()
|
||||
# sys.modules['pyftpdlib.servers'] = MagicMock()
|
||||
|
||||
# Import the module AFTER mocking dependencies
|
||||
import FtpCsvReceiver
|
||||
from FtpCsvReceiver import (
|
||||
extract_value,
|
||||
DummySha256Authorizer,
|
||||
ASEHandler,
|
||||
conn_db, # Import even though we mock mysql.connector
|
||||
)
|
||||
|
||||
# --- Test Configuration Setup ---
|
||||
def create_mock_cfg():
|
||||
"""Creates a mock configuration object for testing."""
|
||||
cfg = SimpleNamespace()
|
||||
cfg.adminuser = ['admin', sha256(b'adminpass').hexdigest(), '/fake/admin/path', 'elradfmwMT']
|
||||
cfg.dbhost = 'mockhost'
|
||||
cfg.dbport = 3306
|
||||
cfg.dbuser = 'mockuser'
|
||||
cfg.dbpass = 'mockpass'
|
||||
cfg.dbname = 'mockdb'
|
||||
cfg.dbusertable = 'mock_virtusers'
|
||||
cfg.dbrectable = 'mock_received'
|
||||
cfg.virtpath = '/fake/ftp/root/'
|
||||
cfg.defperm = 'elmw'
|
||||
cfg.fileext = ['.CSV', '.TXT']
|
||||
# Add patterns as lists of strings
|
||||
cfg.units_name = [r'ID\d{4}', r'IX\d{4}']
|
||||
cfg.units_type = [r'G801', r'G201']
|
||||
cfg.tools_name = [r'LOC\d{4}', r'DT\d{4}']
|
||||
cfg.tools_type = [r'MUX', r'MUMS']
|
||||
# Add other necessary config values
|
||||
cfg.logfilename = 'test_ftp.log'
|
||||
cfg.proxyaddr = '0.0.0.0'
|
||||
cfg.firstport = 40000
|
||||
cfg.portrangewidth = 10
|
||||
return cfg
|
||||
|
||||
# --- Test Cases ---
|
||||
|
||||
class TestExtractValue(unittest.TestCase):
|
||||
|
||||
def test_extract_from_primary(self):
|
||||
patterns = [r'ID(\d+)']
|
||||
primary = "File_ID1234_data.csv"
|
||||
secondary = "Some other text"
|
||||
self.assertEqual(extract_value(patterns, primary, secondary), "ID1234")
|
||||
|
||||
def test_extract_from_secondary(self):
|
||||
patterns = [r'Type(A|B)']
|
||||
primary = "Filename_without_type.txt"
|
||||
secondary = "Log data: TypeB found"
|
||||
self.assertEqual(extract_value(patterns, primary, secondary), "TypeB")
|
||||
|
||||
def test_no_match(self):
|
||||
patterns = [r'XYZ\d+']
|
||||
primary = "File_ID1234_data.csv"
|
||||
secondary = "Log data: TypeB found"
|
||||
self.assertEqual(extract_value(patterns, primary, secondary, default="NotFound"), "NotFound")
|
||||
|
||||
def test_case_insensitive(self):
|
||||
patterns = [r'id(\d+)']
|
||||
primary = "File_ID1234_data.csv"
|
||||
secondary = "Some other text"
|
||||
self.assertEqual(extract_value(patterns, primary, secondary), "ID1234") # Note: re.findall captures original case
|
||||
|
||||
def test_multiple_patterns(self):
|
||||
patterns = [r'Type(A|B)', r'ID(\d+)']
|
||||
primary = "File_ID1234_data.csv"
|
||||
secondary = "Log data: TypeB found"
|
||||
# Should match the first pattern found in the primary source
|
||||
self.assertEqual(extract_value(patterns, primary, secondary), "ID1234")
|
||||
|
||||
def test_multiple_patterns_secondary_match(self):
|
||||
patterns = [r'XYZ\d+', r'Type(A|B)']
|
||||
primary = "File_ID1234_data.csv"
|
||||
secondary = "Log data: TypeB found"
|
||||
# Should match the second pattern in the secondary source
|
||||
self.assertEqual(extract_value(patterns, primary, secondary), "TypeB")
|
||||
|
||||
|
||||
class TestDummySha256Authorizer(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mock_cfg = create_mock_cfg()
|
||||
# Mock the database connection and cursor
|
||||
self.mock_conn = MagicMock()
|
||||
self.mock_cursor = MagicMock()
|
||||
mock_mysql_connector.connect.return_value = self.mock_conn
|
||||
self.mock_conn.cursor.return_value = self.mock_cursor
|
||||
|
||||
@patch('FtpCsvReceiver.Path') # Mock Path object
|
||||
def test_init_loads_users(self, mock_path_constructor):
|
||||
# Mock Path instance methods
|
||||
mock_path_instance = MagicMock()
|
||||
mock_path_constructor.return_value = mock_path_instance
|
||||
|
||||
# Simulate database result
|
||||
db_users = [
|
||||
('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr'),
|
||||
('user2', sha256(b'pass2').hexdigest(), '/fake/ftp/root/user2', 'elmw'),
|
||||
]
|
||||
self.mock_cursor.fetchall.return_value = db_users
|
||||
|
||||
authorizer = DummySha256Authorizer(self.mock_cfg)
|
||||
|
||||
# Verify DB connection
|
||||
mock_mysql_connector.connect.assert_called_once_with(
|
||||
user=self.mock_cfg.dbuser, password=self.mock_cfg.dbpass,
|
||||
host=self.mock_cfg.dbhost, port=self.mock_cfg.dbport
|
||||
)
|
||||
# Verify query
|
||||
self.mock_cursor.execute.assert_called_once_with(
|
||||
f'SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL'
|
||||
)
|
||||
# Verify admin user added
|
||||
self.assertIn('admin', authorizer.user_table)
|
||||
self.assertEqual(authorizer.user_table['admin']['pwd'], self.mock_cfg.adminuser[1])
|
||||
# Verify DB users added
|
||||
self.assertIn('user1', authorizer.user_table)
|
||||
self.assertEqual(authorizer.user_table['user1']['pwd'], db_users[0][1])
|
||||
self.assertEqual(authorizer.user_table['user1']['home'], db_users[0][2])
|
||||
self.assertEqual(authorizer.user_table['user1']['perm'], db_users[0][3])
|
||||
self.assertIn('user2', authorizer.user_table)
|
||||
# Verify directories were "created"
|
||||
expected_path_calls = [
|
||||
call(self.mock_cfg.virtpath + 'user1'),
|
||||
call(self.mock_cfg.virtpath + 'user2'),
|
||||
]
|
||||
mock_path_constructor.assert_has_calls(expected_path_calls, any_order=True)
|
||||
self.assertEqual(mock_path_instance.mkdir.call_count, 2)
|
||||
mock_path_instance.mkdir.assert_called_with(parents=True, exist_ok=True)
|
||||
|
||||
@patch('FtpCsvReceiver.Path')
|
||||
def test_init_mkdir_exception(self, mock_path_constructor):
|
||||
# Simulate database result
|
||||
db_users = [('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr')]
|
||||
self.mock_cursor.fetchall.return_value = db_users
|
||||
|
||||
# Mock Path to raise an exception
|
||||
mock_path_instance = MagicMock()
|
||||
mock_path_constructor.return_value = mock_path_instance
|
||||
mock_path_instance.mkdir.side_effect = OSError("Permission denied")
|
||||
|
||||
# We expect initialization to continue, but maybe log an error (though the code uses self.responde which isn't available here)
|
||||
# For a unit test, we just check that the user is still added
|
||||
authorizer = DummySha256Authorizer(self.mock_cfg)
|
||||
self.assertIn('user1', authorizer.user_table)
|
||||
mock_path_instance.mkdir.assert_called_once()
|
||||
|
||||
|
||||
def test_validate_authentication_success(self):
|
||||
self.mock_cursor.fetchall.return_value = [] # No DB users for simplicity
|
||||
authorizer = DummySha256Authorizer(self.mock_cfg)
|
||||
# Test admin user
|
||||
authorizer.validate_authentication('admin', 'adminpass', None) # Handler not used in this method
|
||||
|
||||
def test_validate_authentication_wrong_password(self):
|
||||
self.mock_cursor.fetchall.return_value = []
|
||||
authorizer = DummySha256Authorizer(self.mock_cfg)
|
||||
with self.assertRaises(FtpCsvReceiver.AuthenticationFailed):
|
||||
authorizer.validate_authentication('admin', 'wrongpass', None)
|
||||
|
||||
def test_validate_authentication_unknown_user(self):
|
||||
self.mock_cursor.fetchall.return_value = []
|
||||
authorizer = DummySha256Authorizer(self.mock_cfg)
|
||||
with self.assertRaises(FtpCsvReceiver.AuthenticationFailed):
|
||||
authorizer.validate_authentication('unknown', 'somepass', None)
|
||||
|
||||
|
||||
class TestASEHandler(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mock_cfg = create_mock_cfg()
|
||||
self.mock_conn = MagicMock() # Mock FTP connection object
|
||||
self.mock_server = MagicMock() # Mock FTP server object
|
||||
self.mock_authorizer = MagicMock(spec=DummySha256Authorizer) # Mock authorizer
|
||||
|
||||
# Instantiate the handler
|
||||
# We need to manually set cfg and authorizer as done in main()
|
||||
self.handler = ASEHandler(self.mock_conn, self.mock_server)
|
||||
self.handler.cfg = self.mock_cfg
|
||||
self.handler.authorizer = self.mock_authorizer
|
||||
self.handler.respond = MagicMock() # Mock the respond method
|
||||
self.handler.push = MagicMock() # Mock the push method
|
||||
|
||||
# Mock database for handler methods
|
||||
self.mock_db_conn = MagicMock()
|
||||
self.mock_db_cursor = MagicMock()
|
||||
# Patch conn_db globally for this test class
|
||||
self.patcher_conn_db = patch('FtpCsvReceiver.conn_db', return_value=self.mock_db_conn)
|
||||
self.mock_conn_db = self.patcher_conn_db.start()
|
||||
self.mock_db_conn.cursor.return_value = self.mock_db_cursor
|
||||
|
||||
# Mock logging
|
||||
self.patcher_logging = patch('FtpCsvReceiver.logging')
|
||||
self.mock_logging = self.patcher_logging.start()
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
# Stop the patchers
|
||||
self.patcher_conn_db.stop()
|
||||
self.patcher_logging.stop()
|
||||
# Reset mocks if needed between tests (though setUp does this)
|
||||
mock_mysql_connector.reset_mock()
|
||||
|
||||
|
||||
@patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV'))
|
||||
@patch('FtpCsvReceiver.os.path.splitext', return_value=('ID1234_data', '.CSV'))
|
||||
@patch('FtpCsvReceiver.os.stat')
|
||||
@patch('FtpCsvReceiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3')
|
||||
@patch('FtpCsvReceiver.os.remove')
|
||||
@patch('FtpCsvReceiver.extract_value') # Mock extract_value for focused testing
|
||||
def test_on_file_received_success(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split):
|
||||
mock_os_stat.return_value.st_size = 100 # Non-empty file
|
||||
test_file_path = '/fake/ftp/root/user1/ID1234_data.CSV'
|
||||
|
||||
# Setup mock return values for extract_value
|
||||
mock_extract.side_effect = ['ID1234', 'G801', 'LOC5678', 'MUX']
|
||||
|
||||
self.handler.on_file_received(test_file_path)
|
||||
|
||||
# Verify file stats checked
|
||||
mock_os_stat.assert_called_once_with(test_file_path)
|
||||
# Verify file opened
|
||||
mock_file_open.assert_called_once_with(test_file_path, 'r')
|
||||
# Verify path splitting
|
||||
mock_split.assert_called_once_with(test_file_path)
|
||||
mock_splitext.assert_called_once_with('ID1234_data.CSV')
|
||||
# Verify extract_value calls
|
||||
expected_extract_calls = [
|
||||
call(self.mock_cfg.units_name, 'ID1234_data', ANY), # ANY for the lines string
|
||||
call(self.mock_cfg.units_type, 'ID1234_data', ANY),
|
||||
call(self.mock_cfg.tools_name, 'ID1234_data', ANY),
|
||||
call(self.mock_cfg.tools_type, 'ID1234_data', ANY),
|
||||
]
|
||||
mock_extract.assert_has_calls(expected_extract_calls)
|
||||
# Verify DB connection
|
||||
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
|
||||
# Verify DB insert
|
||||
expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbrectable } (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s, %s, %s, %s, %s, %s)"
|
||||
expected_data = ('ID1234_data', 'ID1234', 'G801', 'LOC5678', 'MUX', 'G801,col2,col3\nval1,val2,val3')
|
||||
self.mock_db_cursor.execute.assert_called_once_with(expected_sql, expected_data)
|
||||
self.mock_db_conn.commit.assert_called_once()
|
||||
self.mock_db_conn.close.assert_called_once()
|
||||
# Verify file removed
|
||||
mock_os_remove.assert_called_once_with(test_file_path)
|
||||
# Verify logging
|
||||
self.mock_logging.info.assert_called_with(f'File {test_file_path} loaded: removed.')
|
||||
|
||||
@patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'data.WRONGEXT'))
|
||||
@patch('FtpCsvReceiver.os.path.splitext', return_value=('data', '.WRONGEXT'))
|
||||
@patch('FtpCsvReceiver.os.stat')
|
||||
@patch('FtpCsvReceiver.os.remove')
|
||||
def test_on_file_received_wrong_extension(self, mock_os_remove, mock_os_stat, mock_splitext, mock_split):
|
||||
mock_os_stat.return_value.st_size = 100
|
||||
test_file_path = '/fake/ftp/root/user1/data.WRONGEXT'
|
||||
|
||||
self.handler.on_file_received(test_file_path)
|
||||
|
||||
# Verify only stat, split, and splitext were called
|
||||
mock_os_stat.assert_called_once_with(test_file_path)
|
||||
mock_split.assert_called_once_with(test_file_path)
|
||||
mock_splitext.assert_called_once_with('data.WRONGEXT')
|
||||
# Verify DB, open, remove were NOT called
|
||||
self.mock_conn_db.assert_not_called()
|
||||
mock_os_remove.assert_not_called()
|
||||
self.mock_logging.info.assert_not_called() # No logging in this path
|
||||
|
||||
@patch('FtpCsvReceiver.os.stat')
|
||||
@patch('FtpCsvReceiver.os.remove')
|
||||
def test_on_file_received_empty_file(self, mock_os_remove, mock_os_stat):
|
||||
mock_os_stat.return_value.st_size = 0 # Empty file
|
||||
test_file_path = '/fake/ftp/root/user1/empty.CSV'
|
||||
|
||||
self.handler.on_file_received(test_file_path)
|
||||
|
||||
# Verify stat called
|
||||
mock_os_stat.assert_called_once_with(test_file_path)
|
||||
# Verify file removed
|
||||
mock_os_remove.assert_called_once_with(test_file_path)
|
||||
# Verify logging
|
||||
self.mock_logging.info.assert_called_with(f'File {test_file_path} was empty: removed.')
|
||||
# Verify DB not called
|
||||
self.mock_conn_db.assert_not_called()
|
||||
|
||||
@patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV'))
|
||||
@patch('FtpCsvReceiver.os.path.splitext', return_value=('ID1234_data', '.CSV'))
|
||||
@patch('FtpCsvReceiver.os.stat')
|
||||
@patch('FtpCsvReceiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3')
|
||||
@patch('FtpCsvReceiver.os.remove')
|
||||
@patch('FtpCsvReceiver.extract_value', side_effect=['ID1234', 'G801', 'LOC5678', 'MUX'])
|
||||
def test_on_file_received_db_error(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split):
|
||||
mock_os_stat.return_value.st_size = 100
|
||||
test_file_path = '/fake/ftp/root/user1/ID1234_data.CSV'
|
||||
db_error = Exception("DB connection failed")
|
||||
self.mock_db_cursor.execute.side_effect = db_error # Simulate DB error
|
||||
|
||||
self.handler.on_file_received(test_file_path)
|
||||
|
||||
# Verify DB interaction attempted
|
||||
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
|
||||
self.mock_db_cursor.execute.assert_called_once()
|
||||
# Verify commit/close not called after error
|
||||
self.mock_db_conn.commit.assert_not_called()
|
||||
self.mock_db_conn.close.assert_not_called() # Should close be called in finally? Original code doesn't.
|
||||
# Verify file was NOT removed
|
||||
mock_os_remove.assert_not_called()
|
||||
# Verify error logging
|
||||
self.mock_logging.error.assert_any_call(f'File {test_file_path} not loaded. Held in user path.')
|
||||
self.mock_logging.error.assert_any_call(f'{db_error}')
|
||||
|
||||
@patch('FtpCsvReceiver.os.remove')
|
||||
def test_on_incomplete_file_received(self, mock_os_remove):
|
||||
test_file_path = '/fake/ftp/root/user1/incomplete.part'
|
||||
self.handler.on_incomplete_file_received(test_file_path)
|
||||
mock_os_remove.assert_called_once_with(test_file_path)
|
||||
|
||||
@patch('FtpCsvReceiver.Path')
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='newuser')
|
||||
def test_ftp_SITE_ADDU_success(self, mock_basename, mock_path_constructor):
|
||||
mock_path_instance = MagicMock()
|
||||
mock_path_constructor.return_value = mock_path_instance
|
||||
password = 'newpassword'
|
||||
expected_hash = sha256(password.encode("UTF-8")).hexdigest()
|
||||
expected_home = self.mock_cfg.virtpath + 'newuser'
|
||||
|
||||
self.handler.ftp_SITE_ADDU(f'newuser {password}')
|
||||
|
||||
# Verify path creation
|
||||
mock_path_constructor.assert_called_once_with(expected_home)
|
||||
mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True)
|
||||
# Verify authorizer call
|
||||
self.handler.authorizer.add_user.assert_called_once_with(
|
||||
'newuser', expected_hash, expected_home + '/', perm=self.mock_cfg.defperm # Note: Original code adds trailing slash here
|
||||
)
|
||||
# Verify DB interaction
|
||||
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
|
||||
expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('newuser', '{expected_hash}', '{expected_home}', '{self.mock_cfg.defperm}')"
|
||||
self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
|
||||
self.mock_db_conn.commit.assert_called_once()
|
||||
self.mock_db_conn.close.assert_called_once()
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with('200 SITE ADDU successful.')
|
||||
# Verify logging
|
||||
self.mock_logging.info.assert_called_with('User newuser created.')
|
||||
|
||||
def test_ftp_SITE_ADDU_missing_args(self):
|
||||
self.handler.ftp_SITE_ADDU('newuser') # Missing password
|
||||
self.handler.respond.assert_called_once_with('501 SITE ADDU failed. Command needs 2 arguments')
|
||||
self.handler.authorizer.add_user.assert_not_called()
|
||||
self.mock_conn_db.assert_not_called()
|
||||
|
||||
@patch('FtpCsvReceiver.Path')
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='newuser')
|
||||
def test_ftp_SITE_ADDU_mkdir_error(self, mock_basename, mock_path_constructor):
|
||||
mock_path_instance = MagicMock()
|
||||
mock_path_constructor.return_value = mock_path_instance
|
||||
error = OSError("Cannot create dir")
|
||||
mock_path_instance.mkdir.side_effect = error
|
||||
|
||||
self.handler.ftp_SITE_ADDU('newuser newpassword')
|
||||
|
||||
self.handler.respond.assert_called_once_with(f'551 Error in create virtual user path: {error}')
|
||||
self.handler.authorizer.add_user.assert_not_called()
|
||||
self.mock_conn_db.assert_not_called()
|
||||
|
||||
@patch('FtpCsvReceiver.Path')
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='newuser')
|
||||
def test_ftp_SITE_ADDU_db_error(self, mock_basename, mock_path_constructor):
|
||||
mock_path_instance = MagicMock()
|
||||
mock_path_constructor.return_value = mock_path_instance
|
||||
error = Exception("DB insert failed")
|
||||
self.mock_db_cursor.execute.side_effect = error
|
||||
|
||||
self.handler.ftp_SITE_ADDU('newuser newpassword')
|
||||
|
||||
# Verify mkdir called
|
||||
mock_path_instance.mkdir.assert_called_once()
|
||||
# Verify authorizer called (happens before DB)
|
||||
self.handler.authorizer.add_user.assert_called_once()
|
||||
# Verify DB interaction attempted
|
||||
self.mock_conn_db.assert_called_once()
|
||||
self.mock_db_cursor.execute.assert_called_once()
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with(f'501 SITE ADDU failed: {error}.')
|
||||
|
||||
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='olduser')
|
||||
def test_ftp_SITE_DELU_success(self, mock_basename):
|
||||
self.handler.ftp_SITE_DELU('olduser')
|
||||
|
||||
# Verify authorizer call
|
||||
self.handler.authorizer.remove_user.assert_called_once_with('olduser')
|
||||
# Verify DB interaction
|
||||
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
|
||||
expected_sql = f"UPDATE {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = now() WHERE ftpuser = 'olduser'"
|
||||
self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
|
||||
self.mock_db_conn.commit.assert_called_once()
|
||||
self.mock_db_conn.close.assert_called_once()
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with('200 SITE DELU successful.')
|
||||
# Verify logging
|
||||
self.mock_logging.info.assert_called_with('User olduser deleted.')
|
||||
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='olduser')
|
||||
def test_ftp_SITE_DELU_error(self, mock_basename):
|
||||
error = Exception("DB update failed")
|
||||
self.mock_db_cursor.execute.side_effect = error
|
||||
|
||||
self.handler.ftp_SITE_DELU('olduser')
|
||||
|
||||
# Verify authorizer call (happens first)
|
||||
self.handler.authorizer.remove_user.assert_called_once_with('olduser')
|
||||
# Verify DB interaction attempted
|
||||
self.mock_conn_db.assert_called_once()
|
||||
self.mock_db_cursor.execute.assert_called_once()
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with('501 SITE DELU failed.')
|
||||
|
||||
@patch('FtpCsvReceiver.Path')
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='restoreme')
|
||||
def test_ftp_SITE_RESU_success(self, mock_basename, mock_path_constructor):
|
||||
mock_path_instance = MagicMock()
|
||||
mock_path_constructor.return_value = mock_path_instance
|
||||
user_data = ('restoreme', 'somehash', '/fake/ftp/root/restoreme', 'elmw')
|
||||
self.mock_db_cursor.fetchone.return_value = user_data
|
||||
|
||||
self.handler.ftp_SITE_RESU('restoreme')
|
||||
|
||||
# Verify DB interaction
|
||||
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
|
||||
expected_update_sql = f"UPDATE {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = null WHERE ftpuser = 'restoreme'"
|
||||
expected_select_sql = f"SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE ftpuser = 'restoreme'"
|
||||
expected_db_calls = [
|
||||
call(expected_update_sql),
|
||||
call(expected_select_sql)
|
||||
]
|
||||
self.mock_db_cursor.execute.assert_has_calls(expected_db_calls)
|
||||
self.mock_db_conn.commit.assert_called_once() # For the update
|
||||
self.mock_db_cursor.fetchone.assert_called_once()
|
||||
# Verify authorizer call
|
||||
self.handler.authorizer.add_user.assert_called_once_with(*user_data)
|
||||
# Verify path creation
|
||||
mock_path_constructor.assert_called_once_with(self.mock_cfg.virtpath + 'restoreme')
|
||||
mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True)
|
||||
# Verify DB close
|
||||
self.mock_db_conn.close.assert_called_once()
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with('200 SITE RESU successful.')
|
||||
# Verify logging
|
||||
self.mock_logging.info.assert_called_with('User restoreme restored.')
|
||||
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='restoreme')
|
||||
def test_ftp_SITE_RESU_db_error(self, mock_basename):
|
||||
error = Exception("DB fetch failed")
|
||||
# Simulate error on the SELECT statement
|
||||
self.mock_db_cursor.execute.side_effect = [None, error] # First call (UPDATE) ok, second (SELECT) fails
|
||||
|
||||
self.handler.ftp_SITE_RESU('restoreme')
|
||||
|
||||
# Verify DB interaction attempted
|
||||
self.mock_conn_db.assert_called_once()
|
||||
self.assertEqual(self.mock_db_cursor.execute.call_count, 2) # Both UPDATE and SELECT attempted
|
||||
self.mock_db_conn.commit.assert_called_once() # Commit for UPDATE happened
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with('501 SITE RESU failed.')
|
||||
# Verify authorizer not called, mkdir not called
|
||||
self.handler.authorizer.add_user.assert_not_called()
|
||||
|
||||
|
||||
def test_ftp_SITE_LSTU_success(self):
|
||||
user_list_data = [
|
||||
('userA', 'elr'),
|
||||
('userB', 'elmw'),
|
||||
]
|
||||
self.mock_db_cursor.fetchall.return_value = user_list_data
|
||||
|
||||
self.handler.ftp_SITE_LSTU('') # No argument needed
|
||||
|
||||
# Verify DB interaction
|
||||
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
|
||||
expected_sql = f'SELECT ftpuser, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL '
|
||||
self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
|
||||
self.mock_db_cursor.fetchall.assert_called_once()
|
||||
# Verify push calls
|
||||
expected_push_calls = [
|
||||
call("214-The following virtual users are defined:\r\n"),
|
||||
call('Username: userA\tPerms: elr\r\nUsername: userB\tPerms: elmw\r\n')
|
||||
]
|
||||
self.handler.push.assert_has_calls(expected_push_calls)
|
||||
# Verify final response
|
||||
self.handler.respond.assert_called_once_with("214 LSTU SITE command successful.")
|
||||
|
||||
def test_ftp_SITE_LSTU_db_error(self):
|
||||
error = Exception("DB select failed")
|
||||
self.mock_db_cursor.execute.side_effect = error
|
||||
|
||||
self.handler.ftp_SITE_LSTU('')
|
||||
|
||||
# Verify DB interaction attempted
|
||||
self.mock_conn_db.assert_called_once()
|
||||
self.mock_db_cursor.execute.assert_called_once()
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with(f'501 list users failed: {error}')
|
||||
# Verify push not called
|
||||
self.handler.push.assert_not_called()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(argv=['first-arg-is-ignored'], exit=False)
|
||||
Reference in New Issue
Block a user