From 31645ded1171b68a4af1dbc8ec93ad9f49de823b Mon Sep 17 00:00:00 2001 From: justchris1 <30219018+justchris1@users.noreply.github.com> Date: Sun, 11 Apr 2021 09:25:03 -0400 Subject: [PATCH] Added modules ipa_otpconfig and ipa_otptoken (#2122) * Added module for ipa_otpconfig * Make no_log=False explicit. * Updated inputs to be int type instead of strings to align to expected inputs. Updated output message * Add changelog fragment * Remove changelog fragment as this is a new module * Update plugins/modules/identity/ipa/ipa_otpconfig.py Add version_added field to module description. Co-authored-by: Felix Fontein * Updated punctuation in examples * Add unit test for ipa_otpconfig * Add ipa_otptoken module with unit test * Updated documentation in unit test * Update plugins/modules/identity/ipa/ipa_otpconfig.py Co-authored-by: Felix Fontein * Update plugins/modules/identity/ipa/ipa_otpconfig.py Co-authored-by: Felix Fontein * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein * Added some documentation updates to make it conform to ansible standards * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein * Address review comments Co-authored-by: Chris Costa Co-authored-by: Felix Fontein --- plugins/module_utils/ipa.py | 4 +- plugins/modules/identity/ipa/ipa_otpconfig.py | 172 ++++++ plugins/modules/identity/ipa/ipa_otptoken.py | 527 ++++++++++++++++++ plugins/modules/ipa_otpconfig.py | 1 + plugins/modules/ipa_otptoken.py | 1 + .../identity/ipa/test_ipa_otpconfig.py | 406 ++++++++++++++ .../modules/identity/ipa/test_ipa_otptoken.py | 495 ++++++++++++++++ 7 files changed, 1604 insertions(+), 2 deletions(-) create mode 100644 plugins/modules/identity/ipa/ipa_otpconfig.py create mode 100644 plugins/modules/identity/ipa/ipa_otptoken.py create mode 120000 plugins/modules/ipa_otpconfig.py create mode 120000 plugins/modules/ipa_otptoken.py create mode 100644 tests/unit/plugins/modules/identity/ipa/test_ipa_otpconfig.py create mode 100644 tests/unit/plugins/modules/identity/ipa/test_ipa_otptoken.py diff --git a/plugins/module_utils/ipa.py b/plugins/module_utils/ipa.py index 9eb9f406f6..b2b1a892cd 100644 --- a/plugins/module_utils/ipa.py +++ b/plugins/module_utils/ipa.py @@ -119,9 +119,9 @@ class IPAClient(object): data = dict(method=method) # TODO: We should probably handle this a little better. - if method in ('ping', 'config_show'): + if method in ('ping', 'config_show', 'otpconfig_show'): data['params'] = [[], {}] - elif method == 'config_mod': + elif method in ('config_mod', 'otpconfig_mod'): data['params'] = [[], item] else: data['params'] = [[name], item] diff --git a/plugins/modules/identity/ipa/ipa_otpconfig.py b/plugins/modules/identity/ipa/ipa_otpconfig.py new file mode 100644 index 0000000000..84a9e969cb --- /dev/null +++ b/plugins/modules/identity/ipa/ipa_otpconfig.py @@ -0,0 +1,172 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# Copyright: (c) 2021, Ansible Project +# Heavily influenced from Fran Fitzpatrick ipa_config module +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +DOCUMENTATION = r''' +--- +module: ipa_otpconfig +author: justchris1 (@justchris1) +short_description: Manage FreeIPA OTP Configuration Settings +version_added: 2.5.0 +description: +- Modify global configuration settings of a FreeIPA Server with respect to OTP (One Time Passwords). +options: + ipatokentotpauthwindow: + description: TOTP authentication window in seconds. + aliases: ["totpauthwindow"] + type: int + ipatokentotpsyncwindow: + description: TOTP synchronization window in seconds. + aliases: ["totpsyncwindow"] + type: int + ipatokenhotpauthwindow: + description: HOTP authentication window in number of hops. + aliases: ["hotpauthwindow"] + type: int + ipatokenhotpsyncwindow: + description: HOTP synchronization window in hops. + aliases: ["hotpsyncwindow"] + type: int +extends_documentation_fragment: +- community.general.ipa.documentation + +''' + +EXAMPLES = r''' +- name: Ensure the TOTP authentication window is set to 300 seconds + community.general.ipa_otpconfig: + ipatokentotpauthwindow: '300' + ipa_host: localhost + ipa_user: admin + ipa_pass: supersecret + +- name: Ensure the TOTP syncronization window is set to 86400 seconds + community.general.ipa_otpconfig: + ipatokentotpsyncwindow: '86400' + ipa_host: localhost + ipa_user: admin + ipa_pass: supersecret + +- name: Ensure the HOTP authentication window is set to 10 hops + community.general.ipa_otpconfig: + ipatokenhotpauthwindow: '10' + ipa_host: localhost + ipa_user: admin + ipa_pass: supersecret + +- name: Ensure the HOTP syncronization window is set to 100 hops + community.general.ipa_otpconfig: + ipatokenhotpsyncwindow: '100' + ipa_host: localhost + ipa_user: admin + ipa_pass: supersecret +''' + +RETURN = r''' +otpconfig: + description: OTP configuration as returned by IPA API. + returned: always + type: dict +''' + +import traceback + +from ansible.module_utils.basic import AnsibleModule +from ansible_collections.community.general.plugins.module_utils.ipa import IPAClient, ipa_argument_spec +from ansible.module_utils._text import to_native + + +class OTPConfigIPAClient(IPAClient): + def __init__(self, module, host, port, protocol): + super(OTPConfigIPAClient, self).__init__(module, host, port, protocol) + + def otpconfig_show(self): + return self._post_json(method='otpconfig_show', name=None) + + def otpconfig_mod(self, name, item): + return self._post_json(method='otpconfig_mod', name=name, item=item) + + +def get_otpconfig_dict(ipatokentotpauthwindow=None, ipatokentotpsyncwindow=None, + ipatokenhotpauthwindow=None, ipatokenhotpsyncwindow=None): + + config = {} + if ipatokentotpauthwindow is not None: + config['ipatokentotpauthwindow'] = str(ipatokentotpauthwindow) + if ipatokentotpsyncwindow is not None: + config['ipatokentotpsyncwindow'] = str(ipatokentotpsyncwindow) + if ipatokenhotpauthwindow is not None: + config['ipatokenhotpauthwindow'] = str(ipatokenhotpauthwindow) + if ipatokenhotpsyncwindow is not None: + config['ipatokenhotpsyncwindow'] = str(ipatokenhotpsyncwindow) + + return config + + +def get_otpconfig_diff(client, ipa_config, module_config): + return client.get_diff(ipa_data=ipa_config, module_data=module_config) + + +def ensure(module, client): + module_otpconfig = get_otpconfig_dict( + ipatokentotpauthwindow=module.params.get('ipatokentotpauthwindow'), + ipatokentotpsyncwindow=module.params.get('ipatokentotpsyncwindow'), + ipatokenhotpauthwindow=module.params.get('ipatokenhotpauthwindow'), + ipatokenhotpsyncwindow=module.params.get('ipatokenhotpsyncwindow'), + ) + ipa_otpconfig = client.otpconfig_show() + diff = get_otpconfig_diff(client, ipa_otpconfig, module_otpconfig) + + changed = False + new_otpconfig = {} + for module_key in diff: + if module_otpconfig.get(module_key) != ipa_otpconfig.get(module_key, None): + changed = True + new_otpconfig.update({module_key: module_otpconfig.get(module_key)}) + + if changed and not module.check_mode: + client.otpconfig_mod(name=None, item=new_otpconfig) + + return changed, client.otpconfig_show() + + +def main(): + argument_spec = ipa_argument_spec() + argument_spec.update( + ipatokentotpauthwindow=dict(type='int', aliases=['totpauthwindow'], no_log=False), + ipatokentotpsyncwindow=dict(type='int', aliases=['totpsyncwindow'], no_log=False), + ipatokenhotpauthwindow=dict(type='int', aliases=['hotpauthwindow'], no_log=False), + ipatokenhotpsyncwindow=dict(type='int', aliases=['hotpsyncwindow'], no_log=False), + ) + + module = AnsibleModule( + argument_spec=argument_spec, + supports_check_mode=True + ) + + client = OTPConfigIPAClient( + module=module, + host=module.params['ipa_host'], + port=module.params['ipa_port'], + protocol=module.params['ipa_prot'] + ) + + try: + client.login( + username=module.params['ipa_user'], + password=module.params['ipa_pass'] + ) + changed, otpconfig = ensure(module, client) + except Exception as e: + module.fail_json(msg=to_native(e), exception=traceback.format_exc()) + + module.exit_json(changed=changed, otpconfig=otpconfig) + + +if __name__ == '__main__': + main() diff --git a/plugins/modules/identity/ipa/ipa_otptoken.py b/plugins/modules/identity/ipa/ipa_otptoken.py new file mode 100644 index 0000000000..f8f48d68a6 --- /dev/null +++ b/plugins/modules/identity/ipa/ipa_otptoken.py @@ -0,0 +1,527 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# Copyright: (c) 2017, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +DOCUMENTATION = r''' +--- +module: ipa_otptoken +author: justchris1 (@justchris1) +short_description: Manage FreeIPA OTPs +version_added: 2.5.0 +description: +- Add, modify, and delete One Time Passwords in IPA. +options: + uniqueid: + description: Unique ID of the token in IPA. + required: true + aliases: ["name"] + type: str + newuniqueid: + description: If specified, the unique id specified will be changed to this. + type: str + otptype: + description: + - Type of OTP. + - "B(Note:) Cannot be modified after OTP is created." + type: str + choices: [ totp, hotp ] + secretkey: + description: + - Token secret (Base64). + - If OTP is created and this is not specified, a random secret will be generated by IPA. + - "B(Note:) Cannot be modified after OTP is created." + type: str + description: + description: Description of the token (informational only). + type: str + owner: + description: Assigned user of the token. + type: str + enabled: + description: Mark the token as enabled (default C(true)). + default: true + type: bool + notbefore: + description: + - First date/time the token can be used. + - In the format C(YYYYMMddHHmmss). + - For example, C(20180121182022) will allow the token to be used starting on 21 January 2018 at 18:20:22. + type: str + notafter: + description: + - Last date/time the token can be used. + - In the format C(YYYYMMddHHmmss). + - For example, C(20200121182022) will allow the token to be used until 21 January 2020 at 18:20:22. + type: str + vendor: + description: Token vendor name (informational only). + type: str + model: + description: Token model (informational only). + type: str + serial: + description: Token serial (informational only). + type: str + state: + description: State to ensure. + choices: ['present', 'absent'] + default: 'present' + type: str + algorithm: + description: + - Token hash algorithm. + - "B(Note:) Cannot be modified after OTP is created." + choices: ['sha1', 'sha256', 'sha384', 'sha512'] + type: str + digits: + description: + - Number of digits each token code will have. + - "B(Note:) Cannot be modified after OTP is created." + choices: [ 6, 8 ] + type: int + offset: + description: + - TOTP token / IPA server time difference. + - "B(Note:) Cannot be modified after OTP is created." + type: int + interval: + description: + - Length of TOTP token code validity in seconds. + - "B(Note:) Cannot be modified after OTP is created." + type: int + counter: + description: + - Initial counter for the HOTP token. + - "B(Note:) Cannot be modified after OTP is created." + type: int +extends_documentation_fragment: +- community.general.ipa.documentation +''' + +EXAMPLES = r''' +- name: Create a totp for pinky, allowing the IPA server to generate using defaults + community.general.ipa_otptoken: + uniqueid: Token123 + otptype: totp + owner: pinky + ipa_host: ipa.example.com + ipa_user: admin + ipa_pass: topsecret + +- name: Create a 8 digit hotp for pinky with sha256 with specified validity times + community.general.ipa_otptoken: + uniqueid: Token123 + enabled: true + otptype: hotp + digits: 8 + secretkey: UMKSIER00zT2T2tWMUlTRmNlekRCbFQvWFBVZUh2dElHWGR6T3VUR3IzK2xjaFk9 + algorithm: sha256 + notbefore: 20180121182123 + notafter: 20220121182123 + owner: pinky + ipa_host: ipa.example.com + ipa_user: admin + ipa_pass: topsecret + +- name: Update Token123 to indicate a vendor, model, serial number (info only), and description + community.general.ipa_otptoken: + uniqueid: Token123 + vendor: Acme + model: acme101 + serial: SerialNumber1 + description: Acme OTP device + ipa_host: ipa.example.com + ipa_user: admin + ipa_pass: topsecret + +- name: Disable Token123 + community.general.ipa_otptoken: + uniqueid: Token123 + enabled: false + ipa_host: ipa.example.com + ipa_user: admin + ipa_pass: topsecret + +- name: Rename Token123 to TokenABC and enable it + community.general.ipa_otptoken: + uniqueid: Token123 + newuniqueid: TokenABC + enabled: true + ipa_host: ipa.example.com + ipa_user: admin + ipa_pass: topsecret +''' + +RETURN = r''' +otptoken: + description: OTP Token as returned by IPA API + returned: always + type: dict +''' + +import base64 +import traceback + +from ansible.module_utils.basic import AnsibleModule, sanitize_keys +from ansible_collections.community.general.plugins.module_utils.ipa import IPAClient, ipa_argument_spec +from ansible.module_utils._text import to_native + + +class OTPTokenIPAClient(IPAClient): + def __init__(self, module, host, port, protocol): + super(OTPTokenIPAClient, self).__init__(module, host, port, protocol) + + def otptoken_find(self, name): + return self._post_json(method='otptoken_find', name=None, item={'all': True, + 'ipatokenuniqueid': name, + 'timelimit': '0', + 'sizelimit': '0'}) + + def otptoken_add(self, name, item): + return self._post_json(method='otptoken_add', name=name, item=item) + + def otptoken_mod(self, name, item): + return self._post_json(method='otptoken_mod', name=name, item=item) + + def otptoken_del(self, name): + return self._post_json(method='otptoken_del', name=name) + + +def base64_to_base32(base64_string): + """Converts base64 string to base32 string""" + b32_string = base64.b32encode(base64.b64decode(base64_string)).decode('ascii') + return b32_string + + +def base32_to_base64(base32_string): + """Converts base32 string to base64 string""" + b64_string = base64.b64encode(base64.b32decode(base32_string)).decode('ascii') + return b64_string + + +def get_otptoken_dict(ansible_to_ipa, uniqueid=None, newuniqueid=None, otptype=None, secretkey=None, description=None, owner=None, + enabled=None, notbefore=None, notafter=None, vendor=None, + model=None, serial=None, algorithm=None, digits=None, offset=None, + interval=None, counter=None): + """Create the dictionary of settings passed in""" + + otptoken = {} + if uniqueid is not None: + otptoken[ansible_to_ipa['uniqueid']] = uniqueid + if newuniqueid is not None: + otptoken[ansible_to_ipa['newuniqueid']] = newuniqueid + if otptype is not None: + otptoken[ansible_to_ipa['otptype']] = otptype.upper() + if secretkey is not None: + # For some unknown reason, while IPA returns the secret in base64, + # it wants the secret passed in as base32. This makes it more difficult + # for comparison (does 'current' equal to 'new'). Moreover, this may + # cause some subtle issue in a playbook as the output is encoded + # in a different way than if it was passed in as a parameter. For + # these reasons, have the module standardize on base64 input (as parameter) + # and output (from IPA). + otptoken[ansible_to_ipa['secretkey']] = base64_to_base32(secretkey) + if description is not None: + otptoken[ansible_to_ipa['description']] = description + if owner is not None: + otptoken[ansible_to_ipa['owner']] = owner + if enabled is not None: + otptoken[ansible_to_ipa['enabled']] = 'FALSE' if enabled else 'TRUE' + if notbefore is not None: + otptoken[ansible_to_ipa['notbefore']] = notbefore + 'Z' + if notafter is not None: + otptoken[ansible_to_ipa['notafter']] = notafter + 'Z' + if vendor is not None: + otptoken[ansible_to_ipa['vendor']] = vendor + if model is not None: + otptoken[ansible_to_ipa['model']] = model + if serial is not None: + otptoken[ansible_to_ipa['serial']] = serial + if algorithm is not None: + otptoken[ansible_to_ipa['algorithm']] = algorithm + if digits is not None: + otptoken[ansible_to_ipa['digits']] = str(digits) + if offset is not None: + otptoken[ansible_to_ipa['offset']] = str(offset) + if interval is not None: + otptoken[ansible_to_ipa['interval']] = str(interval) + if counter is not None: + otptoken[ansible_to_ipa['counter']] = str(counter) + + return otptoken + + +def transform_output(ipa_otptoken, ansible_to_ipa, ipa_to_ansible): + """Transform the output received by IPA to a format more friendly + before it is returned to the user. IPA returns even simple + strings as a list of strings. It also returns bools and + int as string. This function cleans that up before return. + """ + updated_otptoken = ipa_otptoken + + # Used to hold values that will be sanitized from output as no_log. + # For the case where secretkey is not specified at the module, but + # is passed back from IPA. + sanitize_strings = set() + + # Rename the IPA parameters to the more friendly ansible module names for them + for ipa_parameter in ipa_to_ansible: + if ipa_parameter in ipa_otptoken: + updated_otptoken[ipa_to_ansible[ipa_parameter]] = ipa_otptoken[ipa_parameter] + updated_otptoken.pop(ipa_parameter) + + # Change the type from IPA's list of string to the appropriate return value type + # based on field. By default, assume they should be strings. + for ansible_parameter in ansible_to_ipa: + if ansible_parameter in updated_otptoken: + if isinstance(updated_otptoken[ansible_parameter], list) and len(updated_otptoken[ansible_parameter]) == 1: + if ansible_parameter in ['digits', 'offset', 'interval', 'counter']: + updated_otptoken[ansible_parameter] = int(updated_otptoken[ansible_parameter][0]) + elif ansible_parameter == 'enabled': + updated_otptoken[ansible_parameter] = bool(updated_otptoken[ansible_parameter][0]) + else: + updated_otptoken[ansible_parameter] = updated_otptoken[ansible_parameter][0] + + if 'secretkey' in updated_otptoken: + if isinstance(updated_otptoken['secretkey'], dict): + if '__base64__' in updated_otptoken['secretkey']: + sanitize_strings.add(updated_otptoken['secretkey']['__base64__']) + b64key = updated_otptoken['secretkey']['__base64__'] + updated_otptoken.pop('secretkey') + updated_otptoken['secretkey'] = b64key + sanitize_strings.add(b64key) + elif '__base32__' in updated_otptoken['secretkey']: + sanitize_strings.add(updated_otptoken['secretkey']['__base32__']) + b32key = updated_otptoken['secretkey']['__base32__'] + b64key = base32_to_base64(b32key) + updated_otptoken.pop('secretkey') + updated_otptoken['secretkey'] = b64key + sanitize_strings.add(b32key) + sanitize_strings.add(b64key) + + return updated_otptoken, sanitize_strings + + +def validate_modifications(ansible_to_ipa, module, ipa_otptoken, + module_otptoken, unmodifiable_after_creation): + """Checks to see if the requested modifications are valid. Some elements + cannot be modified after initial creation. However, we still want to + validate arguments that are specified, but are not different than what + is currently set on the server. + """ + + modifications_valid = True + + for parameter in unmodifiable_after_creation: + if ansible_to_ipa[parameter] in module_otptoken and ansible_to_ipa[parameter] in ipa_otptoken: + mod_value = module_otptoken[ansible_to_ipa[parameter]] + + # For someone unknown reason, the returns from IPA put almost all + # values in a list, even though passing them in a list (even of + # length 1) will be rejected. The module values for all elements + # other than type (totp or hotp) have this happen. + if parameter == 'otptype': + ipa_value = ipa_otptoken[ansible_to_ipa[parameter]] + else: + if len(ipa_otptoken[ansible_to_ipa[parameter]]) != 1: + module.fail_json(msg=("Invariant fail: Return value from IPA is not a list " + + "of length 1. Please open a bug report for the module.")) + if parameter == 'secretkey': + # We stored the secret key in base32 since we had assumed that would need to + # be the format if we were contacting IPA to create it. However, we are + # now comparing it against what is already set in the IPA server, so convert + # back to base64 for comparison. + mod_value = base32_to_base64(mod_value) + + # For the secret key, it is even more specific in that the key is returned + # in a dict, in the list, as the __base64__ entry for the IPA response. + ipa_value = ipa_otptoken[ansible_to_ipa[parameter]][0]['__base64__'] + if '__base64__' in ipa_otptoken[ansible_to_ipa[parameter]][0]: + ipa_value = ipa_otptoken[ansible_to_ipa[parameter]][0]['__base64__'] + elif '__base32__' in ipa_otptoken[ansible_to_ipa[parameter]][0]: + b32key = ipa_otptoken[ansible_to_ipa[parameter]][0]['__base32__'] + b64key = base32_to_base64(b32key) + ipa_value = b64key + else: + ipa_value = None + else: + ipa_value = ipa_otptoken[ansible_to_ipa[parameter]][0] + + if mod_value != ipa_value: + modifications_valid = False + fail_message = ("Parameter '" + parameter + "' cannot be changed once " + + "the OTP is created and the requested value specified here (" + + str(mod_value) + + ") differs from what is set in the IPA server (" + + str(ipa_value) + ")") + module.fail_json(msg=fail_message) + + return modifications_valid + + +def ensure(module, client): + # dict to map from ansible parameter names to attribute names + # used by IPA (which are not so friendly). + ansible_to_ipa = {'uniqueid': 'ipatokenuniqueid', + 'newuniqueid': 'rename', + 'otptype': 'type', + 'secretkey': 'ipatokenotpkey', + 'description': 'description', + 'owner': 'ipatokenowner', + 'enabled': 'ipatokendisabled', + 'notbefore': 'ipatokennotbefore', + 'notafter': 'ipatokennotafter', + 'vendor': 'ipatokenvendor', + 'model': 'ipatokenmodel', + 'serial': 'ipatokenserial', + 'algorithm': 'ipatokenotpalgorithm', + 'digits': 'ipatokenotpdigits', + 'offset': 'ipatokentotpclockoffset', + 'interval': 'ipatokentotptimestep', + 'counter': 'ipatokenhotpcounter'} + + # Create inverse dictionary for mapping return values + ipa_to_ansible = {} + for (k, v) in ansible_to_ipa.items(): + ipa_to_ansible[v] = k + + unmodifiable_after_creation = ['otptype', 'secretkey', 'algorithm', + 'digits', 'offset', 'interval', 'counter'] + state = module.params['state'] + uniqueid = module.params['uniqueid'] + + module_otptoken = get_otptoken_dict(ansible_to_ipa=ansible_to_ipa, + uniqueid=module.params.get('uniqueid'), + newuniqueid=module.params.get('newuniqueid'), + otptype=module.params.get('otptype'), + secretkey=module.params.get('secretkey'), + description=module.params.get('description'), + owner=module.params.get('owner'), + enabled=module.params.get('enabled'), + notbefore=module.params.get('notbefore'), + notafter=module.params.get('notafter'), + vendor=module.params.get('vendor'), + model=module.params.get('model'), + serial=module.params.get('serial'), + algorithm=module.params.get('algorithm'), + digits=module.params.get('digits'), + offset=module.params.get('offset'), + interval=module.params.get('interval'), + counter=module.params.get('counter')) + + ipa_otptoken = client.otptoken_find(name=uniqueid) + + if ansible_to_ipa['newuniqueid'] in module_otptoken: + # Check to see if the new unique id is already taken in use + ipa_otptoken_new = client.otptoken_find(name=module_otptoken[ansible_to_ipa['newuniqueid']]) + if ipa_otptoken_new: + module.fail_json(msg=("Requested rename through newuniqueid to " + + module_otptoken[ansible_to_ipa['newuniqueid']] + + " failed because the new unique id is already in use")) + + changed = False + if state == 'present': + if not ipa_otptoken: + changed = True + if not module.check_mode: + # It would not make sense to have a rename after creation, so if the user + # specified a newuniqueid, just replace the uniqueid with the updated one + # before creation + if ansible_to_ipa['newuniqueid'] in module_otptoken: + module_otptoken[ansible_to_ipa['uniqueid']] = module_otptoken[ansible_to_ipa['newuniqueid']] + uniqueid = module_otptoken[ansible_to_ipa['newuniqueid']] + module_otptoken.pop(ansible_to_ipa['newuniqueid']) + + # IPA wants the unique id in the first position and not as a key/value pair. + # Get rid of it from the otptoken dict and just specify it in the name field + # for otptoken_add. + if ansible_to_ipa['uniqueid'] in module_otptoken: + module_otptoken.pop(ansible_to_ipa['uniqueid']) + + module_otptoken['all'] = True + ipa_otptoken = client.otptoken_add(name=uniqueid, item=module_otptoken) + else: + if not(validate_modifications(ansible_to_ipa, module, ipa_otptoken, + module_otptoken, unmodifiable_after_creation)): + module.fail_json(msg="Modifications requested in module are not valid") + + # IPA will reject 'modifications' that do not actually modify anything + # if any of the unmodifiable elements are specified. Explicitly + # get rid of them here. They were not different or else the + # we would have failed out in validate_modifications. + for x in unmodifiable_after_creation: + if ansible_to_ipa[x] in module_otptoken: + module_otptoken.pop(ansible_to_ipa[x]) + + diff = client.get_diff(ipa_data=ipa_otptoken, module_data=module_otptoken) + if len(diff) > 0: + changed = True + if not module.check_mode: + + # IPA wants the unique id in the first position and not as a key/value pair. + # Get rid of it from the otptoken dict and just specify it in the name field + # for otptoken_mod. + if ansible_to_ipa['uniqueid'] in module_otptoken: + module_otptoken.pop(ansible_to_ipa['uniqueid']) + + module_otptoken['all'] = True + ipa_otptoken = client.otptoken_mod(name=uniqueid, item=module_otptoken) + else: + if ipa_otptoken: + changed = True + if not module.check_mode: + client.otptoken_del(name=uniqueid) + + # Transform the output to use ansible keywords (not the IPA keywords) and + # sanitize any key values in the output. + ipa_otptoken, sanitize_strings = transform_output(ipa_otptoken, ansible_to_ipa, ipa_to_ansible) + module.no_log_values = module.no_log_values.union(sanitize_strings) + sanitized_otptoken = sanitize_keys(obj=ipa_otptoken, no_log_strings=module.no_log_values) + return changed, sanitized_otptoken + + +def main(): + argument_spec = ipa_argument_spec() + argument_spec.update(uniqueid=dict(type='str', aliases=['name'], required=True), + newuniqueid=dict(type='str'), + otptype=dict(type='str', choices=['totp', 'hotp']), + secretkey=dict(type='str', no_log=True), + description=dict(type='str'), + owner=dict(type='str'), + enabled=dict(type='bool', default=True), + notbefore=dict(type='str'), + notafter=dict(type='str'), + vendor=dict(type='str'), + model=dict(type='str'), + serial=dict(type='str'), + state=dict(type='str', choices=['present', 'absent'], default='present'), + algorithm=dict(type='str', choices=['sha1', 'sha256', 'sha384', 'sha512']), + digits=dict(type='int', choices=[6, 8]), + offset=dict(type='int'), + interval=dict(type='int'), + counter=dict(type='int')) + + module = AnsibleModule(argument_spec=argument_spec, + supports_check_mode=True) + + client = OTPTokenIPAClient(module=module, + host=module.params['ipa_host'], + port=module.params['ipa_port'], + protocol=module.params['ipa_prot']) + + try: + client.login(username=module.params['ipa_user'], + password=module.params['ipa_pass']) + changed, otptoken = ensure(module, client) + except Exception as e: + module.fail_json(msg=to_native(e), exception=traceback.format_exc()) + + module.exit_json(changed=changed, otptoken=otptoken) + + +if __name__ == '__main__': + main() diff --git a/plugins/modules/ipa_otpconfig.py b/plugins/modules/ipa_otpconfig.py new file mode 120000 index 0000000000..a29ef04412 --- /dev/null +++ b/plugins/modules/ipa_otpconfig.py @@ -0,0 +1 @@ +./identity/ipa/ipa_otpconfig.py \ No newline at end of file diff --git a/plugins/modules/ipa_otptoken.py b/plugins/modules/ipa_otptoken.py new file mode 120000 index 0000000000..cb0dbaf569 --- /dev/null +++ b/plugins/modules/ipa_otptoken.py @@ -0,0 +1 @@ +./identity/ipa/ipa_otptoken.py \ No newline at end of file diff --git a/tests/unit/plugins/modules/identity/ipa/test_ipa_otpconfig.py b/tests/unit/plugins/modules/identity/ipa/test_ipa_otpconfig.py new file mode 100644 index 0000000000..cae905942a --- /dev/null +++ b/tests/unit/plugins/modules/identity/ipa/test_ipa_otpconfig.py @@ -0,0 +1,406 @@ +# -*- coding: utf-8 -*- + +# Copyright: (c) 2020, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from contextlib import contextmanager + +from ansible_collections.community.general.tests.unit.compat import unittest +from ansible_collections.community.general.tests.unit.compat.mock import call, patch +from ansible_collections.community.general.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args + +from ansible_collections.community.general.plugins.modules.identity.ipa import ipa_otpconfig + + +@contextmanager +def patch_ipa(**kwargs): + """Mock context manager for patching the methods in OTPConfigIPAClient that contact the IPA server + + Patches the `login` and `_post_json` methods + + Keyword arguments are passed to the mock object that patches `_post_json` + + No arguments are passed to the mock object that patches `login` because no tests require it + + Example:: + + with patch_ipa(return_value={}) as (mock_login, mock_post): + ... + """ + obj = ipa_otpconfig.OTPConfigIPAClient + with patch.object(obj, 'login') as mock_login: + with patch.object(obj, '_post_json', **kwargs) as mock_post: + yield mock_login, mock_post + + +class TestIPAOTPConfig(ModuleTestCase): + def setUp(self): + super(TestIPAOTPConfig, self).setUp() + self.module = ipa_otpconfig + + def _test_base(self, module_args, return_value, mock_calls, changed): + """Base function that's called by all the other test functions + + module_args (dict): + Arguments passed to the module + + return_value (dict): + Mocked return value of OTPConfigIPAClient.otpconfig_show, as returned by the IPA API. + This should be set to the current state. It will be changed to the desired state using the above arguments. + (Technically, this is the return value of _post_json, but it's only checked by otpconfig_show). + + mock_calls (list/tuple of dicts): + List of calls made to OTPConfigIPAClient._post_json, in order. + _post_json is called by all of the otpconfig_* methods of the class. + Pass an empty list if no calls are expected. + + changed (bool): + Whether or not the module is supposed to be marked as changed + """ + set_module_args(module_args) + + # Run the module + with patch_ipa(return_value=return_value) as (mock_login, mock_post): + with self.assertRaises(AnsibleExitJson) as exec_info: + self.module.main() + + # Verify that the calls to _post_json match what is expected + expected_call_count = len(mock_calls) + if expected_call_count > 1: + # Convert the call dicts to unittest.mock.call instances because `assert_has_calls` only accepts them + converted_calls = [] + for call_dict in mock_calls: + converted_calls.append(call(**call_dict)) + + mock_post.assert_has_calls(converted_calls) + self.assertEqual(len(mock_post.mock_calls), expected_call_count) + elif expected_call_count == 1: + mock_post.assert_called_once_with(**mock_calls[0]) + else: # expected_call_count is 0 + mock_post.assert_not_called() + + # Verify that the module's changed status matches what is expected + self.assertIs(exec_info.exception.args[0]['changed'], changed) + + def test_set_all_no_adjustment(self): + """Set values requiring no adjustment""" + module_args = { + 'ipatokentotpauthwindow': 11, + 'ipatokentotpsyncwindow': 12, + 'ipatokenhotpauthwindow': 13, + 'ipatokenhotpsyncwindow': 14 + } + return_value = { + 'ipatokentotpauthwindow': ['11'], + 'ipatokentotpsyncwindow': ['12'], + 'ipatokenhotpauthwindow': ['13'], + 'ipatokenhotpsyncwindow': ['14']} + mock_calls = ( + { + 'method': 'otpconfig_show', + 'name': None + }, + { + 'method': 'otpconfig_show', + 'name': None + } + ) + changed = False + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_set_all_aliases_no_adjustment(self): + """Set values requiring no adjustment on all using aliases values""" + module_args = { + 'totpauthwindow': 11, + 'totpsyncwindow': 12, + 'hotpauthwindow': 13, + 'hotpsyncwindow': 14 + } + return_value = { + 'ipatokentotpauthwindow': ['11'], + 'ipatokentotpsyncwindow': ['12'], + 'ipatokenhotpauthwindow': ['13'], + 'ipatokenhotpsyncwindow': ['14']} + mock_calls = ( + { + 'method': 'otpconfig_show', + 'name': None + }, + { + 'method': 'otpconfig_show', + 'name': None + } + ) + changed = False + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_set_totp_auth_window_no_adjustment(self): + """Set values requiring no adjustment on totpauthwindow""" + module_args = { + 'totpauthwindow': 11 + } + return_value = { + 'ipatokentotpauthwindow': ['11'], + 'ipatokentotpsyncwindow': ['12'], + 'ipatokenhotpauthwindow': ['13'], + 'ipatokenhotpsyncwindow': ['14']} + mock_calls = ( + { + 'method': 'otpconfig_show', + 'name': None + }, + { + 'method': 'otpconfig_show', + 'name': None + } + ) + changed = False + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_set_totp_sync_window_no_adjustment(self): + """Set values requiring no adjustment on totpsyncwindow""" + module_args = { + 'totpsyncwindow': 12 + } + return_value = { + 'ipatokentotpauthwindow': ['11'], + 'ipatokentotpsyncwindow': ['12'], + 'ipatokenhotpauthwindow': ['13'], + 'ipatokenhotpsyncwindow': ['14']} + mock_calls = ( + { + 'method': 'otpconfig_show', + 'name': None + }, + { + 'method': 'otpconfig_show', + 'name': None + } + ) + changed = False + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_set_hotp_auth_window_no_adjustment(self): + """Set values requiring no adjustment on hotpauthwindow""" + module_args = { + 'hotpauthwindow': 13 + } + return_value = { + 'ipatokentotpauthwindow': ['11'], + 'ipatokentotpsyncwindow': ['12'], + 'ipatokenhotpauthwindow': ['13'], + 'ipatokenhotpsyncwindow': ['14']} + mock_calls = ( + { + 'method': 'otpconfig_show', + 'name': None + }, + { + 'method': 'otpconfig_show', + 'name': None + } + ) + changed = False + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_set_hotp_sync_window_no_adjustment(self): + """Set values requiring no adjustment on hotpsyncwindow""" + module_args = { + 'hotpsyncwindow': 14 + } + return_value = { + 'ipatokentotpauthwindow': ['11'], + 'ipatokentotpsyncwindow': ['12'], + 'ipatokenhotpauthwindow': ['13'], + 'ipatokenhotpsyncwindow': ['14']} + mock_calls = ( + { + 'method': 'otpconfig_show', + 'name': None + }, + { + 'method': 'otpconfig_show', + 'name': None + } + ) + changed = False + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_set_totp_auth_window(self): + """Set values requiring adjustment on totpauthwindow""" + module_args = { + 'totpauthwindow': 10 + } + return_value = { + 'ipatokentotpauthwindow': ['11'], + 'ipatokentotpsyncwindow': ['12'], + 'ipatokenhotpauthwindow': ['13'], + 'ipatokenhotpsyncwindow': ['14']} + mock_calls = ( + { + 'method': 'otpconfig_show', + 'name': None + }, + { + 'method': 'otpconfig_mod', + 'name': None, + 'item': {'ipatokentotpauthwindow': '10'} + }, + { + 'method': 'otpconfig_show', + 'name': None + } + ) + changed = True + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_set_totp_sync_window(self): + """Set values requiring adjustment on totpsyncwindow""" + module_args = { + 'totpsyncwindow': 10 + } + return_value = { + 'ipatokentotpauthwindow': ['11'], + 'ipatokentotpsyncwindow': ['12'], + 'ipatokenhotpauthwindow': ['13'], + 'ipatokenhotpsyncwindow': ['14']} + mock_calls = ( + { + 'method': 'otpconfig_show', + 'name': None + }, + { + 'method': 'otpconfig_mod', + 'name': None, + 'item': {'ipatokentotpsyncwindow': '10'} + }, + { + 'method': 'otpconfig_show', + 'name': None + } + ) + changed = True + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_set_hotp_auth_window(self): + """Set values requiring adjustment on hotpauthwindow""" + module_args = { + 'hotpauthwindow': 10 + } + return_value = { + 'ipatokentotpauthwindow': ['11'], + 'ipatokentotpsyncwindow': ['12'], + 'ipatokenhotpauthwindow': ['13'], + 'ipatokenhotpsyncwindow': ['14']} + mock_calls = ( + { + 'method': 'otpconfig_show', + 'name': None + }, + { + 'method': 'otpconfig_mod', + 'name': None, + 'item': {'ipatokenhotpauthwindow': '10'} + }, + { + 'method': 'otpconfig_show', + 'name': None + } + ) + changed = True + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_set_hotp_sync_window(self): + """Set values requiring adjustment on hotpsyncwindow""" + module_args = { + 'hotpsyncwindow': 10 + } + return_value = { + 'ipatokentotpauthwindow': ['11'], + 'ipatokentotpsyncwindow': ['12'], + 'ipatokenhotpauthwindow': ['13'], + 'ipatokenhotpsyncwindow': ['14']} + mock_calls = ( + { + 'method': 'otpconfig_show', + 'name': None + }, + { + 'method': 'otpconfig_mod', + 'name': None, + 'item': {'ipatokenhotpsyncwindow': '10'} + }, + { + 'method': 'otpconfig_show', + 'name': None + } + ) + changed = True + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_set_all(self): + """Set values requiring adjustment on all""" + module_args = { + 'ipatokentotpauthwindow': 11, + 'ipatokentotpsyncwindow': 12, + 'ipatokenhotpauthwindow': 13, + 'ipatokenhotpsyncwindow': 14 + } + return_value = { + 'ipatokentotpauthwindow': ['1'], + 'ipatokentotpsyncwindow': ['2'], + 'ipatokenhotpauthwindow': ['3'], + 'ipatokenhotpsyncwindow': ['4']} + mock_calls = ( + { + 'method': 'otpconfig_show', + 'name': None + }, + { + 'method': 'otpconfig_mod', + 'name': None, + 'item': {'ipatokentotpauthwindow': '11', + 'ipatokentotpsyncwindow': '12', + 'ipatokenhotpauthwindow': '13', + 'ipatokenhotpsyncwindow': '14'} + }, + { + 'method': 'otpconfig_show', + 'name': None + } + ) + changed = True + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_fail_post(self): + """Fail due to an exception raised from _post_json""" + set_module_args({ + 'ipatokentotpauthwindow': 11, + 'ipatokentotpsyncwindow': 12, + 'ipatokenhotpauthwindow': 13, + 'ipatokenhotpsyncwindow': 14 + }) + + with patch_ipa(side_effect=Exception('ERROR MESSAGE')) as (mock_login, mock_post): + with self.assertRaises(AnsibleFailJson) as exec_info: + self.module.main() + + self.assertEqual(exec_info.exception.args[0]['msg'], 'ERROR MESSAGE') + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/unit/plugins/modules/identity/ipa/test_ipa_otptoken.py b/tests/unit/plugins/modules/identity/ipa/test_ipa_otptoken.py new file mode 100644 index 0000000000..ecea5920a0 --- /dev/null +++ b/tests/unit/plugins/modules/identity/ipa/test_ipa_otptoken.py @@ -0,0 +1,495 @@ +# -*- coding: utf-8 -*- + +# Copyright: (c) 2020, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from contextlib import contextmanager + +from ansible_collections.community.general.tests.unit.compat import unittest +from ansible_collections.community.general.tests.unit.compat.mock import call, patch +from ansible_collections.community.general.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args + +from ansible_collections.community.general.plugins.modules.identity.ipa import ipa_otptoken + + +@contextmanager +def patch_ipa(**kwargs): + """Mock context manager for patching the methods in OTPTokenIPAClient that contact the IPA server + + Patches the `login` and `_post_json` methods + + Keyword arguments are passed to the mock object that patches `_post_json` + + No arguments are passed to the mock object that patches `login` because no tests require it + + Example:: + + with patch_ipa(return_value={}) as (mock_login, mock_post): + ... + """ + obj = ipa_otptoken.OTPTokenIPAClient + with patch.object(obj, 'login') as mock_login: + with patch.object(obj, '_post_json', **kwargs) as mock_post: + yield mock_login, mock_post + + +class TestIPAOTPToken(ModuleTestCase): + def setUp(self): + super(TestIPAOTPToken, self).setUp() + self.module = ipa_otptoken + + def _test_base(self, module_args, return_value, mock_calls, changed): + """Base function that's called by all the other test functions + + module_args (dict): + Arguments passed to the module + + return_value (dict): + Mocked return value of OTPTokenIPAClient.otptoken_show, as returned by the IPA API. + This should be set to the current state. It will be changed to the desired state using the above arguments. + (Technically, this is the return value of _post_json, but it's only checked by otptoken_show). + + mock_calls (list/tuple of dicts): + List of calls made to OTPTokenIPAClient._post_json, in order. + _post_json is called by all of the otptoken_* methods of the class. + Pass an empty list if no calls are expected. + + changed (bool): + Whether or not the module is supposed to be marked as changed + """ + set_module_args(module_args) + + # Run the module + with patch_ipa(return_value=return_value) as (mock_login, mock_post): + with self.assertRaises(AnsibleExitJson) as exec_info: + self.module.main() + + # Verify that the calls to _post_json match what is expected + expected_call_count = len(mock_calls) + if expected_call_count > 1: + # Convert the call dicts to unittest.mock.call instances because `assert_has_calls` only accepts them + converted_calls = [] + for call_dict in mock_calls: + converted_calls.append(call(**call_dict)) + + mock_post.assert_has_calls(converted_calls) + self.assertEqual(len(mock_post.mock_calls), expected_call_count) + elif expected_call_count == 1: + mock_post.assert_called_once_with(**mock_calls[0]) + else: # expected_call_count is 0 + mock_post.assert_not_called() + + # Verify that the module's changed status matches what is expected + self.assertIs(exec_info.exception.args[0]['changed'], changed) + + def test_add_new_all_default(self): + """Add a new OTP with all default values""" + module_args = { + 'uniqueid': 'NewToken1' + } + return_value = {} + mock_calls = ( + { + 'method': 'otptoken_find', + 'name': None, + 'item': {'all': True, + 'ipatokenuniqueid': 'NewToken1', + 'timelimit': '0', + 'sizelimit': '0'} + }, + { + 'method': 'otptoken_add', + 'name': 'NewToken1', + 'item': {'ipatokendisabled': 'FALSE', + 'all': True} + } + ) + changed = True + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_add_new_all_default_with_aliases(self): + """Add a new OTP with all default values using alias values""" + module_args = { + 'name': 'NewToken1' + } + return_value = {} + mock_calls = ( + { + 'method': 'otptoken_find', + 'name': None, + 'item': {'all': True, + 'ipatokenuniqueid': 'NewToken1', + 'timelimit': '0', + 'sizelimit': '0'} + }, + { + 'method': 'otptoken_add', + 'name': 'NewToken1', + 'item': {'ipatokendisabled': 'FALSE', + 'all': True} + } + ) + changed = True + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_add_new_all_specified(self): + """Add a new OTP with all default values""" + module_args = { + 'uniqueid': 'NewToken1', + 'otptype': 'hotp', + 'secretkey': 'VGVzdFNlY3JldDE=', + 'description': 'Test description', + 'owner': 'pinky', + 'enabled': True, + 'notbefore': '20200101010101', + 'notafter': '20900101010101', + 'vendor': 'Acme', + 'model': 'ModelT', + 'serial': 'Number1', + 'state': 'present', + 'algorithm': 'sha256', + 'digits': 6, + 'offset': 10, + 'interval': 30, + 'counter': 30, + } + return_value = {} + mock_calls = ( + { + 'method': 'otptoken_find', + 'name': None, + 'item': {'all': True, + 'ipatokenuniqueid': 'NewToken1', + 'timelimit': '0', + 'sizelimit': '0'} + }, + { + 'method': 'otptoken_add', + 'name': 'NewToken1', + 'item': {'type': 'HOTP', + 'ipatokenotpkey': 'KRSXG5CTMVRXEZLUGE======', + 'description': 'Test description', + 'ipatokenowner': 'pinky', + 'ipatokendisabled': 'FALSE', + 'ipatokennotbefore': '20200101010101Z', + 'ipatokennotafter': '20900101010101Z', + 'ipatokenvendor': 'Acme', + 'ipatokenmodel': 'ModelT', + 'ipatokenserial': 'Number1', + 'ipatokenotpalgorithm': 'sha256', + 'ipatokenotpdigits': '6', + 'ipatokentotpclockoffset': '10', + 'ipatokentotptimestep': '30', + 'ipatokenhotpcounter': '30', + 'all': True} + } + ) + changed = True + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_already_existing_no_change_all_specified(self): + """Add a new OTP with all values specified but needing no change""" + module_args = { + 'uniqueid': 'NewToken1', + 'otptype': 'hotp', + 'secretkey': 'VGVzdFNlY3JldDE=', + 'description': 'Test description', + 'owner': 'pinky', + 'enabled': True, + 'notbefore': '20200101010101', + 'notafter': '20900101010101', + 'vendor': 'Acme', + 'model': 'ModelT', + 'serial': 'Number1', + 'state': 'present', + 'algorithm': 'sha256', + 'digits': 6, + 'offset': 10, + 'interval': 30, + 'counter': 30, + } + return_value = {'ipatokenuniqueid': 'NewToken1', + 'type': 'HOTP', + 'ipatokenotpkey': [{'__base64__': 'VGVzdFNlY3JldDE='}], + 'description': ['Test description'], + 'ipatokenowner': ['pinky'], + 'ipatokendisabled': ['FALSE'], + 'ipatokennotbefore': ['20200101010101Z'], + 'ipatokennotafter': ['20900101010101Z'], + 'ipatokenvendor': ['Acme'], + 'ipatokenmodel': ['ModelT'], + 'ipatokenserial': ['Number1'], + 'ipatokenotpalgorithm': ['sha256'], + 'ipatokenotpdigits': ['6'], + 'ipatokentotpclockoffset': ['10'], + 'ipatokentotptimestep': ['30'], + 'ipatokenhotpcounter': ['30']} + mock_calls = [ + { + 'method': 'otptoken_find', + 'name': None, + 'item': {'all': True, + 'ipatokenuniqueid': 'NewToken1', + 'timelimit': '0', + 'sizelimit': '0'} + } + ] + changed = False + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_already_existing_one_change_all_specified(self): + """Modify an existing OTP with one value specified needing change""" + module_args = { + 'uniqueid': 'NewToken1', + 'otptype': 'hotp', + 'secretkey': 'VGVzdFNlY3JldDE=', + 'description': 'Test description', + 'owner': 'brain', + 'enabled': True, + 'notbefore': '20200101010101', + 'notafter': '20900101010101', + 'vendor': 'Acme', + 'model': 'ModelT', + 'serial': 'Number1', + 'state': 'present', + 'algorithm': 'sha256', + 'digits': 6, + 'offset': 10, + 'interval': 30, + 'counter': 30, + } + return_value = {'ipatokenuniqueid': 'NewToken1', + 'type': 'HOTP', + 'ipatokenotpkey': [{'__base64__': 'VGVzdFNlY3JldDE='}], + 'description': ['Test description'], + 'ipatokenowner': ['pinky'], + 'ipatokendisabled': ['FALSE'], + 'ipatokennotbefore': ['20200101010101Z'], + 'ipatokennotafter': ['20900101010101Z'], + 'ipatokenvendor': ['Acme'], + 'ipatokenmodel': ['ModelT'], + 'ipatokenserial': ['Number1'], + 'ipatokenotpalgorithm': ['sha256'], + 'ipatokenotpdigits': ['6'], + 'ipatokentotpclockoffset': ['10'], + 'ipatokentotptimestep': ['30'], + 'ipatokenhotpcounter': ['30']} + mock_calls = ( + { + 'method': 'otptoken_find', + 'name': None, + 'item': {'all': True, + 'ipatokenuniqueid': 'NewToken1', + 'timelimit': '0', + 'sizelimit': '0'} + }, + { + 'method': 'otptoken_mod', + 'name': 'NewToken1', + 'item': {'description': 'Test description', + 'ipatokenowner': 'brain', + 'ipatokendisabled': 'FALSE', + 'ipatokennotbefore': '20200101010101Z', + 'ipatokennotafter': '20900101010101Z', + 'ipatokenvendor': 'Acme', + 'ipatokenmodel': 'ModelT', + 'ipatokenserial': 'Number1', + 'all': True} + } + ) + changed = True + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_already_existing_all_valid_change_all_specified(self): + """Modify an existing OTP with all valid values specified needing change""" + module_args = { + 'uniqueid': 'NewToken1', + 'otptype': 'hotp', + 'secretkey': 'VGVzdFNlY3JldDE=', + 'description': 'New Test description', + 'owner': 'pinky', + 'enabled': False, + 'notbefore': '20200101010102', + 'notafter': '20900101010102', + 'vendor': 'NewAcme', + 'model': 'NewModelT', + 'serial': 'Number2', + 'state': 'present', + 'algorithm': 'sha256', + 'digits': 6, + 'offset': 10, + 'interval': 30, + 'counter': 30, + } + return_value = {'ipatokenuniqueid': 'NewToken1', + 'type': 'HOTP', + 'ipatokenotpkey': [{'__base64__': 'VGVzdFNlY3JldDE='}], + 'description': ['Test description'], + 'ipatokenowner': ['pinky'], + 'ipatokendisabled': ['FALSE'], + 'ipatokennotbefore': ['20200101010101Z'], + 'ipatokennotafter': ['20900101010101Z'], + 'ipatokenvendor': ['Acme'], + 'ipatokenmodel': ['ModelT'], + 'ipatokenserial': ['Number1'], + 'ipatokenotpalgorithm': ['sha256'], + 'ipatokenotpdigits': ['6'], + 'ipatokentotpclockoffset': ['10'], + 'ipatokentotptimestep': ['30'], + 'ipatokenhotpcounter': ['30']} + mock_calls = ( + { + 'method': 'otptoken_find', + 'name': None, + 'item': {'all': True, + 'ipatokenuniqueid': 'NewToken1', + 'timelimit': '0', + 'sizelimit': '0'} + }, + { + 'method': 'otptoken_mod', + 'name': 'NewToken1', + 'item': {'description': 'New Test description', + 'ipatokenowner': 'pinky', + 'ipatokendisabled': 'TRUE', + 'ipatokennotbefore': '20200101010102Z', + 'ipatokennotafter': '20900101010102Z', + 'ipatokenvendor': 'NewAcme', + 'ipatokenmodel': 'NewModelT', + 'ipatokenserial': 'Number2', + 'all': True} + } + ) + changed = True + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_delete_existing_token(self): + """Delete an existing OTP""" + module_args = { + 'uniqueid': 'NewToken1', + 'state': 'absent' + } + return_value = {'ipatokenuniqueid': 'NewToken1', + 'type': 'HOTP', + 'ipatokenotpkey': [{'__base64__': 'KRSXG5CTMVRXEZLUGE======'}], + 'description': ['Test description'], + 'ipatokenowner': ['pinky'], + 'ipatokendisabled': ['FALSE'], + 'ipatokennotbefore': ['20200101010101Z'], + 'ipatokennotafter': ['20900101010101Z'], + 'ipatokenvendor': ['Acme'], + 'ipatokenmodel': ['ModelT'], + 'ipatokenserial': ['Number1'], + 'ipatokenotpalgorithm': ['sha256'], + 'ipatokenotpdigits': ['6'], + 'ipatokentotpclockoffset': ['10'], + 'ipatokentotptimestep': ['30'], + 'ipatokenhotpcounter': ['30']} + mock_calls = ( + { + 'method': 'otptoken_find', + 'name': None, + 'item': {'all': True, + 'ipatokenuniqueid': 'NewToken1', + 'timelimit': '0', + 'sizelimit': '0'} + }, + { + 'method': 'otptoken_del', + 'name': 'NewToken1' + } + ) + changed = True + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_disable_existing_token(self): + """Disable an existing OTP""" + module_args = { + 'uniqueid': 'NewToken1', + 'otptype': 'hotp', + 'enabled': False + } + return_value = {'ipatokenuniqueid': 'NewToken1', + 'type': 'HOTP', + 'ipatokenotpkey': [{'__base64__': 'KRSXG5CTMVRXEZLUGE======'}], + 'description': ['Test description'], + 'ipatokenowner': ['pinky'], + 'ipatokendisabled': ['FALSE'], + 'ipatokennotbefore': ['20200101010101Z'], + 'ipatokennotafter': ['20900101010101Z'], + 'ipatokenvendor': ['Acme'], + 'ipatokenmodel': ['ModelT'], + 'ipatokenserial': ['Number1'], + 'ipatokenotpalgorithm': ['sha256'], + 'ipatokenotpdigits': ['6'], + 'ipatokentotpclockoffset': ['10'], + 'ipatokentotptimestep': ['30'], + 'ipatokenhotpcounter': ['30']} + mock_calls = ( + { + 'method': 'otptoken_find', + 'name': None, + 'item': {'all': True, + 'ipatokenuniqueid': 'NewToken1', + 'timelimit': '0', + 'sizelimit': '0'} + }, + { + 'method': 'otptoken_mod', + 'name': 'NewToken1', + 'item': {'ipatokendisabled': 'TRUE', + 'all': True} + } + ) + changed = True + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_delete_not_existing_token(self): + """Delete a OTP that does not exist""" + module_args = { + 'uniqueid': 'NewToken1', + 'state': 'absent' + } + return_value = {} + + mock_calls = [ + { + 'method': 'otptoken_find', + 'name': None, + 'item': {'all': True, + 'ipatokenuniqueid': 'NewToken1', + 'timelimit': '0', + 'sizelimit': '0'} + } + ] + + changed = False + + self._test_base(module_args, return_value, mock_calls, changed) + + def test_fail_post(self): + """Fail due to an exception raised from _post_json""" + set_module_args({ + 'uniqueid': 'NewToken1' + }) + + with patch_ipa(side_effect=Exception('ERROR MESSAGE')) as (mock_login, mock_post): + with self.assertRaises(AnsibleFailJson) as exec_info: + self.module.main() + + self.assertEqual(exec_info.exception.args[0]['msg'], 'ERROR MESSAGE') + + +if __name__ == '__main__': + unittest.main()