test/: PEP8 compliancy (#24803)

* test/: PEP8 compliancy

- Make PEP8 compliant

* Python3 chokes on casting int to bytes (#24952)

But if we tell the formatter that the var is a number, it works
This commit is contained in:
Dag Wieers 2017-05-30 19:05:19 +02:00 committed by John R Barker
commit 4efec414e7
110 changed files with 1702 additions and 1547 deletions

View file

@ -436,10 +436,19 @@ class TestActionBase(unittest.TestCase):
action_base._make_tmp_path.return_value = '/the/tmp/path'
action_base._low_level_execute_command.return_value = dict(stdout='{"rc": 0, "stdout": "ok"}')
self.assertEqual(action_base._execute_module(module_name=None, module_args=None), dict(_ansible_parsed=True, rc=0, stdout="ok", stdout_lines=['ok']))
self.assertEqual(action_base._execute_module(module_name='foo',
module_args=dict(z=9, y=8, x=7), task_vars=dict(a=1)),
dict(_ansible_parsed=True, rc=0, stdout="ok",
stdout_lines=['ok']))
self.assertEqual(
action_base._execute_module(
module_name='foo',
module_args=dict(z=9, y=8, x=7),
task_vars=dict(a=1)
),
dict(
_ansible_parsed=True,
rc=0,
stdout="ok",
stdout_lines=['ok'],
)
)
# test with needing/removing a remote tmp path
action_base._configure_module.return_value = ('old', '#!/usr/bin/python', 'this is the module data', 'path')
@ -525,9 +534,9 @@ class TestActionBaseCleanReturnedData(unittest.TestCase):
mock_shared_loader_obj.connection_loader = mock_connection_loader
mock_connection = MagicMock()
#mock_connection._shell.env_prefix.side_effect = env_prefix
# mock_connection._shell.env_prefix.side_effect = env_prefix
#action_base = DerivedActionBase(mock_task, mock_connection, play_context, None, None, None)
# action_base = DerivedActionBase(mock_task, mock_connection, play_context, None, None, None)
action_base = DerivedActionBase(task=None,
connection=mock_connection,
play_context=None,
@ -535,7 +544,7 @@ class TestActionBaseCleanReturnedData(unittest.TestCase):
templar=None,
shared_loader_obj=mock_shared_loader_obj)
data = {'ansible_playbook_python': '/usr/bin/python',
#'ansible_rsync_path': '/usr/bin/rsync',
# 'ansible_rsync_path': '/usr/bin/rsync',
'ansible_python_interpreter': '/usr/bin/python',
'ansible_ssh_some_var': 'whatever',
'ansible_ssh_host_key_somehost': 'some key here',
@ -592,7 +601,7 @@ class TestActionBaseParseReturnedData(unittest.TestCase):
'stdout_lines': stdout.splitlines(),
'stderr': err}
res = action_base._parse_returned_data(returned_data)
del res['_ansible_parsed'] # we always have _ansible_parsed
del res['_ansible_parsed'] # we always have _ansible_parsed
self.assertEqual(len(res), 0)
self.assertFalse(res)
@ -610,7 +619,7 @@ class TestActionBaseParseReturnedData(unittest.TestCase):
self.assertTrue(res['ansible_facts'])
self.assertIn('ansible_blip', res['ansible_facts'])
# TODO: Should this be an AnsibleUnsafe?
#self.assertIsInstance(res['ansible_facts'], AnsibleUnsafe)
# self.assertIsInstance(res['ansible_facts'], AnsibleUnsafe)
def test_json_facts_add_host(self):
action_base = self._action_base()
@ -630,4 +639,4 @@ class TestActionBaseParseReturnedData(unittest.TestCase):
self.assertIn('ansible_blip', res['ansible_facts'])
self.assertIn('add_host', res)
# TODO: Should this be an AnsibleUnsafe?
#self.assertIsInstance(res['ansible_facts'], AnsibleUnsafe)
# self.assertIsInstance(res['ansible_facts'], AnsibleUnsafe)

View file

@ -24,6 +24,7 @@ from ansible.compat.tests.mock import patch, MagicMock, Mock
from ansible.plugins.action.raw import ActionModule
from ansible.playbook.task import Task
class TestCopyResultExclude(unittest.TestCase):
def setUp(self):
@ -38,7 +39,6 @@ class TestCopyResultExclude(unittest.TestCase):
# Issue: https://github.com/ansible/ansible/issues/16054
# PR: https://github.com/ansible/ansible/pull/16085
def test_raw_executable_is_not_empty_string(self):
play_context = Mock()
@ -50,7 +50,7 @@ class TestCopyResultExclude(unittest.TestCase):
play_context.check_mode = False
self.mock_am = ActionModule(task, connection, play_context, loader=None, templar=None, shared_loader_obj=None)
self.mock_am._low_level_execute_command = Mock(return_value = {})
self.mock_am._low_level_execute_command = Mock(return_value={})
self.mock_am.display = Mock()
self.mock_am.run()
@ -83,7 +83,7 @@ class TestCopyResultExclude(unittest.TestCase):
play_context.check_mode = False
self.mock_am = ActionModule(task, connection, play_context, loader=None, templar=None, shared_loader_obj=None)
self.mock_am._low_level_execute_command = Mock(return_value = {})
self.mock_am._low_level_execute_command = Mock(return_value={})
self.mock_am.display = Mock()
self.assertEqual(task.environment, None)
@ -100,10 +100,8 @@ class TestCopyResultExclude(unittest.TestCase):
play_context.check_mode = False
self.mock_am = ActionModule(task, connection, play_context, loader=None, templar=None, shared_loader_obj=None)
self.mock_am._low_level_execute_command = Mock(return_value = {})
self.mock_am._low_level_execute_command = Mock(return_value={})
self.mock_am.display = Mock()
self.mock_am.run(task_vars={'a': 'b'})
self.assertEqual(task.environment, None)

View file

@ -17,11 +17,12 @@ import unittest
import yaml
from pprint import pprint
from ansible import plugins
import ansible.plugins
from ansible.compat.tests.mock import patch, MagicMock
from ansible.plugins.action.synchronize import ActionModule
# Getting the incoming and outgoing task vars from the plugin's run method
'''
@ -41,8 +42,6 @@ with open('task_vars.json', 'wb') as f:
'''
class TaskMock(object):
args = {'src': u'/tmp/deleteme',
'dest': '/tmp/deleteme',
@ -52,16 +51,19 @@ class TaskMock(object):
become_user = None
become_method = None
class StdinMock(object):
shell = None
class ConnectionMock(object):
ismock = True
_play_context = None
#transport = 'ssh'
# transport = 'ssh'
transport = None
_new_stdin = StdinMock()
class PlayContextMock(object):
shell = None
private_key_file = None
@ -75,13 +77,16 @@ class PlayContextMock(object):
remote_user = None
password = None
class ModuleLoaderMock(object):
def find_plugin(self, module_name, mod_type):
pass
class SharedLoaderMock(object):
module_loader = ModuleLoaderMock()
class SynchronizeTester(object):
''' A wrapper for mocking out synchronize environments '''
@ -96,7 +101,6 @@ class SynchronizeTester(object):
final_task_vars = None
execute_called = False
def _execute_module(self, module_name, module_args=None, task_vars=None):
self.execute_called = True
self.final_module_args = module_args
@ -114,7 +118,7 @@ class SynchronizeTester(object):
if '_play_context' in test_meta:
if test_meta['_play_context']:
self.task.args = {}
for k,v in test_meta['_play_context'].items():
for (k, v) in test_meta['_play_context'].items():
if v == 'None':
v = None
setattr(self._play_context, k, v)
@ -123,8 +127,8 @@ class SynchronizeTester(object):
if '_task' in test_meta:
if test_meta['_task']:
self.task.args = {}
for k,v in test_meta['_task'].items():
#import epdb; epdb.st()
for (k, v) in test_meta['_task'].items():
# import epdb; epdb.st()
if v == 'None':
v = None
setattr(self.task, k, v)
@ -133,32 +137,30 @@ class SynchronizeTester(object):
if 'task_args' in test_meta:
if test_meta['task_args']:
self.task.args = {}
for k,v in test_meta['task_args'].items():
for (k, v) in test_meta['task_args'].items():
self.task.args[k] = v
# load inital task vars
invarspath = os.path.join(fixturepath,
test_meta.get('fixtures', {}).get('taskvars_in', 'taskvars_in.json'))
invarspath = os.path.join(fixturepath, test_meta.get('fixtures', {}).get('taskvars_in', 'taskvars_in.json'))
with open(invarspath, 'rb') as f:
fdata = f.read()
fdata = fdata.decode("utf-8")
in_task_vars = json.loads(fdata)
# load expected final task vars
outvarspath = os.path.join(fixturepath,
test_meta.get('fixtures', {}).get('taskvars_out', 'taskvars_out.json'))
outvarspath = os.path.join(fixturepath, test_meta.get('fixtures', {}).get('taskvars_out', 'taskvars_out.json'))
with open(outvarspath, 'rb') as f:
fdata = f.read()
fdata = fdata.decode("utf-8")
out_task_vars = json.loads(fdata)
# fixup the connection
for k,v in test_meta['connection'].items():
for (k, v) in test_meta['connection'].items():
setattr(self.connection, k, v)
# fixup the hostvars
if test_meta['hostvars']:
for k,v in test_meta['hostvars'].items():
for (k, v) in test_meta['hostvars'].items():
in_task_vars['hostvars'][k] = v
# initalize and run the module
@ -170,9 +172,9 @@ class SynchronizeTester(object):
# run assertions
for check in test_meta['asserts']:
value = eval(check)
#if not value:
# print(check, value)
# import epdb; epdb.st()
# if not value:
# print(check, value)
# import epdb; epdb.st()
assert value, check
@ -190,54 +192,52 @@ class FakePluginLoader(object):
class TestSynchronizeAction(unittest.TestCase):
fixturedir = os.path.dirname(__file__)
fixturedir = os.path.join(fixturedir, 'fixtures', 'synchronize')
#print(basedir)
# print(basedir)
@patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
def test_basic(self):
x = SynchronizeTester()
x.runtest(fixturepath=os.path.join(self.fixturedir,'basic'))
x.runtest(fixturepath=os.path.join(self.fixturedir, 'basic'))
@patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
def test_basic_become(self):
x = SynchronizeTester()
x.runtest(fixturepath=os.path.join(self.fixturedir,'basic_become'))
x.runtest(fixturepath=os.path.join(self.fixturedir, 'basic_become'))
@patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
def test_basic_become_cli(self):
# --become on the cli sets _play_context.become
x = SynchronizeTester()
x.runtest(fixturepath=os.path.join(self.fixturedir,'basic_become_cli'))
x.runtest(fixturepath=os.path.join(self.fixturedir, 'basic_become_cli'))
@patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
def test_basic_vagrant(self):
# simple vagrant example
x = SynchronizeTester()
x.runtest(fixturepath=os.path.join(self.fixturedir,'basic_vagrant'))
x.runtest(fixturepath=os.path.join(self.fixturedir, 'basic_vagrant'))
@patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
def test_basic_vagrant_sudo(self):
# vagrant plus sudo
x = SynchronizeTester()
x.runtest(fixturepath=os.path.join(self.fixturedir,'basic_vagrant_sudo'))
x.runtest(fixturepath=os.path.join(self.fixturedir, 'basic_vagrant_sudo'))
@patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
def test_basic_vagrant_become_cli(self):
# vagrant plus sudo
x = SynchronizeTester()
x.runtest(fixturepath=os.path.join(self.fixturedir,'basic_vagrant_become_cli'))
x.runtest(fixturepath=os.path.join(self.fixturedir, 'basic_vagrant_become_cli'))
@patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
def test_delegate_remote(self):
# delegate to other remote host
x = SynchronizeTester()
x.runtest(fixturepath=os.path.join(self.fixturedir,'delegate_remote'))
x.runtest(fixturepath=os.path.join(self.fixturedir, 'delegate_remote'))
@patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
def test_delegate_remote_su(self):
# delegate to other remote host with su enabled
x = SynchronizeTester()
x.runtest(fixturepath=os.path.join(self.fixturedir,'delegate_remote_su'))
x.runtest(fixturepath=os.path.join(self.fixturedir, 'delegate_remote_su'))

View file

@ -25,19 +25,18 @@ from ansible.compat.tests import mock
from ansible.compat.tests import unittest
from ansible.errors import AnsibleError
from ansible.playbook.play_context import PlayContext
from ansible.plugins.connection import ConnectionBase
#from ansible.plugins.connection.accelerate import Connection as AccelerateConnection
#from ansible.plugins.connection.chroot import Connection as ChrootConnection
#from ansible.plugins.connection.funcd import Connection as FuncdConnection
#from ansible.plugins.connection.jail import Connection as JailConnection
#from ansible.plugins.connection.libvirt_lxc import Connection as LibvirtLXCConnection
# from ansible.plugins.connection.accelerate import Connection as AccelerateConnection
# from ansible.plugins.connection.chroot import Connection as ChrootConnection
# from ansible.plugins.connection.funcd import Connection as FuncdConnection
# from ansible.plugins.connection.jail import Connection as JailConnection
# from ansible.plugins.connection.libvirt_lxc import Connection as LibvirtLXCConnection
from ansible.plugins.connection.lxc import Connection as LxcConnection
from ansible.plugins.connection.local import Connection as LocalConnection
from ansible.plugins.connection.paramiko_ssh import Connection as ParamikoConnection
from ansible.plugins.connection.ssh import Connection as SSHConnection
from ansible.plugins.connection.docker import Connection as DockerConnection
#from ansible.plugins.connection.winrm import Connection as WinRmConnection
# from ansible.plugins.connection.winrm import Connection as WinRmConnection
from ansible.plugins.connection.network_cli import Connection as NetworkCliConnection
@ -68,19 +67,26 @@ class TestConnectionBaseClass(unittest.TestCase):
def test_subclass_success(self):
class ConnectionModule3(ConnectionBase):
@property
def transport(self):
pass
def _connect(self):
pass
def exec_command(self):
pass
def put_file(self):
pass
def fetch_file(self):
pass
def close(self):
pass
self.assertIsInstance(ConnectionModule3(self.play_context, self.in_stream), ConnectionModule3)
# def test_accelerate_connection_module(self):
@ -190,17 +196,23 @@ debug1: Sending command: /bin/sh -c 'sudo -H -S -p "[sudo via ansible, key=ouzm
'''
class ConnectionFoo(ConnectionBase):
@property
def transport(self):
pass
def _connect(self):
pass
def exec_command(self):
pass
def put_file(self):
pass
def fetch_file(self):
pass
def close(self):
pass

View file

@ -28,7 +28,6 @@ from io import StringIO
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock, PropertyMock
from ansible.errors import AnsibleConnectionFailure
from ansible.playbook.play_context import PlayContext
@ -38,6 +37,7 @@ builtin_import = __import__
mock_ncclient = MagicMock(name='ncclient')
def import_mock(name, *args):
if name.startswith('ncclient'):
return mock_ncclient
@ -50,6 +50,7 @@ else:
with patch('__builtin__.__import__', side_effect=import_mock):
from ansible.plugins.connection import netconf
class TestNetconfConnectionClass(unittest.TestCase):
def test_netconf_init(self):
@ -117,5 +118,3 @@ class TestNetconfConnectionClass(unittest.TestCase):
self.assertEqual(1, rc)
self.assertEqual('', out)
self.assertEqual('unable to parse request', err)

View file

@ -143,7 +143,6 @@ class TestConnectionClass(unittest.TestCase):
self.assertFalse(mock_open_shell.called)
mock_send.assert_called_with({'command': b'command'})
def test_network_cli_send(self):
pc = PlayContext()
new_stdin = StringIO()

View file

@ -21,19 +21,18 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from io import StringIO
import pytest
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock, PropertyMock
from ansible import constants as C
from ansible.compat.selectors import SelectorKey, EVENT_READ
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock, PropertyMock
from ansible.errors import AnsibleError, AnsibleConnectionFailure, AnsibleFileNotFound
from ansible.module_utils.six.moves import shlex_quote
from ansible.module_utils._text import to_bytes
from ansible.playbook.play_context import PlayContext
from ansible.plugins.connection import ssh
from ansible.module_utils._text import to_bytes
class TestConnectionBaseClass(unittest.TestCase):
@ -91,10 +90,10 @@ class TestConnectionBaseClass(unittest.TestCase):
conn = ssh.Connection(pc, new_stdin)
conn.check_password_prompt = MagicMock()
conn.check_become_success = MagicMock()
conn.check_password_prompt = MagicMock()
conn.check_become_success = MagicMock()
conn.check_incorrect_password = MagicMock()
conn.check_missing_password = MagicMock()
conn.check_missing_password = MagicMock()
def _check_password_prompt(line):
if b'foo' in line:
@ -116,17 +115,17 @@ class TestConnectionBaseClass(unittest.TestCase):
return True
return False
conn.check_password_prompt.side_effect = _check_password_prompt
conn.check_become_success.side_effect = _check_become_success
conn.check_password_prompt.side_effect = _check_password_prompt
conn.check_become_success.side_effect = _check_become_success
conn.check_incorrect_password.side_effect = _check_incorrect_password
conn.check_missing_password.side_effect = _check_missing_password
conn.check_missing_password.side_effect = _check_missing_password
# test examining output for prompt
conn._flags = dict(
become_prompt = False,
become_success = False,
become_error = False,
become_nopasswd_error = False,
become_prompt=False,
become_success=False,
become_error=False,
become_nopasswd_error=False,
)
pc.prompt = True
@ -140,10 +139,10 @@ class TestConnectionBaseClass(unittest.TestCase):
# test examining output for become prompt
conn._flags = dict(
become_prompt = False,
become_success = False,
become_error = False,
become_nopasswd_error = False,
become_prompt=False,
become_success=False,
become_error=False,
become_nopasswd_error=False,
)
pc.prompt = False
@ -158,10 +157,10 @@ class TestConnectionBaseClass(unittest.TestCase):
# test examining output for become failure
conn._flags = dict(
become_prompt = False,
become_success = False,
become_error = False,
become_nopasswd_error = False,
become_prompt=False,
become_success=False,
become_error=False,
become_nopasswd_error=False,
)
pc.prompt = False
@ -176,10 +175,10 @@ class TestConnectionBaseClass(unittest.TestCase):
# test examining output for missing password
conn._flags = dict(
become_prompt = False,
become_success = False,
become_error = False,
become_nopasswd_error = False,
become_prompt=False,
become_success=False,
become_error=False,
become_nopasswd_error=False,
)
pc.prompt = False
@ -236,8 +235,8 @@ class TestConnectionBaseClass(unittest.TestCase):
conn._run.assert_called_with('some command to run', expected_in_data, checkrc=False)
expected_in_data = b' '.join((b'put',
to_bytes(shlex_quote('/path/to/in/file/with/unicode-fö〩')),
to_bytes(shlex_quote('/path/to/dest/file/with/unicode-fö〩')))) + b'\n'
to_bytes(shlex_quote('/path/to/in/file/with/unicode-fö〩')),
to_bytes(shlex_quote('/path/to/dest/file/with/unicode-fö〩')))) + b'\n'
conn.put_file(u'/path/to/in/file/with/unicode-fö〩', u'/path/to/dest/file/with/unicode-fö〩')
conn._run.assert_called_with('some command to run', expected_in_data, checkrc=False)
@ -292,8 +291,8 @@ class TestConnectionBaseClass(unittest.TestCase):
conn._run.assert_called_with('some command to run', expected_in_data, checkrc=False)
expected_in_data = b' '.join((b'get',
to_bytes(shlex_quote('/path/to/in/file/with/unicode-fö〩')),
to_bytes(shlex_quote('/path/to/dest/file/with/unicode-fö〩')))) + b'\n'
to_bytes(shlex_quote('/path/to/in/file/with/unicode-fö〩')),
to_bytes(shlex_quote('/path/to/dest/file/with/unicode-fö〩')))) + b'\n'
conn.fetch_file(u'/path/to/in/file/with/unicode-fö〩', u'/path/to/dest/file/with/unicode-fö〩')
conn._run.assert_called_with('some command to run', expected_in_data, checkrc=False)

View file

@ -29,26 +29,31 @@ class TestINILookup(unittest.TestCase):
# Currently there isn't a new-style
old_style_params_data = (
# Simple case
dict(term=u'keyA section=sectionA file=/path/to/file',
expected=[u'keyA', u'section=sectionA', u'file=/path/to/file'],
),
dict(term=u'keyB section=sectionB with space file=/path/with/embedded spaces and/file',
expected=[u'keyB', u'section=sectionB with space', u'file=/path/with/embedded spaces and/file'],
),
dict(term=u'keyC section=sectionC file=/path/with/equals/cn=com.ansible',
expected=[u'keyC', u'section=sectionC', u'file=/path/with/equals/cn=com.ansible'],
),
dict(term=u'keyD section=sectionD file=/path/with space and/equals/cn=com.ansible',
expected=[u'keyD', u'section=sectionD', u'file=/path/with space and/equals/cn=com.ansible'],
),
dict(term=u'keyE section=sectionE file=/path/with/unicode/くらとみ/file',
expected=[u'keyE', u'section=sectionE', u'file=/path/with/unicode/くらとみ/file'],
),
dict(term=u'keyF section=sectionF file=/path/with/utf 8 and spaces/くらとみ/file',
expected=[u'keyF', u'section=sectionF', u'file=/path/with/utf 8 and spaces/くらとみ/file'],
),
)
dict(
term=u'keyA section=sectionA file=/path/to/file',
expected=[u'keyA', u'section=sectionA', u'file=/path/to/file'],
),
dict(
term=u'keyB section=sectionB with space file=/path/with/embedded spaces and/file',
expected=[u'keyB', u'section=sectionB with space', u'file=/path/with/embedded spaces and/file'],
),
dict(
term=u'keyC section=sectionC file=/path/with/equals/cn=com.ansible',
expected=[u'keyC', u'section=sectionC', u'file=/path/with/equals/cn=com.ansible'],
),
dict(
term=u'keyD section=sectionD file=/path/with space and/equals/cn=com.ansible',
expected=[u'keyD', u'section=sectionD', u'file=/path/with space and/equals/cn=com.ansible'],
),
dict(
term=u'keyE section=sectionE file=/path/with/unicode/くらとみ/file',
expected=[u'keyE', u'section=sectionE', u'file=/path/with/unicode/くらとみ/file'],
),
dict(
term=u'keyF section=sectionF file=/path/with/utf 8 and spaces/くらとみ/file',
expected=[u'keyF', u'section=sectionF', u'file=/path/with/utf 8 and spaces/くらとみ/file'],
),
)
def setUp(self):
pass
@ -58,6 +63,6 @@ class TestINILookup(unittest.TestCase):
def test_parse_parameters(self):
for testcase in self.old_style_params_data:
#print(testcase)
# print(testcase)
params = _parse_params(testcase['term'])
self.assertEqual(params, testcase['expected'])

View file

@ -22,6 +22,7 @@ __metaclass__ = type
import passlib
from passlib.handlers import pbkdf2
from units.mock.loader import DictDataLoader
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import mock_open, patch
@ -29,11 +30,8 @@ from ansible.errors import AnsibleError
from ansible.module_utils.six import text_type
from ansible.module_utils.six.moves import builtins
from ansible.plugins import PluginLoader
from ansible.utils import encrypt
from units.mock.loader import DictDataLoader
from ansible.plugins.lookup import password
from ansible.utils import encrypt
DEFAULT_CHARS = sorted([u'ascii_letters', u'digits', u".,:-_"])
@ -42,128 +40,149 @@ DEFAULT_CANDIDATE_CHARS = u'.,:-_abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTU
# Currently there isn't a new-style
old_style_params_data = (
# Simple case
dict(term=u'/path/to/file',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(
term=u'/path/to/file',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
# Special characters in path
dict(term=u'/path/with/embedded spaces and/file',
filename=u'/path/with/embedded spaces and/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(term=u'/path/with/equals/cn=com.ansible',
filename=u'/path/with/equals/cn=com.ansible',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(term=u'/path/with/unicode/くらとみ/file',
filename=u'/path/with/unicode/くらとみ/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(
term=u'/path/with/embedded spaces and/file',
filename=u'/path/with/embedded spaces and/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(
term=u'/path/with/equals/cn=com.ansible',
filename=u'/path/with/equals/cn=com.ansible',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(
term=u'/path/with/unicode/くらとみ/file',
filename=u'/path/with/unicode/くらとみ/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
# Mix several special chars
dict(term=u'/path/with/utf 8 and spaces/くらとみ/file',
filename=u'/path/with/utf 8 and spaces/くらとみ/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(term=u'/path/with/encoding=unicode/くらとみ/file',
filename=u'/path/with/encoding=unicode/くらとみ/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(term=u'/path/with/encoding=unicode/くらとみ/and spaces file',
filename=u'/path/with/encoding=unicode/くらとみ/and spaces file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(
term=u'/path/with/utf 8 and spaces/くらとみ/file',
filename=u'/path/with/utf 8 and spaces/くらとみ/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(
term=u'/path/with/encoding=unicode/くらとみ/file',
filename=u'/path/with/encoding=unicode/くらとみ/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(
term=u'/path/with/encoding=unicode/くらとみ/and spaces file',
filename=u'/path/with/encoding=unicode/くらとみ/and spaces file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
# Simple parameters
dict(term=u'/path/to/file length=42',
filename=u'/path/to/file',
params=dict(length=42, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(term=u'/path/to/file encrypt=pbkdf2_sha256',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt='pbkdf2_sha256', chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(term=u'/path/to/file chars=abcdefghijklmnop',
dict(
term=u'/path/to/file length=42',
filename=u'/path/to/file',
params=dict(length=42, encrypt=None, chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(
term=u'/path/to/file encrypt=pbkdf2_sha256',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt='pbkdf2_sha256', chars=DEFAULT_CHARS),
candidate_chars=DEFAULT_CANDIDATE_CHARS,
),
dict(
term=u'/path/to/file chars=abcdefghijklmnop',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'abcdefghijklmnop']),
candidate_chars=u'abcdefghijklmnop',
),
dict(term=u'/path/to/file chars=digits,abc,def',
),
dict(
term=u'/path/to/file chars=digits,abc,def',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'abc', u'def'])),
candidate_chars=u'abcdef0123456789',
),
),
# Including comma in chars
dict(term=u'/path/to/file chars=abcdefghijklmnop,,digits',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'abcdefghijklmnop', u',', u'digits'])),
candidate_chars = u',abcdefghijklmnop0123456789',
),
dict(term=u'/path/to/file chars=,,',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u',']),
candidate_chars=u',',
),
dict(
term=u'/path/to/file chars=abcdefghijklmnop,,digits',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'abcdefghijklmnop', u',', u'digits'])),
candidate_chars=u',abcdefghijklmnop0123456789',
),
dict(
term=u'/path/to/file chars=,,',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u',']),
candidate_chars=u',',
),
# Including = in chars
dict(term=u'/path/to/file chars=digits,=,,',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'=', u','])),
candidate_chars=u',=0123456789',
),
dict(term=u'/path/to/file chars=digits,abc=def',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'abc=def'])),
candidate_chars=u'abc=def0123456789',
),
dict(
term=u'/path/to/file chars=digits,=,,',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'=', u','])),
candidate_chars=u',=0123456789',
),
dict(
term=u'/path/to/file chars=digits,abc=def',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'abc=def'])),
candidate_chars=u'abc=def0123456789',
),
# Including unicode in chars
dict(term=u'/path/to/file chars=digits,くらとみ,,',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'くらとみ', u','])),
candidate_chars=u',0123456789くらとみ',
),
dict(
term=u'/path/to/file chars=digits,くらとみ,,',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'くらとみ', u','])),
candidate_chars=u',0123456789くらとみ',
),
# Including only unicode in chars
dict(term=u'/path/to/file chars=くらとみ',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'くらとみ'])),
candidate_chars=u'くらとみ',
),
dict(
term=u'/path/to/file chars=くらとみ',
filename=u'/path/to/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'くらとみ'])),
candidate_chars=u'くらとみ',
),
# Include ':' in path
dict(term=u'/path/to/file_with:colon chars=ascii_letters,digits',
filename=u'/path/to/file_with:colon',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'ascii_letters', u'digits'])),
candidate_chars=u'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',
),
dict(
term=u'/path/to/file_with:colon chars=ascii_letters,digits',
filename=u'/path/to/file_with:colon',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'ascii_letters', u'digits'])),
candidate_chars=u'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',
),
# Including special chars in both path and chars
# Special characters in path
dict(term=u'/path/with/embedded spaces and/file chars=abc=def',
filename=u'/path/with/embedded spaces and/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'abc=def']),
candidate_chars=u'abc=def',
),
dict(term=u'/path/with/equals/cn=com.ansible chars=abc=def',
filename=u'/path/with/equals/cn=com.ansible',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'abc=def']),
candidate_chars=u'abc=def',
),
dict(term=u'/path/with/unicode/くらとみ/file chars=くらとみ',
filename=u'/path/with/unicode/くらとみ/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'くらとみ']),
candidate_chars=u'くらとみ',
),
dict(
term=u'/path/with/embedded spaces and/file chars=abc=def',
filename=u'/path/with/embedded spaces and/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'abc=def']),
candidate_chars=u'abc=def',
),
dict(
term=u'/path/with/equals/cn=com.ansible chars=abc=def',
filename=u'/path/with/equals/cn=com.ansible',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'abc=def']),
candidate_chars=u'abc=def',
),
dict(
term=u'/path/with/unicode/くらとみ/file chars=くらとみ',
filename=u'/path/with/unicode/くらとみ/file',
params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'くらとみ']),
candidate_chars=u'くらとみ',
),
)
@ -306,27 +325,26 @@ class TestFormatContent(unittest.TestCase):
def test_no_encrypt(self):
self.assertEqual(
password._format_content(password=u'hunter42',
salt=u'87654321',
encrypt=False),
salt=u'87654321',
encrypt=False),
u'hunter42 salt=87654321')
def test_no_encrypt_no_salt(self):
self.assertEqual(
password._format_content(password=u'hunter42',
salt=None,
encrypt=False),
salt=None,
encrypt=False),
u'hunter42')
def test_encrypt(self):
self.assertEqual(
password._format_content(password=u'hunter42',
salt=u'87654321',
encrypt='pbkdf2_sha256'),
salt=u'87654321',
encrypt='pbkdf2_sha256'),
u'hunter42 salt=87654321')
def test_encrypt_no_salt(self):
self.assertRaises(AssertionError, password._format_content,
u'hunter42', None, 'pbkdf2_sha256')
self.assertRaises(AssertionError, password._format_content, u'hunter42', None, 'pbkdf2_sha256')
class TestWritePasswordFile(unittest.TestCase):
@ -351,13 +369,13 @@ class TestWritePasswordFile(unittest.TestCase):
class TestLookupModule(unittest.TestCase):
def setUp(self):
self.fake_loader = DictDataLoader({'/path/to/somewhere':'sdfsdf'})
self.fake_loader = DictDataLoader({'/path/to/somewhere': 'sdfsdf'})
self.password_lookup = password.LookupModule(loader=self.fake_loader)
self.os_path_exists = password.os.path.exists
# Different releases of passlib default to a different number of rounds
self.sha256 = passlib.registry.get_crypt_handler('pbkdf2_sha256')
sha256_for_tests = pbkdf2.create_pbkdf2_hash("sha256", 32, 20000)
sha256_for_tests = pbkdf2.create_pbkdf2_hash("sha256", 32, 20000)
passlib.registry.register_crypt_handler(sha256_for_tests, force=True)
def tearDown(self):
@ -369,8 +387,7 @@ class TestLookupModule(unittest.TestCase):
def test_no_encrypt(self, mock_get_paths, mock_write_file):
mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three']
results = self.password_lookup.run([u'/path/to/somewhere'],
None)
results = self.password_lookup.run([u'/path/to/somewhere'], None)
# FIXME: assert something useful
for result in results:
@ -382,8 +399,7 @@ class TestLookupModule(unittest.TestCase):
def test_encrypt(self, mock_get_paths, mock_write_file):
mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three']
results = self.password_lookup.run([u'/path/to/somewhere encrypt=pbkdf2_sha256'],
None)
results = self.password_lookup.run([u'/path/to/somewhere encrypt=pbkdf2_sha256'], None)
# pbkdf2 format plus hash
expected_password_length = 76
@ -412,8 +428,7 @@ class TestLookupModule(unittest.TestCase):
password.os.path.exists = lambda x: True
with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')) as m:
results = self.password_lookup.run([u'/path/to/somewhere chars=anything encrypt=pbkdf2_sha256'],
None)
results = self.password_lookup.run([u'/path/to/somewhere chars=anything encrypt=pbkdf2_sha256'], None)
for result in results:
self.assertEqual(result, u'$pbkdf2-sha256$20000$ODc2NTQzMjE$Uikde0cv0BKaRaAXMrUQB.zvG4GmnjClwjghwIRf2gU')
@ -424,8 +439,7 @@ class TestLookupModule(unittest.TestCase):
password.os.path.exists = lambda x: True
with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')) as m:
results = self.password_lookup.run([u'/path/to/somewhere chars=anything'],
None)
results = self.password_lookup.run([u'/path/to/somewhere chars=anything'], None)
for result in results:
self.assertEqual(result, u'hunter42')
@ -435,7 +449,6 @@ class TestLookupModule(unittest.TestCase):
def test_only_a(self, mock_get_paths, mock_write_file):
mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three']
results = self.password_lookup.run([u'/path/to/somewhere chars=a'],
None)
results = self.password_lookup.run([u'/path/to/somewhere chars=a'], None)
for result in results:
self.assertEquals(result, u'a' * password.DEFAULT_LENGTH)

View file

@ -19,11 +19,11 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from units.mock.loader import DictDataLoader
import uuid
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock
from ansible.errors import AnsibleError, AnsibleParserError
from ansible.executor.process.worker import WorkerProcess
from ansible.executor.task_queue_manager import TaskQueueManager
@ -34,7 +34,6 @@ from ansible.playbook.block import Block
from ansible.playbook.handler import Handler
from ansible.plugins.strategy import StrategyBase
from units.mock.loader import DictDataLoader
class TestStrategyBase(unittest.TestCase):
@ -46,13 +45,16 @@ class TestStrategyBase(unittest.TestCase):
def test_strategy_base_init(self):
queue_items = []
def _queue_empty(*args, **kwargs):
return len(queue_items) == 0
def _queue_get(*args, **kwargs):
if len(queue_items) == 0:
raise Queue.Empty
else:
return queue_items.pop()
def _queue_put(item, *args, **kwargs):
queue_items.append(item)
@ -71,13 +73,16 @@ class TestStrategyBase(unittest.TestCase):
def test_strategy_base_run(self):
queue_items = []
def _queue_empty(*args, **kwargs):
return len(queue_items) == 0
def _queue_get(*args, **kwargs):
if len(queue_items) == 0:
raise Queue.Empty
else:
return queue_items.pop()
def _queue_put(item, *args, **kwargs):
queue_items.append(item)
@ -96,7 +101,7 @@ class TestStrategyBase(unittest.TestCase):
for attr in ('RUN_OK', 'RUN_ERROR', 'RUN_FAILED_HOSTS', 'RUN_UNREACHABLE_HOSTS'):
setattr(mock_tqm, attr, getattr(TaskQueueManager, attr))
mock_iterator = MagicMock()
mock_iterator = MagicMock()
mock_iterator._play = MagicMock()
mock_iterator._play.handlers = []
@ -124,13 +129,16 @@ class TestStrategyBase(unittest.TestCase):
def test_strategy_base_get_hosts(self):
queue_items = []
def _queue_empty(*args, **kwargs):
return len(queue_items) == 0
def _queue_get(*args, **kwargs):
if len(queue_items) == 0:
raise Queue.Empty
else:
return queue_items.pop()
def _queue_put(item, *args, **kwargs):
queue_items.append(item)
@ -142,7 +150,7 @@ class TestStrategyBase(unittest.TestCase):
mock_hosts = []
for i in range(0, 5):
mock_host = MagicMock()
mock_host.name = "host%02d" % (i+1)
mock_host.name = "host%02d" % (i + 1)
mock_host.has_hostkey = True
mock_hosts.append(mock_host)
@ -156,7 +164,7 @@ class TestStrategyBase(unittest.TestCase):
mock_tqm.get_inventory.return_value = mock_inventory
mock_play = MagicMock()
mock_play.hosts = ["host%02d" % (i+1) for i in range(0, 5)]
mock_play.hosts = ["host%02d" % (i + 1) for i in range(0, 5)]
strategy_base = StrategyBase(tqm=mock_tqm)
@ -213,7 +221,6 @@ class TestStrategyBase(unittest.TestCase):
finally:
tqm.cleanup()
def test_strategy_base_process_pending_results(self):
mock_tqm = MagicMock()
mock_tqm._terminated = False
@ -224,13 +231,16 @@ class TestStrategyBase(unittest.TestCase):
mock_tqm._listening_handlers = {}
queue_items = []
def _queue_empty(*args, **kwargs):
return len(queue_items) == 0
def _queue_get(*args, **kwargs):
if len(queue_items) == 0:
raise Queue.Empty
else:
return queue_items.pop()
def _queue_put(item, *args, **kwargs):
queue_items.append(item)
@ -290,6 +300,7 @@ class TestStrategyBase(unittest.TestCase):
if host_name == 'test01':
return mock_host
return None
def _get_group(group_name):
if group_name in ('all', 'foo'):
return mock_group
@ -341,8 +352,8 @@ class TestStrategyBase(unittest.TestCase):
self.assertEqual(results[0], task_result)
self.assertEqual(strategy_base._pending_results, 0)
self.assertNotIn('test01', strategy_base._blocked_hosts)
#self.assertIn('test01', mock_tqm._failed_hosts)
#del mock_tqm._failed_hosts['test01']
# self.assertIn('test01', mock_tqm._failed_hosts)
# del mock_tqm._failed_hosts['test01']
mock_iterator.is_failed.return_value = False
task_result = TaskResult(host=mock_host.name, task=mock_task._uuid, return_data='{"unreachable": true}')
@ -393,18 +404,18 @@ class TestStrategyBase(unittest.TestCase):
self.assertIn(mock_handler_task._uuid, strategy_base._notified_handlers)
self.assertIn(mock_host, strategy_base._notified_handlers[mock_handler_task._uuid])
#queue_items.append(('set_host_var', mock_host, mock_task, None, 'foo', 'bar'))
#results = strategy_base._process_pending_results(iterator=mock_iterator)
#self.assertEqual(len(results), 0)
#self.assertEqual(strategy_base._pending_results, 1)
# queue_items.append(('set_host_var', mock_host, mock_task, None, 'foo', 'bar'))
# results = strategy_base._process_pending_results(iterator=mock_iterator)
# self.assertEqual(len(results), 0)
# self.assertEqual(strategy_base._pending_results, 1)
#queue_items.append(('set_host_facts', mock_host, mock_task, None, 'foo', dict()))
#results = strategy_base._process_pending_results(iterator=mock_iterator)
#self.assertEqual(len(results), 0)
#self.assertEqual(strategy_base._pending_results, 1)
# queue_items.append(('set_host_facts', mock_host, mock_task, None, 'foo', dict()))
# results = strategy_base._process_pending_results(iterator=mock_iterator)
# self.assertEqual(len(results), 0)
# self.assertEqual(strategy_base._pending_results, 1)
#queue_items.append(('bad'))
#self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator)
# queue_items.append(('bad'))
# self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator)
strategy_base.cleanup()
def test_strategy_base_load_included_file(self):
@ -417,13 +428,16 @@ class TestStrategyBase(unittest.TestCase):
})
queue_items = []
def _queue_empty(*args, **kwargs):
return len(queue_items) == 0
def _queue_get(*args, **kwargs):
if len(queue_items) == 0:
raise Queue.Empty
else:
return queue_items.pop()
def _queue_put(item, *args, **kwargs):
queue_items.append(item)

View file

@ -21,13 +21,12 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from ansible.compat.tests import unittest
from ansible.compat.tests import BUILTINS
from ansible.compat.tests import BUILTINS, unittest
from ansible.compat.tests.mock import mock_open, patch, MagicMock
from ansible.plugins import MODULE_CACHE, PATH_CACHE, PLUGIN_PATH_CACHE, PluginLoader
class TestErrors(unittest.TestCase):
def setUp(self):
@ -69,15 +68,15 @@ class TestErrors(unittest.TestCase):
self.assertEqual(pl._get_paths(), ['/path/one', '/path/two'])
# NOT YET WORKING
#def fake_glob(path):
# if path == 'test/*':
# return ['test/foo', 'test/bar', 'test/bam']
# elif path == 'test/*/*'
#m._paths = None
#mock_glob = MagicMock()
#mock_glob.return_value = []
#with patch('glob.glob', mock_glob):
# pass
# def fake_glob(path):
# if path == 'test/*':
# return ['test/foo', 'test/bar', 'test/bam']
# elif path == 'test/*/*'
# m._paths = None
# mock_glob = MagicMock()
# mock_glob.return_value = []
# with patch('glob.glob', mock_glob):
# pass
def assertPluginLoaderConfigBecomes(self, arg, expected):
pl = PluginLoader('test', '', arg, 'test_plugin')