mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-10-24 04:54:00 -07:00
Use semantic markup (modules a-c) (#6671)
* Use semantic markup.
* E() now works better.
(cherry picked from commit 6fc1df9b83
)
Co-authored-by: Felix Fontein <felix@fontein.de>
362 lines
11 KiB
Python
362 lines
11 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (c) 2014, Steve <yo@groks.org>
|
|
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
__metaclass__ = type
|
|
|
|
DOCUMENTATION = r'''
|
|
---
|
|
module: crypttab
|
|
short_description: Encrypted Linux block devices
|
|
description:
|
|
- Control Linux encrypted block devices that are set up during system boot in C(/etc/crypttab).
|
|
extends_documentation_fragment:
|
|
- community.general.attributes
|
|
attributes:
|
|
check_mode:
|
|
support: full
|
|
diff_mode:
|
|
support: none
|
|
options:
|
|
name:
|
|
description:
|
|
- Name of the encrypted block device as it appears in the C(/etc/crypttab) file, or
|
|
optionally prefixed with V(/dev/mapper/), as it appears in the filesystem. V(/dev/mapper/)
|
|
will be stripped from O(name).
|
|
type: str
|
|
required: true
|
|
state:
|
|
description:
|
|
- Use V(present) to add a line to C(/etc/crypttab) or update its definition
|
|
if already present.
|
|
- Use V(absent) to remove a line with matching O(name).
|
|
- Use V(opts_present) to add options to those already present; options with
|
|
different values will be updated.
|
|
- Use V(opts_absent) to remove options from the existing set.
|
|
type: str
|
|
required: true
|
|
choices: [ absent, opts_absent, opts_present, present ]
|
|
backing_device:
|
|
description:
|
|
- Path to the underlying block device or file, or the UUID of a block-device
|
|
prefixed with V(UUID=).
|
|
type: str
|
|
password:
|
|
description:
|
|
- Encryption password, the path to a file containing the password, or
|
|
V(-) or unset if the password should be entered at boot.
|
|
type: path
|
|
opts:
|
|
description:
|
|
- A comma-delimited list of options. See V(crypttab(5\)) for details.
|
|
type: str
|
|
path:
|
|
description:
|
|
- Path to file to use instead of V(/etc/crypttab).
|
|
- This might be useful in a chroot environment.
|
|
type: path
|
|
default: /etc/crypttab
|
|
author:
|
|
- Steve (@groks)
|
|
'''
|
|
|
|
EXAMPLES = r'''
|
|
- name: Set the options explicitly a device which must already exist
|
|
community.general.crypttab:
|
|
name: luks-home
|
|
state: present
|
|
opts: discard,cipher=aes-cbc-essiv:sha256
|
|
|
|
- name: Add the 'discard' option to any existing options for all devices
|
|
community.general.crypttab:
|
|
name: '{{ item.device }}'
|
|
state: opts_present
|
|
opts: discard
|
|
loop: '{{ ansible_mounts }}'
|
|
when: "'/dev/mapper/luks-' in {{ item.device }}"
|
|
'''
|
|
|
|
import os
|
|
import traceback
|
|
|
|
from ansible.module_utils.basic import AnsibleModule
|
|
from ansible.module_utils.common.text.converters import to_bytes, to_native
|
|
|
|
|
|
def main():
|
|
module = AnsibleModule(
|
|
argument_spec=dict(
|
|
name=dict(type='str', required=True),
|
|
state=dict(type='str', required=True, choices=['absent', 'opts_absent', 'opts_present', 'present']),
|
|
backing_device=dict(type='str'),
|
|
password=dict(type='path'),
|
|
opts=dict(type='str'),
|
|
path=dict(type='path', default='/etc/crypttab')
|
|
),
|
|
supports_check_mode=True,
|
|
)
|
|
|
|
backing_device = module.params['backing_device']
|
|
password = module.params['password']
|
|
opts = module.params['opts']
|
|
state = module.params['state']
|
|
path = module.params['path']
|
|
name = module.params['name']
|
|
if name.startswith('/dev/mapper/'):
|
|
name = name[len('/dev/mapper/'):]
|
|
|
|
if state != 'absent' and backing_device is None and password is None and opts is None:
|
|
module.fail_json(msg="expected one or more of 'backing_device', 'password' or 'opts'",
|
|
**module.params)
|
|
|
|
if 'opts' in state and (backing_device is not None or password is not None):
|
|
module.fail_json(msg="cannot update 'backing_device' or 'password' when state=%s" % state,
|
|
**module.params)
|
|
|
|
for arg_name, arg in (('name', name),
|
|
('backing_device', backing_device),
|
|
('password', password),
|
|
('opts', opts)):
|
|
if (arg is not None and (' ' in arg or '\t' in arg or arg == '')):
|
|
module.fail_json(msg="invalid '%s': contains white space or is empty" % arg_name,
|
|
**module.params)
|
|
|
|
try:
|
|
crypttab = Crypttab(path)
|
|
existing_line = crypttab.match(name)
|
|
except Exception as e:
|
|
module.fail_json(msg="failed to open and parse crypttab file: %s" % to_native(e),
|
|
exception=traceback.format_exc(), **module.params)
|
|
|
|
if 'present' in state and existing_line is None and backing_device is None:
|
|
module.fail_json(msg="'backing_device' required to add a new entry",
|
|
**module.params)
|
|
|
|
changed, reason = False, '?'
|
|
|
|
if state == 'absent':
|
|
if existing_line is not None:
|
|
changed, reason = existing_line.remove()
|
|
|
|
elif state == 'present':
|
|
if existing_line is not None:
|
|
changed, reason = existing_line.set(backing_device, password, opts)
|
|
else:
|
|
changed, reason = crypttab.add(Line(None, name, backing_device, password, opts))
|
|
|
|
elif state == 'opts_present':
|
|
if existing_line is not None:
|
|
changed, reason = existing_line.opts.add(opts)
|
|
else:
|
|
changed, reason = crypttab.add(Line(None, name, backing_device, password, opts))
|
|
|
|
elif state == 'opts_absent':
|
|
if existing_line is not None:
|
|
changed, reason = existing_line.opts.remove(opts)
|
|
|
|
if changed and not module.check_mode:
|
|
try:
|
|
f = open(path, 'wb')
|
|
f.write(to_bytes(crypttab, errors='surrogate_or_strict'))
|
|
finally:
|
|
f.close()
|
|
|
|
module.exit_json(changed=changed, msg=reason, **module.params)
|
|
|
|
|
|
class Crypttab(object):
|
|
_lines = []
|
|
|
|
def __init__(self, path):
|
|
self.path = path
|
|
if not os.path.exists(path):
|
|
if not os.path.exists(os.path.dirname(path)):
|
|
os.makedirs(os.path.dirname(path))
|
|
open(path, 'a').close()
|
|
|
|
try:
|
|
f = open(path, 'r')
|
|
for line in f.readlines():
|
|
self._lines.append(Line(line))
|
|
finally:
|
|
f.close()
|
|
|
|
def add(self, line):
|
|
self._lines.append(line)
|
|
return True, 'added line'
|
|
|
|
def lines(self):
|
|
for line in self._lines:
|
|
if line.valid():
|
|
yield line
|
|
|
|
def match(self, name):
|
|
for line in self.lines():
|
|
if line.name == name:
|
|
return line
|
|
return None
|
|
|
|
def __str__(self):
|
|
lines = []
|
|
for line in self._lines:
|
|
lines.append(str(line))
|
|
crypttab = '\n'.join(lines)
|
|
if len(crypttab) == 0:
|
|
crypttab += '\n'
|
|
if crypttab[-1] != '\n':
|
|
crypttab += '\n'
|
|
return crypttab
|
|
|
|
|
|
class Line(object):
|
|
def __init__(self, line=None, name=None, backing_device=None, password=None, opts=None):
|
|
self.line = line
|
|
self.name = name
|
|
self.backing_device = backing_device
|
|
self.password = password
|
|
self.opts = Options(opts)
|
|
|
|
if line is not None:
|
|
self.line = self.line.rstrip('\n')
|
|
if self._line_valid(line):
|
|
self.name, backing_device, password, opts = self._split_line(line)
|
|
|
|
self.set(backing_device, password, opts)
|
|
|
|
def set(self, backing_device, password, opts):
|
|
changed = False
|
|
|
|
if backing_device is not None and self.backing_device != backing_device:
|
|
self.backing_device = backing_device
|
|
changed = True
|
|
|
|
if password is not None and self.password != password:
|
|
self.password = password
|
|
changed = True
|
|
|
|
if opts is not None:
|
|
opts = Options(opts)
|
|
if opts != self.opts:
|
|
self.opts = opts
|
|
changed = True
|
|
|
|
return changed, 'updated line'
|
|
|
|
def _line_valid(self, line):
|
|
if not line.strip() or line.startswith('#') or len(line.split()) not in (2, 3, 4):
|
|
return False
|
|
return True
|
|
|
|
def _split_line(self, line):
|
|
fields = line.split()
|
|
try:
|
|
field2 = fields[2]
|
|
except IndexError:
|
|
field2 = None
|
|
try:
|
|
field3 = fields[3]
|
|
except IndexError:
|
|
field3 = None
|
|
|
|
return (fields[0],
|
|
fields[1],
|
|
field2,
|
|
field3)
|
|
|
|
def remove(self):
|
|
self.line, self.name, self.backing_device = '', None, None
|
|
return True, 'removed line'
|
|
|
|
def valid(self):
|
|
if self.name is not None and self.backing_device is not None:
|
|
return True
|
|
return False
|
|
|
|
def __str__(self):
|
|
if self.valid():
|
|
fields = [self.name, self.backing_device]
|
|
if self.password is not None or self.opts:
|
|
if self.password is not None:
|
|
fields.append(self.password)
|
|
else:
|
|
fields.append('none')
|
|
if self.opts:
|
|
fields.append(str(self.opts))
|
|
return ' '.join(fields)
|
|
return self.line
|
|
|
|
|
|
class Options(dict):
|
|
"""opts_string looks like: 'discard,foo=bar,baz=greeble' """
|
|
|
|
def __init__(self, opts_string):
|
|
super(Options, self).__init__()
|
|
self.itemlist = []
|
|
if opts_string is not None:
|
|
for opt in opts_string.split(','):
|
|
kv = opt.split('=')
|
|
if len(kv) > 1:
|
|
k, v = (kv[0], kv[1])
|
|
else:
|
|
k, v = (kv[0], None)
|
|
self[k] = v
|
|
|
|
def add(self, opts_string):
|
|
changed = False
|
|
for k, v in Options(opts_string).items():
|
|
if k in self:
|
|
if self[k] != v:
|
|
changed = True
|
|
else:
|
|
changed = True
|
|
self[k] = v
|
|
return changed, 'updated options'
|
|
|
|
def remove(self, opts_string):
|
|
changed = False
|
|
for k in Options(opts_string):
|
|
if k in self:
|
|
del self[k]
|
|
changed = True
|
|
return changed, 'removed options'
|
|
|
|
def keys(self):
|
|
return self.itemlist
|
|
|
|
def values(self):
|
|
return [self[key] for key in self]
|
|
|
|
def items(self):
|
|
return [(key, self[key]) for key in self]
|
|
|
|
def __iter__(self):
|
|
return iter(self.itemlist)
|
|
|
|
def __setitem__(self, key, value):
|
|
if key not in self:
|
|
self.itemlist.append(key)
|
|
super(Options, self).__setitem__(key, value)
|
|
|
|
def __delitem__(self, key):
|
|
self.itemlist.remove(key)
|
|
super(Options, self).__delitem__(key)
|
|
|
|
def __ne__(self, obj):
|
|
return not (isinstance(obj, Options) and sorted(self.items()) == sorted(obj.items()))
|
|
|
|
def __str__(self):
|
|
ret = []
|
|
for k, v in self.items():
|
|
if v is None:
|
|
ret.append(k)
|
|
else:
|
|
ret.append('%s=%s' % (k, v))
|
|
return ','.join(ret)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|