mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-06-29 03:30:22 -07:00
pacman_key: support checking for expired and untrusted keys (#9950)
* Support checking for expired and untrusted keys Adds option `ensure_trusted`. Fixes #9949 * Update `ensure_trusted` option documentation
This commit is contained in:
parent
acbb59d3d8
commit
624a8aa120
3 changed files with 277 additions and 75 deletions
|
@ -78,10 +78,16 @@ options:
|
|||
default: /etc/pacman.d/gnupg
|
||||
state:
|
||||
description:
|
||||
- Ensures that the key is present (added) or absent (revoked).
|
||||
- Ensures that the key is V(present) (added) or V(absent) (revoked).
|
||||
default: present
|
||||
choices: [absent, present]
|
||||
type: str
|
||||
ensure_trusted:
|
||||
description:
|
||||
- Ensure that the key is trusted (signed by the Pacman machine key and not expired).
|
||||
type: bool
|
||||
default: false
|
||||
version_added: 11.0.0
|
||||
"""
|
||||
|
||||
EXAMPLES = r"""
|
||||
|
@ -129,12 +135,55 @@ from ansible.module_utils.urls import fetch_url
|
|||
from ansible.module_utils.common.text.converters import to_native
|
||||
|
||||
|
||||
class GpgListResult(object):
|
||||
"""Wraps gpg --list-* output."""
|
||||
|
||||
def __init__(self, line):
|
||||
self._parts = line.split(':')
|
||||
|
||||
@property
|
||||
def kind(self):
|
||||
return self._parts[0]
|
||||
|
||||
@property
|
||||
def valid(self):
|
||||
return self._parts[1]
|
||||
|
||||
@property
|
||||
def is_fully_valid(self):
|
||||
return self.valid == 'f'
|
||||
|
||||
@property
|
||||
def key(self):
|
||||
return self._parts[4]
|
||||
|
||||
@property
|
||||
def user_id(self):
|
||||
return self._parts[9]
|
||||
|
||||
|
||||
def gpg_get_first_attr_of_kind(lines, kind, attr):
|
||||
for line in lines:
|
||||
glr = GpgListResult(line)
|
||||
if glr.kind == kind:
|
||||
return getattr(glr, attr)
|
||||
|
||||
|
||||
def gpg_get_all_attrs_of_kind(lines, kind, attr):
|
||||
result = []
|
||||
for line in lines:
|
||||
glr = GpgListResult(line)
|
||||
if glr.kind == kind:
|
||||
result.append(getattr(glr, attr))
|
||||
return result
|
||||
|
||||
|
||||
class PacmanKey(object):
|
||||
def __init__(self, module):
|
||||
self.module = module
|
||||
# obtain binary paths for gpg & pacman-key
|
||||
self.gpg = module.get_bin_path('gpg', required=True)
|
||||
self.pacman_key = module.get_bin_path('pacman-key', required=True)
|
||||
self.gpg_binary = module.get_bin_path('gpg', required=True)
|
||||
self.pacman_key_binary = module.get_bin_path('pacman-key', required=True)
|
||||
|
||||
# obtain module parameters
|
||||
keyid = module.params['id']
|
||||
|
@ -146,47 +195,71 @@ class PacmanKey(object):
|
|||
force_update = module.params['force_update']
|
||||
keyring = module.params['keyring']
|
||||
state = module.params['state']
|
||||
ensure_trusted = module.params['ensure_trusted']
|
||||
self.keylength = 40
|
||||
|
||||
# sanitise key ID & check if key exists in the keyring
|
||||
keyid = self.sanitise_keyid(keyid)
|
||||
key_present = self.key_in_keyring(keyring, keyid)
|
||||
key_validity = self.key_validity(keyring, keyid)
|
||||
key_present = len(key_validity) > 0
|
||||
key_valid = any(key_validity)
|
||||
|
||||
# check mode
|
||||
if module.check_mode:
|
||||
if state == "present":
|
||||
if state == 'present':
|
||||
changed = (key_present and force_update) or not key_present
|
||||
if not changed and ensure_trusted:
|
||||
changed = not (key_valid and self.key_is_trusted(keyring, keyid))
|
||||
module.exit_json(changed=changed)
|
||||
elif state == "absent":
|
||||
if key_present:
|
||||
module.exit_json(changed=True)
|
||||
module.exit_json(changed=False)
|
||||
if state == 'absent':
|
||||
module.exit_json(changed=key_present)
|
||||
|
||||
if state == "present":
|
||||
if key_present and not force_update:
|
||||
if state == 'present':
|
||||
trusted = key_valid and self.key_is_trusted(keyring, keyid)
|
||||
if not force_update and key_present and (not ensure_trusted or trusted):
|
||||
module.exit_json(changed=False)
|
||||
|
||||
changed = False
|
||||
if data:
|
||||
file = self.save_key(data)
|
||||
self.add_key(keyring, file, keyid, verify)
|
||||
module.exit_json(changed=True)
|
||||
changed = True
|
||||
elif file:
|
||||
self.add_key(keyring, file, keyid, verify)
|
||||
module.exit_json(changed=True)
|
||||
changed = True
|
||||
elif url:
|
||||
data = self.fetch_key(url)
|
||||
file = self.save_key(data)
|
||||
self.add_key(keyring, file, keyid, verify)
|
||||
module.exit_json(changed=True)
|
||||
changed = True
|
||||
elif keyserver:
|
||||
self.recv_key(keyring, keyid, keyserver)
|
||||
module.exit_json(changed=True)
|
||||
elif state == "absent":
|
||||
changed = True
|
||||
if changed or (ensure_trusted and not trusted):
|
||||
self.lsign_key(keyring=keyring, keyid=keyid)
|
||||
changed = True
|
||||
module.exit_json(changed=changed)
|
||||
elif state == 'absent':
|
||||
if key_present:
|
||||
self.remove_key(keyring, keyid)
|
||||
module.exit_json(changed=True)
|
||||
module.exit_json(changed=False)
|
||||
|
||||
def gpg(self, args, keyring=None, **kwargs):
|
||||
cmd = [self.gpg_binary]
|
||||
if keyring:
|
||||
cmd.append('--homedir={keyring}'.format(keyring=keyring))
|
||||
cmd.extend(['--no-permission-warning', '--with-colons', '--quiet', '--batch', '--no-tty'])
|
||||
return self.module.run_command(cmd + args, **kwargs)
|
||||
|
||||
def pacman_key(self, args, keyring, **kwargs):
|
||||
return self.module.run_command(
|
||||
[self.pacman_key_binary, '--gpgdir', keyring] + args,
|
||||
**kwargs)
|
||||
|
||||
def pacman_machine_key(self, keyring):
|
||||
unused_rc, stdout, unused_stderr = self.gpg(['--list-secret-key'], keyring=keyring)
|
||||
return gpg_get_first_attr_of_kind(stdout.splitlines(), 'sec', 'key')
|
||||
|
||||
def is_hexadecimal(self, string):
|
||||
"""Check if a given string is valid hexadecimal"""
|
||||
try:
|
||||
|
@ -216,14 +289,11 @@ class PacmanKey(object):
|
|||
|
||||
def recv_key(self, keyring, keyid, keyserver):
|
||||
"""Receives key via keyserver"""
|
||||
cmd = [self.pacman_key, '--gpgdir', keyring, '--keyserver', keyserver, '--recv-keys', keyid]
|
||||
self.module.run_command(cmd, check_rc=True)
|
||||
self.lsign_key(keyring, keyid)
|
||||
self.pacman_key(['--keyserver', keyserver, '--recv-keys', keyid], keyring=keyring, check_rc=True)
|
||||
|
||||
def lsign_key(self, keyring, keyid):
|
||||
"""Locally sign key"""
|
||||
cmd = [self.pacman_key, '--gpgdir', keyring]
|
||||
self.module.run_command(cmd + ['--lsign-key', keyid], check_rc=True)
|
||||
self.pacman_key(['--lsign-key', keyid], keyring=keyring, check_rc=True)
|
||||
|
||||
def save_key(self, data):
|
||||
"Saves key data to a temporary file"
|
||||
|
@ -238,14 +308,11 @@ class PacmanKey(object):
|
|||
"""Add key to pacman's keyring"""
|
||||
if verify:
|
||||
self.verify_keyfile(keyfile, keyid)
|
||||
cmd = [self.pacman_key, '--gpgdir', keyring, '--add', keyfile]
|
||||
self.module.run_command(cmd, check_rc=True)
|
||||
self.lsign_key(keyring, keyid)
|
||||
self.pacman_key(['--add', keyfile], keyring=keyring, check_rc=True)
|
||||
|
||||
def remove_key(self, keyring, keyid):
|
||||
"""Remove key from pacman's keyring"""
|
||||
cmd = [self.pacman_key, '--gpgdir', keyring, '--delete', keyid]
|
||||
self.module.run_command(cmd, check_rc=True)
|
||||
self.pacman_key(['--delete', keyid], keyring=keyring, check_rc=True)
|
||||
|
||||
def verify_keyfile(self, keyfile, keyid):
|
||||
"""Verify that keyfile matches the specified key ID"""
|
||||
|
@ -254,48 +321,29 @@ class PacmanKey(object):
|
|||
elif keyid is None:
|
||||
self.module.fail_json(msg="expected a key ID, got none")
|
||||
|
||||
rc, stdout, stderr = self.module.run_command(
|
||||
[
|
||||
self.gpg,
|
||||
'--with-colons',
|
||||
'--with-fingerprint',
|
||||
'--batch',
|
||||
'--no-tty',
|
||||
'--show-keys',
|
||||
keyfile
|
||||
],
|
||||
rc, stdout, stderr = self.gpg(
|
||||
['--with-fingerprint', '--show-keys', keyfile],
|
||||
check_rc=True,
|
||||
)
|
||||
|
||||
extracted_keyid = None
|
||||
for line in stdout.splitlines():
|
||||
if line.startswith('fpr:'):
|
||||
extracted_keyid = line.split(':')[9]
|
||||
break
|
||||
|
||||
extracted_keyid = gpg_get_first_attr_of_kind(stdout.splitlines(), 'fpr', 'user_id')
|
||||
if extracted_keyid != keyid:
|
||||
self.module.fail_json(msg="key ID does not match. expected %s, got %s" % (keyid, extracted_keyid))
|
||||
|
||||
def key_in_keyring(self, keyring, keyid):
|
||||
"Check if the key ID is in pacman's keyring"
|
||||
rc, stdout, stderr = self.module.run_command(
|
||||
[
|
||||
self.gpg,
|
||||
'--with-colons',
|
||||
'--batch',
|
||||
'--no-tty',
|
||||
'--no-default-keyring',
|
||||
'--keyring=%s/pubring.gpg' % keyring,
|
||||
'--list-keys', keyid
|
||||
],
|
||||
check_rc=False,
|
||||
)
|
||||
def key_validity(self, keyring, keyid):
|
||||
"Check if the key ID is in pacman's keyring and not expired"
|
||||
rc, stdout, stderr = self.gpg(['--no-default-keyring', '--list-keys', keyid], keyring=keyring, check_rc=False)
|
||||
if rc != 0:
|
||||
if stderr.find("No public key") >= 0:
|
||||
return False
|
||||
return []
|
||||
else:
|
||||
self.module.fail_json(msg="gpg returned an error: %s" % stderr)
|
||||
return True
|
||||
return gpg_get_all_attrs_of_kind(stdout.splitlines(), 'uid', 'is_fully_valid')
|
||||
|
||||
def key_is_trusted(self, keyring, keyid):
|
||||
"""Check if key is signed and not expired."""
|
||||
unused_rc, stdout, unused_stderr = self.gpg(['--check-signatures', keyid], keyring=keyring)
|
||||
return self.pacman_machine_key(keyring) in gpg_get_all_attrs_of_kind(stdout.splitlines(), 'sig', 'key')
|
||||
|
||||
|
||||
def main():
|
||||
|
@ -309,6 +357,7 @@ def main():
|
|||
verify=dict(type='bool', default=True),
|
||||
force_update=dict(type='bool', default=False),
|
||||
keyring=dict(type='path', default='/etc/pacman.d/gnupg'),
|
||||
ensure_trusted=dict(type='bool', default=False),
|
||||
state=dict(type='str', default='present', choices=['absent', 'present']),
|
||||
),
|
||||
supports_check_mode=True,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue