Update java_cert module (#2008) (#2199)

* porting https://github.com/ansible/ansible/pull/56778 as requested in https://github.com/ansible-collections/community.general/issues/821

* fix imports, add back trust_cacerts option

* try to fix import, ansible-lint fixes

* modify import to use ansible.module_utils.six instead

* cleanup indentation for tests/integration/targets/java_cert/tasks/main.yml file

* remove external crypto dependency - switch to openssl, work on password obfuscation, using files compare to reduce logic

* java_cert - remove latest run_command using password in arguments

* fix sanity check

* rename changelog fragment file - wrong extension

* add openssl dependency

* fix openssl_bin parameter missing on _get_digest_from_x509_file function call

* remove useless close files, fix paragraph, fix changelog, clean import re

* fix missing dots at end-of-line in changelogs fragments

* fix reminder case

* fix changelog

* restore .gitignore

* fix indentation on integration test files, delete useless json file

* fix typo importing tasks in tests/integration/targets/java_cert/tasks/main.yml

* Update changelogs/fragments/2008-update-java-cert-replace-cert-when-changed.yml

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update tests/integration/targets/java_cert/tasks/state_change.yml

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/system/java_cert.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/system/java_cert.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/system/java_cert.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/system/java_cert.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/system/java_cert.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/system/java_cert.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/system/java_cert.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* fix hardcoded executable keytool, use re.sub instead of import, add required cert_url or cert_alias parameter when absent, fix python script and cert_url test

* fix pylint issue with setupSSLServeR.py

Co-authored-by: Felix Fontein <felix@fontein.de>
(cherry picked from commit 40ce0f995b)

Co-authored-by: absynth76 <58172580+absynth76@users.noreply.github.com>
This commit is contained in:
patchback[bot] 2021-04-07 20:41:58 +02:00 committed by GitHub
parent 809cdda9ef
commit 19a87874f7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 496 additions and 111 deletions

View file

@ -10,6 +10,7 @@ __metaclass__ = type
DOCUMENTATION = r'''
---
module: java_cert
short_description: Uses keytool to import/remove key from java keystore (cacerts)
description:
- This is a wrapper module around keytool, which can be used to import/remove
@ -81,9 +82,12 @@ options:
state:
description:
- Defines action which can be either certificate import or removal.
- When state is present, the certificate will always idempotently be inserted
into the keystore, even if there already exists a cert alias that is different.
type: str
choices: [ absent, present ]
default: present
requirements: [openssl, keytool]
author:
- Adam Hamsik (@haad)
'''
@ -166,41 +170,143 @@ cmd:
'''
import os
import tempfile
import random
import string
import re
# import module snippets
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.six.moves.urllib.parse import urlparse
from ansible.module_utils.six.moves.urllib.request import getproxies
def get_keystore_type(keystore_type):
def _get_keystore_type_keytool_parameters(keystore_type):
''' Check that custom keystore is presented in parameters '''
if keystore_type:
return " -storetype '%s'" % keystore_type
return ''
return ["-storetype", keystore_type]
return []
def check_cert_present(module, executable, keystore_path, keystore_pass, alias, keystore_type):
def _check_cert_present(module, executable, keystore_path, keystore_pass, alias, keystore_type):
''' Check if certificate with alias is present in keystore
located at keystore_path '''
test_cmd = ("%s -noprompt -list -keystore '%s' -storepass '%s' "
"-alias '%s' %s") % (executable, keystore_path, keystore_pass, alias, get_keystore_type(keystore_type))
test_cmd = [
executable,
"-list",
"-keystore",
keystore_path,
"-alias",
alias,
"-rfc"
]
test_cmd += _get_keystore_type_keytool_parameters(keystore_type)
check_rc, dummy, dummy = module.run_command(test_cmd)
(check_rc, stdout, dummy) = module.run_command(test_cmd, data=keystore_pass, check_rc=False)
if check_rc == 0:
return True
return False
return (True, stdout)
return (False, '')
def import_cert_url(module, executable, url, port, keystore_path, keystore_pass, alias, keystore_type, trust_cacert):
''' Import certificate from URL into keystore located at keystore_path '''
def _get_certificate_from_url(module, executable, url, port, pem_certificate_output):
remote_cert_pem_chain = _download_cert_url(module, executable, url, port)
with open(pem_certificate_output, 'w') as f:
f.write(remote_cert_pem_chain)
https_proxy = os.getenv("https_proxy")
def _get_first_certificate_from_x509_file(module, pem_certificate_file, pem_certificate_output, openssl_bin):
""" Read a X509 certificate chain file and output the first certificate in the list """
extract_cmd = [
openssl_bin,
"x509",
"-in",
pem_certificate_file,
"-out",
pem_certificate_output
]
(extract_rc, dummy, extract_stderr) = module.run_command(extract_cmd, check_rc=False)
if extract_rc != 0:
# trying der encoded file
extract_cmd += ["-inform", "der"]
(extract_rc, dummy, extract_stderr) = module.run_command(extract_cmd, check_rc=False)
if extract_rc != 0:
# this time it's a real failure
module.fail_json(msg="Internal module failure, cannot extract certificate, error: %s" % extract_stderr,
rc=extract_rc, cmd=extract_cmd)
return extract_rc
def _get_digest_from_x509_file(module, pem_certificate_file, openssl_bin):
""" Read a X509 certificate file and output sha256 digest using openssl """
# cleanup file before to compare
(dummy, tmp_certificate) = tempfile.mkstemp()
module.add_cleanup_file(tmp_certificate)
_get_first_certificate_from_x509_file(module, pem_certificate_file, tmp_certificate, openssl_bin)
dgst_cmd = [
openssl_bin,
"dgst",
"-r",
"-sha256",
tmp_certificate
]
(dgst_rc, dgst_stdout, dgst_stderr) = module.run_command(dgst_cmd, check_rc=False)
if dgst_rc != 0:
module.fail_json(msg="Internal module failure, cannot compute digest for certificate, error: %s" % dgst_stderr,
rc=dgst_rc, cmd=dgst_cmd)
return dgst_stdout.split(' ')[0]
def _export_public_cert_from_pkcs12(module, executable, pkcs_file, alias, password, dest):
""" Runs keytools to extract the public cert from a PKCS12 archive and write it to a file. """
export_cmd = [
executable,
"-list",
"-keystore",
pkcs_file,
"-alias",
alias,
"-storetype",
"pkcs12",
"-rfc"
]
(export_rc, export_stdout, export_err) = module.run_command(export_cmd, data=password, check_rc=False)
if export_rc != 0:
module.fail_json(msg="Internal module failure, cannot extract public certificate from pkcs12, error: %s" % export_err,
rc=export_rc)
with open(dest, 'w') as f:
f.write(export_stdout)
def get_proxy_settings(scheme='https'):
""" Returns a tuple containing (proxy_host, proxy_port). (False, False) if no proxy is found """
proxy_url = getproxies().get(scheme, '')
if not proxy_url:
return (False, False)
else:
parsed_url = urlparse(proxy_url)
if parsed_url.scheme:
(proxy_host, proxy_port) = parsed_url.netloc.split(':')
else:
(proxy_host, proxy_port) = parsed_url.path.split(':')
return (proxy_host, proxy_port)
def build_proxy_options():
""" Returns list of valid proxy options for keytool """
(proxy_host, proxy_port) = get_proxy_settings()
no_proxy = os.getenv("no_proxy")
proxy_opts = ''
if https_proxy is not None:
(proxy_host, proxy_port) = https_proxy.split(':')
proxy_opts = "-J-Dhttps.proxyHost=%s -J-Dhttps.proxyPort=%s" % (proxy_host, proxy_port)
proxy_opts = []
if proxy_host:
proxy_opts.extend(["-J-Dhttps.proxyHost=%s" % proxy_host, "-J-Dhttps.proxyPort=%s" % proxy_port])
if no_proxy is not None:
# For Java's nonProxyHosts property, items are separated by '|',
@ -210,46 +316,48 @@ def import_cert_url(module, executable, url, port, keystore_path, keystore_pass,
# The property name is http.nonProxyHosts, there is no
# separate setting for HTTPS.
proxy_opts += " -J-Dhttp.nonProxyHosts='%s'" % non_proxy_hosts
proxy_opts.extend(["-J-Dhttp.nonProxyHosts=%s" % non_proxy_hosts])
return proxy_opts
fetch_cmd = "%s -printcert -rfc -sslserver %s %s:%d" % (executable, proxy_opts, url, port)
import_cmd = ("%s -importcert -noprompt -keystore '%s' "
"-storepass '%s' -alias '%s' %s") % (executable, keystore_path,
keystore_pass, alias,
get_keystore_type(keystore_type))
if trust_cacert:
import_cmd = import_cmd + " -trustcacerts"
def _download_cert_url(module, executable, url, port):
""" Fetches the certificate from the remote URL using `keytool -printcert...`
The PEM formatted string is returned """
proxy_opts = build_proxy_options()
fetch_cmd = [executable, "-printcert", "-rfc", "-sslserver"] + proxy_opts + ["%s:%d" % (url, port)]
# Fetch SSL certificate from remote host.
dummy, fetch_out, dummy = module.run_command(fetch_cmd, check_rc=True)
(fetch_rc, fetch_out, fetch_err) = module.run_command(fetch_cmd, check_rc=False)
# Use remote certificate from remote host and import it to a java keystore
(import_rc, import_out, import_err) = module.run_command(import_cmd,
data=fetch_out,
check_rc=False)
diff = {'before': '\n', 'after': '%s\n' % alias}
if import_rc == 0:
module.exit_json(changed=True, msg=import_out,
rc=import_rc, cmd=import_cmd, stdout=import_out,
diff=diff)
else:
module.fail_json(msg=import_out, rc=import_rc, cmd=import_cmd,
error=import_err)
if fetch_rc != 0:
module.fail_json(msg="Internal module failure, cannot download certificate, error: %s" % fetch_err,
rc=fetch_rc, cmd=fetch_cmd)
return fetch_out
def import_cert_path(module, executable, path, keystore_path, keystore_pass, alias, keystore_type, trust_cacert):
''' Import certificate from path into keystore located on
keystore_path as alias '''
import_cmd = ("%s -importcert -noprompt -keystore '%s' "
"-storepass '%s' -file '%s' -alias '%s' %s") % (executable, keystore_path,
keystore_pass, path, alias,
get_keystore_type(keystore_type))
import_cmd = [
executable,
"-importcert",
"-noprompt",
"-keystore",
keystore_path,
"-file",
path,
"-alias",
alias
]
import_cmd += _get_keystore_type_keytool_parameters(keystore_type)
if trust_cacert:
import_cmd = import_cmd + " -trustcacerts"
import_cmd.extend(["-trustcacerts"])
# Use local certificate from local path and import it to a java keystore
(import_rc, import_out, import_err) = module.run_command(import_cmd,
data="%s\n%s" % (keystore_pass, keystore_pass),
check_rc=False)
diff = {'before': '\n', 'after': '%s\n' % alias}
@ -261,41 +369,29 @@ def import_cert_path(module, executable, path, keystore_path, keystore_pass, ali
module.fail_json(msg=import_out, rc=import_rc, cmd=import_cmd)
def import_pkcs12_path(module, executable, path, keystore_path, keystore_pass, pkcs12_pass, pkcs12_alias, alias, keystore_type):
''' Import pkcs12 from path into keystore located on
keystore_path as alias '''
import_cmd = ("%s -importkeystore -noprompt -destkeystore '%s' -srcstoretype PKCS12 "
"-deststorepass '%s' -destkeypass '%s' -srckeystore '%s' -srcstorepass '%s' "
"-srcalias '%s' -destalias '%s' %s") % (executable, keystore_path, keystore_pass,
keystore_pass, path, pkcs12_pass, pkcs12_alias,
alias, get_keystore_type(keystore_type))
# Use local certificate from local path and import it to a java keystore
(import_rc, import_out, import_err) = module.run_command(import_cmd,
check_rc=False)
diff = {'before': '\n', 'after': '%s\n' % alias}
if import_rc == 0:
module.exit_json(changed=True, msg=import_out,
rc=import_rc, cmd=import_cmd, stdout=import_out,
error=import_err, diff=diff)
else:
module.fail_json(msg=import_out, rc=import_rc, cmd=import_cmd)
def delete_cert(module, executable, keystore_path, keystore_pass, alias, keystore_type):
def delete_cert(module, executable, keystore_path, keystore_pass, alias, keystore_type, exit_after=True):
''' Delete certificate identified with alias from keystore on keystore_path '''
del_cmd = ("%s -delete -keystore '%s' -storepass '%s' "
"-alias '%s' %s") % (executable, keystore_path, keystore_pass, alias, get_keystore_type(keystore_type))
del_cmd = [
executable,
"-delete",
"-noprompt",
"-keystore",
keystore_path,
"-alias",
alias
]
del_cmd += _get_keystore_type_keytool_parameters(keystore_type)
# Delete SSL certificate from keystore
(del_rc, del_out, del_err) = module.run_command(del_cmd, check_rc=True)
(del_rc, del_out, del_err) = module.run_command(del_cmd, data=keystore_pass, check_rc=True)
diff = {'before': '%s\n' % alias, 'after': None}
if exit_after:
diff = {'before': '%s\n' % alias, 'after': None}
module.exit_json(changed=True, msg=del_out,
rc=del_rc, cmd=del_cmd, stdout=del_out,
error=del_err, diff=diff)
module.exit_json(changed=True, msg=del_out,
rc=del_rc, cmd=del_cmd, stdout=del_out,
error=del_err, diff=diff)
def test_keytool(module, executable):
@ -333,7 +429,8 @@ def main():
module = AnsibleModule(
argument_spec=argument_spec,
required_one_of=[['cert_path', 'cert_url', 'pkcs12_path']],
required_if=[['state', 'present', ('cert_path', 'cert_url', 'pkcs12_path'), True],
['state', 'absent', ('cert_url', 'cert_alias'), True]],
required_together=[['keystore_path', 'keystore_pass']],
mutually_exclusive=[
['cert_url', 'cert_path', 'pkcs12_path']
@ -359,6 +456,9 @@ def main():
executable = module.params.get('executable')
state = module.params.get('state')
# openssl dependency resolution
openssl_bin = module.get_bin_path('openssl', True)
if path and not cert_alias:
module.fail_json(changed=False,
msg="Using local path import from %s requires alias argument."
@ -369,31 +469,62 @@ def main():
if not keystore_create:
test_keystore(module, keystore_path)
cert_present = check_cert_present(module, executable, keystore_path,
keystore_pass, cert_alias, keystore_type)
alias_exists, alias_exists_output = _check_cert_present(
module, executable, keystore_path, keystore_pass, cert_alias, keystore_type)
if state == 'absent' and cert_present:
(dummy, new_certificate) = tempfile.mkstemp()
(dummy, old_certificate) = tempfile.mkstemp()
module.add_cleanup_file(new_certificate)
module.add_cleanup_file(old_certificate)
if state == 'absent' and alias_exists:
if module.check_mode:
module.exit_json(changed=True)
# delete and exit
delete_cert(module, executable, keystore_path, keystore_pass, cert_alias, keystore_type)
elif state == 'present' and not cert_present:
if module.check_mode:
module.exit_json(changed=True)
# dump certificate to enroll in the keystore on disk and compute digest
if state == 'present':
# The alias exists in the keystore so we must now compare the SHA256 hash of the
# public certificate already in the keystore, and the certificate we are wanting to add
if alias_exists:
with open(old_certificate, "w") as f:
f.write(alias_exists_output)
keystore_cert_digest = _get_digest_from_x509_file(module, old_certificate, openssl_bin)
else:
keystore_cert_digest = ''
if pkcs12_path:
import_pkcs12_path(module, executable, pkcs12_path, keystore_path,
keystore_pass, pkcs12_pass, pkcs12_alias, cert_alias, keystore_type)
# Extracting certificate with openssl
_export_public_cert_from_pkcs12(module, executable, pkcs12_path, cert_alias, pkcs12_pass, new_certificate)
if path:
import_cert_path(module, executable, path, keystore_path,
elif path:
# Extracting the X509 digest is a bit easier. Keytool will print the PEM
# certificate to stdout so we don't need to do any transformations.
new_certificate = path
elif url:
# Getting the X509 digest from a URL is the same as from a path, we just have
# to download the cert first
_get_certificate_from_url(module, executable, url, port, new_certificate)
new_cert_digest = _get_digest_from_x509_file(module, new_certificate, openssl_bin)
if keystore_cert_digest != new_cert_digest:
if module.check_mode:
module.exit_json(changed=True)
if alias_exists:
# The certificate in the keystore does not match with the one we want to be present
# The existing certificate must first be deleted before we insert the correct one
delete_cert(module, executable, keystore_path, keystore_pass, cert_alias, keystore_type, exit_after=False)
import_cert_path(module, executable, new_certificate, keystore_path,
keystore_pass, cert_alias, keystore_type, trust_cacert)
if url:
import_cert_url(module, executable, url, port, keystore_path,
keystore_pass, cert_alias, keystore_type, trust_cacert)
module.exit_json(changed=False)