diff --git a/lib/ansible/modules/network/f5/bigip_device_sshd.py b/lib/ansible/modules/network/f5/bigip_device_sshd.py index 9af109154a..991538bb0f 100644 --- a/lib/ansible/modules/network/f5/bigip_device_sshd.py +++ b/lib/ansible/modules/network/f5/bigip_device_sshd.py @@ -3,17 +3,20 @@ # # Copyright (c) 2017 F5 Networks Inc. # GNU General Public License v3.0 (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +from __future__ import absolute_import, division, print_function +__metaclass__ = type + ANSIBLE_METADATA = {'metadata_version': '1.1', 'status': ['preview'], 'supported_by': 'community'} -DOCUMENTATION = ''' +DOCUMENTATION = r''' --- module: bigip_device_sshd short_description: Manage the SSHD settings of a BIG-IP description: - - Manage the SSHD settings of a BIG-IP + - Manage the SSHD settings of a BIG-IP. version_added: "2.2" options: allow: @@ -28,7 +31,6 @@ options: banner: description: - Whether to enable the banner or not. - required: false choices: - enabled - disabled @@ -36,12 +38,10 @@ options: description: - Specifies the text to include on the pre-login banner that displays when a user attempts to login to the system using SSH. - required: false inactivity_timeout: description: - Specifies the number of seconds before inactivity causes an SSH session to log out. - required: false log_level: description: - Specifies the minimum SSHD message level to include in the system log. @@ -62,11 +62,9 @@ options: choices: - enabled - disabled - required: false port: description: - Port that you want the SSH daemon to run on. - required: false notes: - Requires the f5-sdk Python package on the host This is as easy as pip install f5-sdk. @@ -78,259 +76,278 @@ author: - Tim Rupp (@caphrim007) ''' -EXAMPLES = ''' +EXAMPLES = r''' - name: Set the banner for the SSHD service from a string bigip_device_sshd: - banner: "enabled" - banner_text: "banner text goes here" - password: "secret" - server: "lb.mydomain.com" - user: "admin" + banner: enabled + banner_text: banner text goes here + password: secret + server: lb.mydomain.com + user: admin delegate_to: localhost - name: Set the banner for the SSHD service from a file bigip_device_sshd: - banner: "enabled" - banner_text: "{{ lookup('file', '/path/to/file') }}" - password: "secret" - server: "lb.mydomain.com" - user: "admin" + banner: enabled + banner_text: "{{ lookup('file', '/path/to/file') }}" + password: secret + server: lb.mydomain.com + user: admin delegate_to: localhost - name: Set the SSHD service to run on port 2222 bigip_device_sshd: - password: "secret" - port: 2222 - server: "lb.mydomain.com" - user: "admin" + password: secret + port: 2222 + server: lb.mydomain.com + user: admin delegate_to: localhost ''' -RETURN = ''' +RETURN = r''' allow: - description: > - Specifies, if you have enabled SSH access, the IP address or address - range for other systems that can use SSH to communicate with this - system. - returned: changed - type: string - sample: "192.0.2.*" + description: > + Specifies, if you have enabled SSH access, the IP address or address + range for other systems that can use SSH to communicate with this + system. + returned: changed + type: string + sample: 192.0.2.* banner: - description: Whether the banner is enabled or not. - returned: changed - type: string - sample: "true" + description: Whether the banner is enabled or not. + returned: changed + type: string + sample: true banner_text: - description: > - Specifies the text included on the pre-login banner that - displays when a user attempts to login to the system using SSH. - returned: changed and success - type: string - sample: "This is a corporate device. Connecting to it without..." + description: > + Specifies the text included on the pre-login banner that + displays when a user attempts to login to the system using SSH. + returned: changed and success + type: string + sample: This is a corporate device. Connecting to it without... inactivity_timeout: - description: > - The number of seconds before inactivity causes an SSH. - session to log out. - returned: changed - type: int - sample: "10" + description: > + The number of seconds before inactivity causes an SSH + session to log out. + returned: changed + type: int + sample: 10 log_level: - description: The minimum SSHD message level to include in the system log. - returned: changed - type: string - sample: "debug" + description: The minimum SSHD message level to include in the system log. + returned: changed + type: string + sample: debug login: - description: Specifies that the system accepts SSH communications or not. - returned: changed - type: bool - sample: true + description: Specifies that the system accepts SSH communications or not. + returned: changed + type: bool + sample: true port: - description: Port that you want the SSH daemon to run on. - returned: changed - type: int - sample: 22 + description: Port that you want the SSH daemon to run on. + returned: changed + type: int + sample: 22 ''' + +from ansible.module_utils.f5_utils import AnsibleF5Client +from ansible.module_utils.f5_utils import AnsibleF5Parameters +from ansible.module_utils.f5_utils import HAS_F5SDK +from ansible.module_utils.f5_utils import F5ModuleError + try: - from f5.bigip import ManagementRoot - from icontrol.session import iControlUnexpectedHTTPError - HAS_F5SDK = True + from ansible.module_utils.f5_utils import iControlUnexpectedHTTPError except ImportError: HAS_F5SDK = False -CHOICES = ['enabled', 'disabled'] -LEVELS = ['debug', 'debug1', 'debug2', 'debug3', 'error', 'fatal', 'info', - 'quiet', 'verbose'] +class Parameters(AnsibleF5Parameters): + api_map = { + 'bannerText': 'banner_text', + 'inactivityTimeout': 'inactivity_timeout', + 'logLevel': 'log_level' + } -class BigIpDeviceSshd(object): - def __init__(self, *args, **kwargs): - if not HAS_F5SDK: - raise F5ModuleError("The python f5-sdk module is required") + api_attributes = [ + 'allow', 'banner', 'bannerText', 'inactivityTimeout', 'logLevel', + 'login', 'port' + ] - # The params that change in the module - self.cparams = dict() + updatables = [ + 'allow', 'banner', 'banner_text', 'inactivity_timeout', 'log_level', + 'login', 'port' + ] - # Stores the params that are sent to the module - self.params = kwargs - self.api = ManagementRoot(kwargs['server'], - kwargs['user'], - kwargs['password'], - port=kwargs['server_port']) + returnables = [ + 'allow', 'banner', 'banner_text', 'inactivity_timeout', 'log_level', + 'login', 'port' + ] - def update(self): - changed = False - current = self.read() - params = dict() + def to_return(self): + result = {} + for returnable in self.returnables: + result[returnable] = getattr(self, returnable) + result = self._filter_params(result) + return result - allow = self.params['allow'] - banner = self.params['banner'] - banner_text = self.params['banner_text'] - timeout = self.params['inactivity_timeout'] - log_level = self.params['log_level'] - login = self.params['login'] - port = self.params['port'] - check_mode = self.params['check_mode'] - - if allow: - if 'allow' in current: - items = set(allow) - if items != current['allow']: - params['allow'] = list(items) + def api_params(self): + result = {} + for api_attribute in self.api_attributes: + if self.api_map is not None and api_attribute in self.api_map: + result[api_attribute] = getattr(self, self.api_map[api_attribute]) else: - params['allow'] = allow + result[api_attribute] = getattr(self, api_attribute) + result = self._filter_params(result) + return result - if banner: - if 'banner' in current: - if banner != current['banner']: - params['banner'] = banner - else: - params['banner'] = banner + @property + def inactivity_timeout(self): + if self._values['inactivity_timeout'] is None: + return None + return int(self._values['inactivity_timeout']) - if banner_text: - if 'banner_text' in current: - if banner_text != current['banner_text']: - params['bannerText'] = banner_text - else: - params['bannerText'] = banner_text + @property + def port(self): + if self._values['port'] is None: + return None + return int(self._values['port']) - if timeout: - if 'inactivity_timeout' in current: - if timeout != current['inactivity_timeout']: - params['inactivityTimeout'] = timeout - else: - params['inactivityTimeout'] = timeout + @property + def allow(self): + if self._values['allow'] is None: + return None + allow = self._values['allow'] + return list(set([str(x) for x in allow])) - if log_level: - if 'log_level' in current: - if log_level != current['log_level']: - params['logLevel'] = log_level - else: - params['logLevel'] = log_level - if login: - if 'login' in current: - if login != current['login']: - params['login'] = login - else: - params['login'] = login +class ModuleManager(object): + def __init__(self, client): + self.client = client + self.have = None + self.want = Parameters(self.client.module.params) + self.changes = Parameters() - if port: - if 'port' in current: - if port != current['port']: - params['port'] = port - else: - params['port'] = port + def _update_changed_options(self): + changed = {} + for key in Parameters.updatables: + if getattr(self.want, key) is not None: + attr1 = getattr(self.want, key) + attr2 = getattr(self.have, key) + if attr1 != attr2: + changed[key] = attr1 + if changed: + self.changes = Parameters(changed) + return True + return False - if params: - changed = True - if check_mode: - return changed - self.cparams = camel_dict_to_snake_dict(params) - else: - return changed - - r = self.api.tm.sys.sshd.load() - r.update(**params) - r.refresh() - - return changed - - def read(self): - """Read information and transform it - - The values that are returned by BIG-IP in the f5-sdk can have encoding - attached to them as well as be completely missing in some cases. - - Therefore, this method will transform the data from the BIG-IP into a - format that is more easily consumable by the rest of the class and the - parameters that are supported by the module. - """ - p = dict() - r = self.api.tm.sys.sshd.load() - - if hasattr(r, 'allow'): - # Deliberately using sets to suppress duplicates - p['allow'] = set([str(x) for x in r.allow]) - if hasattr(r, 'banner'): - p['banner'] = str(r.banner) - if hasattr(r, 'bannerText'): - p['banner_text'] = str(r.bannerText) - if hasattr(r, 'inactivityTimeout'): - p['inactivity_timeout'] = str(r.inactivityTimeout) - if hasattr(r, 'logLevel'): - p['log_level'] = str(r.logLevel) - if hasattr(r, 'login'): - p['login'] = str(r.login) - if hasattr(r, 'port'): - p['port'] = int(r.port) - return p - - def flush(self): + def exec_module(self): result = dict() - changed = False try: changed = self.update() except iControlUnexpectedHTTPError as e: raise F5ModuleError(str(e)) - result.update(**self.cparams) + changes = self.changes.to_return() + result.update(**changes) result.update(dict(changed=changed)) return result + def read_current_from_device(self): + resource = self.client.api.tm.sys.sshd.load() + result = resource.attrs + return Parameters(result) + + def update(self): + self.have = self.read_current_from_device() + if not self.should_update(): + return False + if self.client.check_mode: + return True + self.update_on_device() + return True + + def should_update(self): + result = self._update_changed_options() + if result: + return True + return False + + def update_on_device(self): + params = self.want.api_params() + resource = self.client.api.tm.sys.sshd.load() + resource.update(**params) + + +class ArgumentSpec(object): + def __init__(self): + self.choices = ['enabled', 'disabled'] + self.levels = [ + 'debug', 'debug1', 'debug2', 'debug3', 'error', 'fatal', 'info', + 'quiet', 'verbose' + ] + self.supports_check_mode = True + self.argument_spec = dict( + allow=dict( + required=False, + default=None, + type='list' + ), + banner=dict( + required=False, + default=None, + choices=self.choices + ), + banner_text=dict( + required=False, + default=None + ), + inactivity_timeout=dict( + required=False, + default=None, + type='int' + ), + log_level=dict( + required=False, + default=None, + choices=self.levels + ), + login=dict( + required=False, + default=None, + choices=self.choices + ), + port=dict( + required=False, + default=None, + type='int' + ), + state=dict( + default='present', + choices=['present'] + ) + ) + self.f5_product_name = 'bigip' + def main(): - argument_spec = f5_argument_spec() + if not HAS_F5SDK: + raise F5ModuleError("The python f5-sdk module is required") - meta_args = dict( - allow=dict(required=False, default=None, type='list'), - banner=dict(required=False, default=None, choices=CHOICES), - banner_text=dict(required=False, default=None), - inactivity_timeout=dict(required=False, default=None, type='int'), - log_level=dict(required=False, default=None, choices=LEVELS), - login=dict(required=False, default=None, choices=CHOICES), - port=dict(required=False, default=None, type='int'), - state=dict(default='present', choices=['present']) - ) - argument_spec.update(meta_args) + spec = ArgumentSpec() - module = AnsibleModule( - argument_spec=argument_spec, - supports_check_mode=True + client = AnsibleF5Client( + argument_spec=spec.argument_spec, + supports_check_mode=spec.supports_check_mode, + f5_product_name=spec.f5_product_name ) try: - obj = BigIpDeviceSshd(check_mode=module.check_mode, **module.params) - result = obj.flush() - - module.exit_json(**result) + mm = ModuleManager(client) + results = mm.exec_module() + client.module.exit_json(**results) except F5ModuleError as e: - module.fail_json(msg=str(e)) - -from ansible.module_utils.basic import * -from ansible.module_utils.ec2 import camel_dict_to_snake_dict -from ansible.module_utils.f5_utils import * + client.module.fail_json(msg=str(e)) if __name__ == '__main__': main() diff --git a/test/units/modules/network/f5/test_bigip_device_sshd.py b/test/units/modules/network/f5/test_bigip_device_sshd.py new file mode 100644 index 0000000000..2a7a748924 --- /dev/null +++ b/test/units/modules/network/f5/test_bigip_device_sshd.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 F5 Networks Inc. +# GNU General Public License v3.0 (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +import json +import sys + +from nose.plugins.skip import SkipTest +if sys.version_info < (2, 7): + raise SkipTest("F5 Ansible modules require Python >= 2.7") + +from ansible.compat.tests import unittest +from ansible.compat.tests.mock import Mock +from ansible.compat.tests.mock import patch +from ansible.module_utils.f5_utils import AnsibleF5Client + +try: + from library.bigip_device_sshd import Parameters + from library.bigip_device_sshd import ModuleManager + from library.bigip_device_sshd import ArgumentSpec + from ansible.module_utils.f5_utils import iControlUnexpectedHTTPError + from test.unit.modules.utils import set_module_args +except ImportError: + try: + from ansible.modules.network.f5.bigip_device_sshd import Parameters + from ansible.modules.network.f5.bigip_device_sshd import ModuleManager + from ansible.modules.network.f5.bigip_device_sshd import ArgumentSpec + from ansible.module_utils.f5_utils import iControlUnexpectedHTTPError + from units.modules.utils import set_module_args + except ImportError: + raise SkipTest("F5 Ansible modules require the f5-sdk Python library") + +fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures') +fixture_data = {} + + +def load_fixture(name): + path = os.path.join(fixture_path, name) + + if path in fixture_data: + return fixture_data[path] + + with open(path) as f: + data = f.read() + + try: + data = json.loads(data) + except Exception: + pass + + fixture_data[path] = data + return data + + +class TestParameters(unittest.TestCase): + def test_module_parameters(self): + args = dict( + allow=['all'], + banner='enabled', + banner_text='asdf', + inactivity_timeout='100', + log_level='debug', + login='enabled', + port=1010, + server='localhost', + user='admin', + password='password' + ) + p = Parameters(args) + assert p.allow == ['all'] + assert p.banner == 'enabled' + assert p.banner_text == 'asdf' + assert p.inactivity_timeout == 100 + assert p.log_level == 'debug' + assert p.login == 'enabled' + assert p.port == 1010 + + +class TestManager(unittest.TestCase): + + def setUp(self): + self.spec = ArgumentSpec() + + @patch('ansible.module_utils.f5_utils.AnsibleF5Client._get_mgmt_root', + return_value=True) + def test_update_settings(self, *args): + set_module_args(dict( + allow=['all'], + banner='enabled', + banner_text='asdf', + inactivity_timeout='100', + log_level='debug', + login='enabled', + port=1010, + server='localhost', + user='admin', + password='password' + )) + + # Configure the parameters that would be returned by querying the + # remote device + current = Parameters( + dict( + allow=['172.27.1.1'] + ) + ) + + client = AnsibleF5Client( + argument_spec=self.spec.argument_spec, + supports_check_mode=self.spec.supports_check_mode, + f5_product_name=self.spec.f5_product_name + ) + mm = ModuleManager(client) + + # Override methods to force specific logic in the module to happen + mm.update_on_device = Mock(return_value=True) + mm.read_current_from_device = Mock(return_value=current) + + results = mm.exec_module() + + assert results['changed'] is True + assert results['allow'] == ['all']