mirror of
				https://github.com/ansible-collections/community.general.git
				synced 2025-10-25 13:34:01 -07:00 
			
		
		
		
	Use semantic markup (modules a-c) (#6671)
* Use semantic markup.
* E() now works better.
(cherry picked from commit 6fc1df9b83)
Co-authored-by: Felix Fontein <felix@fontein.de>
		
	
			
		
			
				
	
	
		
			362 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			362 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/python
 | |
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| # Copyright (c) 2014, Steve <yo@groks.org>
 | |
| # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
 | |
| # SPDX-License-Identifier: GPL-3.0-or-later
 | |
| 
 | |
| from __future__ import absolute_import, division, print_function
 | |
| __metaclass__ = type
 | |
| 
 | |
| DOCUMENTATION = r'''
 | |
| ---
 | |
| module: crypttab
 | |
| short_description: Encrypted Linux block devices
 | |
| description:
 | |
|   - Control Linux encrypted block devices that are set up during system boot in C(/etc/crypttab).
 | |
| extends_documentation_fragment:
 | |
|   - community.general.attributes
 | |
| attributes:
 | |
|   check_mode:
 | |
|     support: full
 | |
|   diff_mode:
 | |
|     support: none
 | |
| options:
 | |
|   name:
 | |
|     description:
 | |
|       - Name of the encrypted block device as it appears in the C(/etc/crypttab) file, or
 | |
|         optionally prefixed with V(/dev/mapper/), as it appears in the filesystem. V(/dev/mapper/)
 | |
|         will be stripped from O(name).
 | |
|     type: str
 | |
|     required: true
 | |
|   state:
 | |
|     description:
 | |
|       - Use V(present) to add a line to C(/etc/crypttab) or update its definition
 | |
|         if already present.
 | |
|       - Use V(absent) to remove a line with matching O(name).
 | |
|       - Use V(opts_present) to add options to those already present; options with
 | |
|         different values will be updated.
 | |
|       - Use V(opts_absent) to remove options from the existing set.
 | |
|     type: str
 | |
|     required: true
 | |
|     choices: [ absent, opts_absent, opts_present, present ]
 | |
|   backing_device:
 | |
|     description:
 | |
|       - Path to the underlying block device or file, or the UUID of a block-device
 | |
|         prefixed with V(UUID=).
 | |
|     type: str
 | |
|   password:
 | |
|     description:
 | |
|       - Encryption password, the path to a file containing the password, or
 | |
|         V(-) or unset if the password should be entered at boot.
 | |
|     type: path
 | |
|   opts:
 | |
|     description:
 | |
|       - A comma-delimited list of options. See V(crypttab(5\)) for details.
 | |
|     type: str
 | |
|   path:
 | |
|     description:
 | |
|       - Path to file to use instead of V(/etc/crypttab).
 | |
|       - This might be useful in a chroot environment.
 | |
|     type: path
 | |
|     default: /etc/crypttab
 | |
| author:
 | |
| - Steve (@groks)
 | |
| '''
 | |
| 
 | |
| EXAMPLES = r'''
 | |
| - name: Set the options explicitly a device which must already exist
 | |
|   community.general.crypttab:
 | |
|     name: luks-home
 | |
|     state: present
 | |
|     opts: discard,cipher=aes-cbc-essiv:sha256
 | |
| 
 | |
| - name: Add the 'discard' option to any existing options for all devices
 | |
|   community.general.crypttab:
 | |
|     name: '{{ item.device }}'
 | |
|     state: opts_present
 | |
|     opts: discard
 | |
|   loop: '{{ ansible_mounts }}'
 | |
|   when: "'/dev/mapper/luks-' in {{ item.device }}"
 | |
| '''
 | |
| 
 | |
| import os
 | |
| import traceback
 | |
| 
 | |
| from ansible.module_utils.basic import AnsibleModule
 | |
| from ansible.module_utils.common.text.converters import to_bytes, to_native
 | |
| 
 | |
| 
 | |
| def main():
 | |
|     module = AnsibleModule(
 | |
|         argument_spec=dict(
 | |
|             name=dict(type='str', required=True),
 | |
|             state=dict(type='str', required=True, choices=['absent', 'opts_absent', 'opts_present', 'present']),
 | |
|             backing_device=dict(type='str'),
 | |
|             password=dict(type='path'),
 | |
|             opts=dict(type='str'),
 | |
|             path=dict(type='path', default='/etc/crypttab')
 | |
|         ),
 | |
|         supports_check_mode=True,
 | |
|     )
 | |
| 
 | |
|     backing_device = module.params['backing_device']
 | |
|     password = module.params['password']
 | |
|     opts = module.params['opts']
 | |
|     state = module.params['state']
 | |
|     path = module.params['path']
 | |
|     name = module.params['name']
 | |
|     if name.startswith('/dev/mapper/'):
 | |
|         name = name[len('/dev/mapper/'):]
 | |
| 
 | |
|     if state != 'absent' and backing_device is None and password is None and opts is None:
 | |
|         module.fail_json(msg="expected one or more of 'backing_device', 'password' or 'opts'",
 | |
|                          **module.params)
 | |
| 
 | |
|     if 'opts' in state and (backing_device is not None or password is not None):
 | |
|         module.fail_json(msg="cannot update 'backing_device' or 'password' when state=%s" % state,
 | |
|                          **module.params)
 | |
| 
 | |
|     for arg_name, arg in (('name', name),
 | |
|                           ('backing_device', backing_device),
 | |
|                           ('password', password),
 | |
|                           ('opts', opts)):
 | |
|         if (arg is not None and (' ' in arg or '\t' in arg or arg == '')):
 | |
|             module.fail_json(msg="invalid '%s': contains white space or is empty" % arg_name,
 | |
|                              **module.params)
 | |
| 
 | |
|     try:
 | |
|         crypttab = Crypttab(path)
 | |
|         existing_line = crypttab.match(name)
 | |
|     except Exception as e:
 | |
|         module.fail_json(msg="failed to open and parse crypttab file: %s" % to_native(e),
 | |
|                          exception=traceback.format_exc(), **module.params)
 | |
| 
 | |
|     if 'present' in state and existing_line is None and backing_device is None:
 | |
|         module.fail_json(msg="'backing_device' required to add a new entry",
 | |
|                          **module.params)
 | |
| 
 | |
|     changed, reason = False, '?'
 | |
| 
 | |
|     if state == 'absent':
 | |
|         if existing_line is not None:
 | |
|             changed, reason = existing_line.remove()
 | |
| 
 | |
|     elif state == 'present':
 | |
|         if existing_line is not None:
 | |
|             changed, reason = existing_line.set(backing_device, password, opts)
 | |
|         else:
 | |
|             changed, reason = crypttab.add(Line(None, name, backing_device, password, opts))
 | |
| 
 | |
|     elif state == 'opts_present':
 | |
|         if existing_line is not None:
 | |
|             changed, reason = existing_line.opts.add(opts)
 | |
|         else:
 | |
|             changed, reason = crypttab.add(Line(None, name, backing_device, password, opts))
 | |
| 
 | |
|     elif state == 'opts_absent':
 | |
|         if existing_line is not None:
 | |
|             changed, reason = existing_line.opts.remove(opts)
 | |
| 
 | |
|     if changed and not module.check_mode:
 | |
|         try:
 | |
|             f = open(path, 'wb')
 | |
|             f.write(to_bytes(crypttab, errors='surrogate_or_strict'))
 | |
|         finally:
 | |
|             f.close()
 | |
| 
 | |
|     module.exit_json(changed=changed, msg=reason, **module.params)
 | |
| 
 | |
| 
 | |
| class Crypttab(object):
 | |
|     _lines = []
 | |
| 
 | |
|     def __init__(self, path):
 | |
|         self.path = path
 | |
|         if not os.path.exists(path):
 | |
|             if not os.path.exists(os.path.dirname(path)):
 | |
|                 os.makedirs(os.path.dirname(path))
 | |
|             open(path, 'a').close()
 | |
| 
 | |
|         try:
 | |
|             f = open(path, 'r')
 | |
|             for line in f.readlines():
 | |
|                 self._lines.append(Line(line))
 | |
|         finally:
 | |
|             f.close()
 | |
| 
 | |
|     def add(self, line):
 | |
|         self._lines.append(line)
 | |
|         return True, 'added line'
 | |
| 
 | |
|     def lines(self):
 | |
|         for line in self._lines:
 | |
|             if line.valid():
 | |
|                 yield line
 | |
| 
 | |
|     def match(self, name):
 | |
|         for line in self.lines():
 | |
|             if line.name == name:
 | |
|                 return line
 | |
|         return None
 | |
| 
 | |
|     def __str__(self):
 | |
|         lines = []
 | |
|         for line in self._lines:
 | |
|             lines.append(str(line))
 | |
|         crypttab = '\n'.join(lines)
 | |
|         if len(crypttab) == 0:
 | |
|             crypttab += '\n'
 | |
|         if crypttab[-1] != '\n':
 | |
|             crypttab += '\n'
 | |
|         return crypttab
 | |
| 
 | |
| 
 | |
| class Line(object):
 | |
|     def __init__(self, line=None, name=None, backing_device=None, password=None, opts=None):
 | |
|         self.line = line
 | |
|         self.name = name
 | |
|         self.backing_device = backing_device
 | |
|         self.password = password
 | |
|         self.opts = Options(opts)
 | |
| 
 | |
|         if line is not None:
 | |
|             self.line = self.line.rstrip('\n')
 | |
|             if self._line_valid(line):
 | |
|                 self.name, backing_device, password, opts = self._split_line(line)
 | |
| 
 | |
|         self.set(backing_device, password, opts)
 | |
| 
 | |
|     def set(self, backing_device, password, opts):
 | |
|         changed = False
 | |
| 
 | |
|         if backing_device is not None and self.backing_device != backing_device:
 | |
|             self.backing_device = backing_device
 | |
|             changed = True
 | |
| 
 | |
|         if password is not None and self.password != password:
 | |
|             self.password = password
 | |
|             changed = True
 | |
| 
 | |
|         if opts is not None:
 | |
|             opts = Options(opts)
 | |
|             if opts != self.opts:
 | |
|                 self.opts = opts
 | |
|                 changed = True
 | |
| 
 | |
|         return changed, 'updated line'
 | |
| 
 | |
|     def _line_valid(self, line):
 | |
|         if not line.strip() or line.startswith('#') or len(line.split()) not in (2, 3, 4):
 | |
|             return False
 | |
|         return True
 | |
| 
 | |
|     def _split_line(self, line):
 | |
|         fields = line.split()
 | |
|         try:
 | |
|             field2 = fields[2]
 | |
|         except IndexError:
 | |
|             field2 = None
 | |
|         try:
 | |
|             field3 = fields[3]
 | |
|         except IndexError:
 | |
|             field3 = None
 | |
| 
 | |
|         return (fields[0],
 | |
|                 fields[1],
 | |
|                 field2,
 | |
|                 field3)
 | |
| 
 | |
|     def remove(self):
 | |
|         self.line, self.name, self.backing_device = '', None, None
 | |
|         return True, 'removed line'
 | |
| 
 | |
|     def valid(self):
 | |
|         if self.name is not None and self.backing_device is not None:
 | |
|             return True
 | |
|         return False
 | |
| 
 | |
|     def __str__(self):
 | |
|         if self.valid():
 | |
|             fields = [self.name, self.backing_device]
 | |
|             if self.password is not None or self.opts:
 | |
|                 if self.password is not None:
 | |
|                     fields.append(self.password)
 | |
|                 else:
 | |
|                     fields.append('none')
 | |
|             if self.opts:
 | |
|                 fields.append(str(self.opts))
 | |
|             return ' '.join(fields)
 | |
|         return self.line
 | |
| 
 | |
| 
 | |
| class Options(dict):
 | |
|     """opts_string looks like: 'discard,foo=bar,baz=greeble' """
 | |
| 
 | |
|     def __init__(self, opts_string):
 | |
|         super(Options, self).__init__()
 | |
|         self.itemlist = []
 | |
|         if opts_string is not None:
 | |
|             for opt in opts_string.split(','):
 | |
|                 kv = opt.split('=')
 | |
|                 if len(kv) > 1:
 | |
|                     k, v = (kv[0], kv[1])
 | |
|                 else:
 | |
|                     k, v = (kv[0], None)
 | |
|                 self[k] = v
 | |
| 
 | |
|     def add(self, opts_string):
 | |
|         changed = False
 | |
|         for k, v in Options(opts_string).items():
 | |
|             if k in self:
 | |
|                 if self[k] != v:
 | |
|                     changed = True
 | |
|             else:
 | |
|                 changed = True
 | |
|             self[k] = v
 | |
|         return changed, 'updated options'
 | |
| 
 | |
|     def remove(self, opts_string):
 | |
|         changed = False
 | |
|         for k in Options(opts_string):
 | |
|             if k in self:
 | |
|                 del self[k]
 | |
|                 changed = True
 | |
|         return changed, 'removed options'
 | |
| 
 | |
|     def keys(self):
 | |
|         return self.itemlist
 | |
| 
 | |
|     def values(self):
 | |
|         return [self[key] for key in self]
 | |
| 
 | |
|     def items(self):
 | |
|         return [(key, self[key]) for key in self]
 | |
| 
 | |
|     def __iter__(self):
 | |
|         return iter(self.itemlist)
 | |
| 
 | |
|     def __setitem__(self, key, value):
 | |
|         if key not in self:
 | |
|             self.itemlist.append(key)
 | |
|         super(Options, self).__setitem__(key, value)
 | |
| 
 | |
|     def __delitem__(self, key):
 | |
|         self.itemlist.remove(key)
 | |
|         super(Options, self).__delitem__(key)
 | |
| 
 | |
|     def __ne__(self, obj):
 | |
|         return not (isinstance(obj, Options) and sorted(self.items()) == sorted(obj.items()))
 | |
| 
 | |
|     def __str__(self):
 | |
|         ret = []
 | |
|         for k, v in self.items():
 | |
|             if v is None:
 | |
|                 ret.append(k)
 | |
|             else:
 | |
|                 ret.append('%s=%s' % (k, v))
 | |
|         return ','.join(ret)
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     main()
 |