mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-05-02 15:21:25 -07:00
Split basic units (#33510)
Split the one monolithic test for basic.py into several files * Split test_basic.py along categories. This is preliminary to get a handle on things. Eventually we may want to further split it so each file is only testing a single function. * Cleanup unused imports from splitting test_basic.py * Port atomic_move test to pytest. Working on getting rid of need to maintain procenv * Split a test of symbolic_mode_to_octal to follow unittest best practices Each test should only invoke the function under test once * Port test_argument_spec to pytest. * Fix suboptions failure
This commit is contained in:
parent
ba32827f3b
commit
370a7ace4b
16 changed files with 1330 additions and 1359 deletions
217
test/units/module_utils/basic/test_atomic_move.py
Normal file
217
test/units/module_utils/basic/test_atomic_move.py
Normal file
|
@ -0,0 +1,217 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
|
||||
# (c) 2016 Toshio Kuratomi <tkuratomi@ansible.com>
|
||||
# (c) 2017 Ansible Project
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
import errno
|
||||
import json
|
||||
from itertools import product
|
||||
|
||||
import pytest
|
||||
|
||||
from ansible.module_utils import basic
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def atomic_am(am, mocker):
|
||||
am.selinux_enabled = mocker.MagicMock()
|
||||
am.selinux_context = mocker.MagicMock()
|
||||
am.selinux_default_context = mocker.MagicMock()
|
||||
am.set_context_if_different = mocker.MagicMock()
|
||||
|
||||
yield am
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def atomic_mocks(mocker):
|
||||
environ = dict()
|
||||
mocks = {
|
||||
'chmod': mocker.patch('os.chmod'),
|
||||
'chown': mocker.patch('os.chown'),
|
||||
'close': mocker.patch('os.close'),
|
||||
'environ': mocker.patch('os.environ', environ),
|
||||
'getlogin': mocker.patch('os.getlogin'),
|
||||
'getuid': mocker.patch('os.getuid'),
|
||||
'path_exists': mocker.patch('os.path.exists'),
|
||||
'rename': mocker.patch('os.rename'),
|
||||
'stat': mocker.patch('os.stat'),
|
||||
'umask': mocker.patch('os.umask'),
|
||||
'getpwuid': mocker.patch('pwd.getpwuid'),
|
||||
'copy2': mocker.patch('shutil.copy2'),
|
||||
'copyfileobj': mocker.patch('shutil.copyfileobj'),
|
||||
'move': mocker.patch('shutil.move'),
|
||||
'mkstemp': mocker.patch('tempfile.mkstemp'),
|
||||
}
|
||||
|
||||
mocks['getlogin'].return_value = 'root'
|
||||
mocks['getuid'].return_value = 0
|
||||
mocks['getpwuid'].return_value = ('root', '', 0, 0, '', '', '')
|
||||
mocks['umask'].side_effect = [18, 0]
|
||||
mocks['rename'].return_value = None
|
||||
|
||||
yield mocks
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_stat(mocker):
|
||||
stat1 = mocker.MagicMock()
|
||||
stat1.st_mode = 0o0644
|
||||
stat1.st_uid = 0
|
||||
stat1.st_gid = 0
|
||||
yield stat1
|
||||
|
||||
|
||||
@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin'])
|
||||
def test_new_file(atomic_am, atomic_mocks, mocker, selinux):
|
||||
# test destination does not exist, login name = 'root', no environment, os.rename() succeeds
|
||||
mock_context = atomic_am.selinux_default_context.return_value
|
||||
atomic_mocks['path_exists'].return_value = False
|
||||
atomic_am.selinux_enabled.return_value = selinux
|
||||
|
||||
atomic_am.atomic_move('/path/to/src', '/path/to/dest')
|
||||
|
||||
atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest')
|
||||
assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/dest', basic.DEFAULT_PERM & ~18)]
|
||||
|
||||
if selinux:
|
||||
assert atomic_am.selinux_default_context.call_args_list == [mocker.call('/path/to/dest')]
|
||||
assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)]
|
||||
else:
|
||||
assert not atomic_am.selinux_default_context.called
|
||||
assert not atomic_am.set_context_if_different.called
|
||||
|
||||
|
||||
@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin'])
|
||||
def test_existing_file(atomic_am, atomic_mocks, fake_stat, mocker, selinux):
|
||||
# Test destination already present
|
||||
mock_context = atomic_am.selinux_context.return_value
|
||||
atomic_mocks['stat'].return_value = fake_stat
|
||||
atomic_mocks['path_exists'].return_value = True
|
||||
atomic_am.selinux_enabled.return_value = selinux
|
||||
|
||||
atomic_am.atomic_move('/path/to/src', '/path/to/dest')
|
||||
|
||||
atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest')
|
||||
assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)]
|
||||
|
||||
if selinux:
|
||||
assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)]
|
||||
assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')]
|
||||
else:
|
||||
assert not atomic_am.selinux_default_context.called
|
||||
assert not atomic_am.set_context_if_different.called
|
||||
|
||||
|
||||
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
|
||||
def test_no_tty_fallback(atomic_am, atomic_mocks, fake_stat, mocker):
|
||||
"""Raise OSError when using getlogin() to simulate no tty cornercase"""
|
||||
mock_context = atomic_am.selinux_context.return_value
|
||||
atomic_mocks['stat'].return_value = fake_stat
|
||||
atomic_mocks['path_exists'].return_value = True
|
||||
atomic_am.selinux_enabled.return_value = True
|
||||
atomic_mocks['getlogin'].side_effect = OSError()
|
||||
atomic_mocks['environ']['LOGNAME'] = 'root'
|
||||
|
||||
atomic_am.atomic_move('/path/to/src', '/path/to/dest')
|
||||
|
||||
atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest')
|
||||
assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)]
|
||||
|
||||
assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)]
|
||||
assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')]
|
||||
|
||||
|
||||
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
|
||||
def test_existing_file_stat_failure(atomic_am, atomic_mocks, mocker):
|
||||
"""Failure to stat an existing file in order to copy permissions propogates the error (unless EPERM)"""
|
||||
atomic_mocks['stat'].side_effect = OSError()
|
||||
atomic_mocks['path_exists'].return_value = True
|
||||
|
||||
with pytest.raises(OSError):
|
||||
atomic_am.atomic_move('/path/to/src', '/path/to/dest')
|
||||
|
||||
|
||||
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
|
||||
def test_existing_file_stat_perms_failure(atomic_am, atomic_mocks, mocker):
|
||||
"""Failure to stat an existing file to copy the permissions due to permissions passes fine"""
|
||||
# and now have os.stat return EPERM, which should not fail
|
||||
mock_context = atomic_am.selinux_context.return_value
|
||||
atomic_mocks['stat'].side_effect = OSError(errno.EPERM, 'testing os stat with EPERM')
|
||||
atomic_mocks['path_exists'].return_value = True
|
||||
atomic_am.selinux_enabled.return_value = True
|
||||
|
||||
atomic_am.atomic_move('/path/to/src', '/path/to/dest')
|
||||
|
||||
atomic_mocks['rename'].assert_called_with(b'/path/to/src', b'/path/to/dest')
|
||||
# FIXME: Should atomic_move() set a default permission value when it cannot retrieve the
|
||||
# existing file's permissions? (Right now it's up to the calling code.
|
||||
# assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/src', basic.DEFAULT_PERM & ~18)]
|
||||
assert atomic_am.set_context_if_different.call_args_list == [mocker.call('/path/to/dest', mock_context, False)]
|
||||
assert atomic_am.selinux_context.call_args_list == [mocker.call('/path/to/dest')]
|
||||
|
||||
|
||||
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
|
||||
def test_rename_failure(atomic_am, atomic_mocks, mocker, capfd):
|
||||
"""Test os.rename fails with EIO, causing it to bail out"""
|
||||
atomic_mocks['path_exists'].side_effect = [False, False]
|
||||
atomic_mocks['rename'].side_effect = OSError(errno.EIO, 'failing with EIO')
|
||||
|
||||
with pytest.raises(SystemExit):
|
||||
atomic_am.atomic_move('/path/to/src', '/path/to/dest')
|
||||
|
||||
out, err = capfd.readouterr()
|
||||
results = json.loads(out)
|
||||
|
||||
assert 'Could not replace file' in results['msg']
|
||||
assert 'failing with EIO' in results['msg']
|
||||
assert results['failed']
|
||||
|
||||
|
||||
@pytest.mark.parametrize('stdin', [{}], indirect=['stdin'])
|
||||
def test_rename_perms_fail_temp_creation_fails(atomic_am, atomic_mocks, mocker, capfd):
|
||||
"""Test os.rename fails with EPERM working but failure in mkstemp"""
|
||||
atomic_mocks['path_exists'].return_value = False
|
||||
atomic_mocks['close'].return_value = None
|
||||
atomic_mocks['rename'].side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None]
|
||||
atomic_mocks['mkstemp'].return_value = None
|
||||
atomic_mocks['mkstemp'].side_effect = OSError()
|
||||
atomic_am.selinux_enabled.return_value = False
|
||||
|
||||
with pytest.raises(SystemExit):
|
||||
atomic_am.atomic_move('/path/to/src', '/path/to/dest')
|
||||
|
||||
out, err = capfd.readouterr()
|
||||
results = json.loads(out)
|
||||
|
||||
assert 'is not writable by the current user' in results['msg']
|
||||
assert results['failed']
|
||||
|
||||
|
||||
@pytest.mark.parametrize('stdin, selinux', product([{}], (True, False)), indirect=['stdin'])
|
||||
def test_rename_perms_fail_temp_succeeds(atomic_am, atomic_mocks, fake_stat, mocker, selinux):
|
||||
"""Test os.rename raising an error but fallback to using mkstemp works"""
|
||||
mock_context = atomic_am.selinux_default_context.return_value
|
||||
atomic_mocks['path_exists'].return_value = False
|
||||
atomic_mocks['rename'].side_effect = [OSError(errno.EPERM, 'failing with EPERM'), None]
|
||||
atomic_mocks['stat'].return_value = [fake_stat, fake_stat, fake_stat]
|
||||
atomic_mocks['stat'].side_effect = None
|
||||
atomic_mocks['mkstemp'].return_value = (None, '/path/to/tempfile')
|
||||
atomic_mocks['mkstemp'].side_effect = None
|
||||
atomic_am.selinux_enabled.return_value = selinux
|
||||
|
||||
atomic_am.atomic_move('/path/to/src', '/path/to/dest')
|
||||
assert atomic_mocks['rename'].call_args_list == [mocker.call(b'/path/to/src', b'/path/to/dest'),
|
||||
mocker.call(b'/path/to/tempfile', b'/path/to/dest')]
|
||||
assert atomic_mocks['chmod'].call_args_list == [mocker.call(b'/path/to/dest', basic.DEFAULT_PERM & ~18)]
|
||||
|
||||
if selinux:
|
||||
assert atomic_am.selinux_default_context.call_args_list == [mocker.call('/path/to/dest')]
|
||||
assert atomic_am.set_context_if_different.call_args_list == [mocker.call(b'/path/to/tempfile', mock_context, False),
|
||||
mocker.call('/path/to/dest', mock_context, False)]
|
||||
else:
|
||||
assert not atomic_am.selinux_default_context.called
|
||||
assert not atomic_am.set_context_if_different.called
|
Loading…
Add table
Add a link
Reference in a new issue