Update the use of no_log values to cover everything that heuristic_log_sanitize does.

Fixes #12792
This commit is contained in:
Toshio Kuratomi 2015-10-20 18:51:34 -07:00
commit 6e5055e786
5 changed files with 229 additions and 149 deletions

View file

@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-
# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division)
__metaclass__ = type
import copy
import json
import sys
from ansible.compat.tests import unittest
from ansible.compat.six import StringIO
from ansible.module_utils import basic
from ansible.module_utils.basic import heuristic_log_sanitize
from ansible.module_utils.basic import return_values, remove_values
@unittest.skipIf(sys.version_info[0] >= 3, "Python 3 is not supported on targets (yet)")
class TestAnsibleModuleExitJson(unittest.TestCase):
def setUp(self):
self.COMPLEX_ARGS = basic.MODULE_COMPLEX_ARGS
basic.MODULE_COMPLEX_ARGS = '{}'
self.old_stdout = sys.stdout
self.fake_stream = StringIO()
sys.stdout = self.fake_stream
self.module = basic.AnsibleModule(argument_spec=dict())
def tearDown(self):
basic.MODULE_COMPLEX_ARGS = self.COMPLEX_ARGS
sys.stdout = self.old_stdout
def test_exit_json_no_args_exits(self):
with self.assertRaises(SystemExit) as ctx:
self.module.exit_json()
self.assertEquals(ctx.exception.code, 0)
return_val = json.loads(self.fake_stream.getvalue())
self.assertEquals(return_val, dict(changed=False))
def test_exit_json_args_exits(self):
with self.assertRaises(SystemExit) as ctx:
self.module.exit_json(msg='message')
self.assertEquals(ctx.exception.code, 0)
return_val = json.loads(self.fake_stream.getvalue())
self.assertEquals(return_val, dict(msg="message", changed=False))
def test_fail_json_exits(self):
with self.assertRaises(SystemExit) as ctx:
self.module.fail_json(msg='message')
self.assertEquals(ctx.exception.code, 1)
return_val = json.loads(self.fake_stream.getvalue())
self.assertEquals(return_val, dict(msg="message", failed=True))
def test_exit_json_proper_changed(self):
with self.assertRaises(SystemExit) as ctx:
self.module.exit_json(changed=True, msg='success')
return_val = json.loads(self.fake_stream.getvalue())
self.assertEquals(return_val, dict(changed=True, msg='success'))
@unittest.skipIf(sys.version_info[0] >= 3, "Python 3 is not supported on targets (yet)")
class TestAnsibleModuleExitValuesRemoved(unittest.TestCase):
OMIT = 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER'
dataset = (
(dict(username='person', password='$ecret k3y'),
dict(one=1, pwd='$ecret k3y', url='https://username:password12345@foo.com/login/',
not_secret='following the leader', msg='here'),
dict(one=1, pwd=OMIT, url='https://username:password12345@foo.com/login/',
not_secret='following the leader', changed=False, msg='here')
),
(dict(username='person', password='password12345'),
dict(one=1, pwd='$ecret k3y', url='https://username:password12345@foo.com/login/',
not_secret='following the leader', msg='here'),
dict(one=1, pwd='$ecret k3y', url='https://username:********@foo.com/login/',
not_secret='following the leader', changed=False, msg='here')
),
(dict(username='person', password='$ecret k3y'),
dict(one=1, pwd='$ecret k3y', url='https://username:$ecret k3y@foo.com/login/',
not_secret='following the leader', msg='here'),
dict(one=1, pwd=OMIT, url='https://username:********@foo.com/login/',
not_secret='following the leader', changed=False, msg='here')
),
)
def setUp(self):
self.COMPLEX_ARGS = basic.MODULE_COMPLEX_ARGS
self.old_stdout = sys.stdout
def tearDown(self):
basic.MODULE_COMPLEX_ARGS = self.COMPLEX_ARGS
sys.stdout = self.old_stdout
def test_exit_json_removes_values(self):
for args, return_val, expected in self.dataset:
sys.stdout = StringIO()
basic.MODULE_COMPLEX_ARGS = json.dumps(args)
module = basic.AnsibleModule(
argument_spec = dict(
username=dict(),
password=dict(no_log=True),
token=dict(no_log=True),
),
)
with self.assertRaises(SystemExit) as ctx:
self.assertEquals(module.exit_json(**return_val), expected)
self.assertEquals(json.loads(sys.stdout.getvalue()), expected)
def test_fail_json_removes_values(self):
for args, return_val, expected in self.dataset:
expected = copy.deepcopy(expected)
del expected['changed']
expected['failed'] = True
sys.stdout = StringIO()
basic.MODULE_COMPLEX_ARGS = json.dumps(args)
module = basic.AnsibleModule(
argument_spec = dict(
username=dict(),
password=dict(no_log=True),
token=dict(no_log=True),
),
)
with self.assertRaises(SystemExit) as ctx:
self.assertEquals(module.fail_json(**return_val), expected)
self.assertEquals(json.loads(sys.stdout.getvalue()), expected)

View file

@ -85,18 +85,6 @@ class TestHeuristicLogSanitize(unittest.TestCase):
self.assertTrue(ssh_output.endswith("}"))
self.assertIn(":********@foo.com/data'", ssh_output)
class TestStripNoLog(unittest.TestCase):
def setUp(self):
data = ''
def test_return_strings(self):
pass
def test_strip_no_log(self):
pass
class TestAnsibleModuleStripNoLogValues(unittest.TestCase):
pass
def test_hides_parameter_secrets(self):
output = heuristic_log_sanitize('token="secret", user="person", token_entry="test=secret"', frozenset(['secret']))
self.assertNotIn('secret', output)

View file

@ -103,7 +103,7 @@ class TestAnsibleModuleLogSyslog(unittest.TestCase):
u'Toshio くらとみ non-ascii test': u'Toshio くらとみ non-ascii test'.encode('utf-8'),
b'Byte string': b'Byte string',
u'Toshio くらとみ non-ascii test'.encode('utf-8'): u'Toshio くらとみ non-ascii test'.encode('utf-8'),
b'non-utf8 :\xff: test': b'non-utf8 :\xff: test'
b'non-utf8 :\xff: test': b'non-utf8 :\xff: test'.decode('utf-8', 'replace').encode('utf-8'),
}
py3_output_data = {

View file

@ -29,7 +29,7 @@ from ansible.compat.tests.mock import patch, MagicMock
from ansible.module_utils import basic
from ansible.module_utils.basic import heuristic_log_sanitize
from ansible.module_utils.basic import _return_values, _remove_values
from ansible.module_utils.basic import return_values, remove_values
class TestReturnValues(unittest.TestCase):
@ -50,10 +50,10 @@ class TestReturnValues(unittest.TestCase):
def test_return_values(self):
for data, expected in self.dataset:
self.assertEquals(frozenset(_return_values(data)), expected)
self.assertEquals(frozenset(return_values(data)), expected)
def test_unknown_type(self):
self.assertRaises(TypeError, frozenset, _return_values(object()))
self.assertRaises(TypeError, frozenset, return_values(object()))
class TestRemoveValues(unittest.TestCase):
@ -98,54 +98,13 @@ class TestRemoveValues(unittest.TestCase):
def test_no_removal(self):
for value, no_log_strings in self.dataset_no_remove:
self.assertEquals(_remove_values(value, no_log_strings), value)
self.assertEquals(remove_values(value, no_log_strings), value)
def test_strings_to_remove(self):
for value, no_log_strings, expected in self.dataset_remove:
self.assertEquals(_remove_values(value, no_log_strings), expected)
self.assertEquals(remove_values(value, no_log_strings), expected)
def test_unknown_type(self):
self.assertRaises(TypeError, _remove_values, object(), frozenset())
self.assertRaises(TypeError, remove_values, object(), frozenset())
@unittest.skipIf(sys.version_info[0] >= 3, "Python 3 is not supported on targets (yet)")
class TestAnsibleModuleRemoveNoLogValues(unittest.TestCase):
OMIT = 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER'
dataset = (
(dict(username='person', password='$ecret k3y'),
dict(one=1, pwd='$ecret k3y', url='https://username:password12345@foo.com/login/',
not_secret='following the leader'),
dict(one=1, pwd=OMIT, url='https://username:password12345@foo.com/login/',
not_secret='following the leader')
),
(dict(username='person', password='password12345'),
dict(one=1, pwd='$ecret k3y', url='https://username:password12345@foo.com/login/',
not_secret='following the leader'),
dict(one=1, pwd='$ecret k3y', url='https://username:********@foo.com/login/',
not_secret='following the leader')
),
(dict(username='person', password='$ecret k3y'),
dict(one=1, pwd='$ecret k3y', url='https://username:$ecret k3y@foo.com/login/',
not_secret='following the leader'),
dict(one=1, pwd=OMIT, url='https://username:********@foo.com/login/',
not_secret='following the leader')
),
)
def setUp(self):
self.COMPLEX_ARGS = basic.MODULE_COMPLEX_ARGS
def tearDown(self):
basic.MODULE_COMPLEX_ARGS = self.COMPLEX_ARGS
def test_remove_no_log_values(self):
for args, return_val, expected in self.dataset:
basic.MODULE_COMPLEX_ARGS = json.dumps(args)
module = basic.AnsibleModule(
argument_spec = dict(
username=dict(),
password=dict(no_log=True),
token=dict(no_log=True),
),
)
self.assertEquals(module.remove_no_log_values(return_val), expected)