java_keystore: New ssl_backend option for cryptography (#2485)

* Adding cryptography as a backend for OpenSSL operations

* Updating unit tests and adding changelog fragment

* Allowing private key password option when using unprotected key

* Incorporating suggestions from initial review

* Centralizing module exit path
This commit is contained in:
Ajpantuso 2021-05-14 16:31:44 -04:00 committed by GitHub
parent 5d0a7f40f2
commit a385cbb11d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 414 additions and 231 deletions

View file

@ -88,9 +88,19 @@ options:
description:
- Mode the file should be.
required: false
ssl_backend:
description:
- Backend for loading private keys and certificates.
type: str
default: openssl
choices:
- openssl
- cryptography
version_added: 3.1.0
requirements:
- openssl in PATH
- openssl in PATH (when I(ssl_backend=openssl))
- keytool in PATH
- cryptography >= 3.0 (when I(ssl_backend=cryptography))
author:
- Guillaume Grossetie (@Mogztter)
- quidame (@quidame)
@ -164,55 +174,283 @@ import os
import re
import tempfile
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.six import PY2
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
try:
from cryptography.hazmat.primitives.serialization.pkcs12 import serialize_key_and_certificates
from cryptography.hazmat.primitives.serialization import (
BestAvailableEncryption,
NoEncryption,
load_pem_private_key,
load_der_private_key,
)
from cryptography.x509 import (
load_pem_x509_certificate,
load_der_x509_certificate,
)
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl import backend
HAS_CRYPTOGRAPHY_PKCS12 = True
except ImportError:
HAS_CRYPTOGRAPHY_PKCS12 = False
def read_certificate_fingerprint(module, openssl_bin, certificate_path):
current_certificate_fingerprint_cmd = [openssl_bin, "x509", "-noout", "-in", certificate_path, "-fingerprint", "-sha256"]
(rc, current_certificate_fingerprint_out, current_certificate_fingerprint_err) = run_commands(module, current_certificate_fingerprint_cmd)
if rc != 0:
return module.fail_json(msg=current_certificate_fingerprint_out,
err=current_certificate_fingerprint_err,
cmd=current_certificate_fingerprint_cmd,
rc=rc)
class JavaKeystore:
def __init__(self, module):
self.module = module
current_certificate_match = re.search(r"=([\w:]+)", current_certificate_fingerprint_out)
if not current_certificate_match:
return module.fail_json(msg="Unable to find the current certificate fingerprint in %s" % current_certificate_fingerprint_out,
cmd=current_certificate_fingerprint_cmd,
rc=rc)
self.keytool_bin = module.get_bin_path('keytool', True)
return current_certificate_match.group(1)
def read_stored_certificate_fingerprint(module, keytool_bin, alias, keystore_path, keystore_password):
stored_certificate_fingerprint_cmd = [keytool_bin, "-list", "-alias", alias, "-keystore", keystore_path, "-storepass:env", "STOREPASS", "-v"]
(rc, stored_certificate_fingerprint_out, stored_certificate_fingerprint_err) = run_commands(
module, stored_certificate_fingerprint_cmd, environ_update=dict(STOREPASS=keystore_password))
if rc != 0:
if "keytool error: java.lang.Exception: Alias <%s> does not exist" % alias in stored_certificate_fingerprint_out:
return "alias mismatch"
if re.match(r'keytool error: java\.io\.IOException: [Kk]eystore( was tampered with, or)? password was incorrect',
stored_certificate_fingerprint_out):
return "password mismatch"
return module.fail_json(msg=stored_certificate_fingerprint_out,
err=stored_certificate_fingerprint_err,
cmd=stored_certificate_fingerprint_cmd,
rc=rc)
stored_certificate_match = re.search(r"SHA256: ([\w:]+)", stored_certificate_fingerprint_out)
if not stored_certificate_match:
return module.fail_json(msg="Unable to find the stored certificate fingerprint in %s" % stored_certificate_fingerprint_out,
cmd=stored_certificate_fingerprint_cmd,
rc=rc)
return stored_certificate_match.group(1)
def run_commands(module, cmd, data=None, environ_update=None, check_rc=False):
return module.run_command(cmd, check_rc=check_rc, data=data, environ_update=environ_update)
self.certificate = module.params['certificate']
self.keypass = module.params['private_key_passphrase']
self.keystore_path = module.params['dest']
self.name = module.params['name']
self.password = module.params['password']
self.private_key = module.params['private_key']
self.ssl_backend = module.params['ssl_backend']
if self.ssl_backend == 'openssl':
self.openssl_bin = module.get_bin_path('openssl', True)
else:
if not HAS_CRYPTOGRAPHY_PKCS12:
self.module.fail_json(msg=missing_required_lib('cryptography >= 3.0'))
if module.params['certificate_path'] is None:
self.certificate_path = create_file(self.certificate)
self.module.add_cleanup_file(self.certificate_path)
else:
self.certificate_path = module.params['certificate_path']
if module.params['private_key_path'] is None:
self.private_key_path = create_file(self.private_key)
self.module.add_cleanup_file(self.private_key_path)
else:
self.private_key_path = module.params['private_key_path']
def update_permissions(self):
try:
file_args = self.module.load_file_common_arguments(self.module.params, path=self.keystore_path)
except TypeError:
# The path argument is only supported in Ansible-base 2.10+. Fall back to
# pre-2.10 behavior for older Ansible versions.
self.module.params['path'] = self.keystore_path
file_args = self.module.load_file_common_arguments(self.module.params)
return self.module.set_fs_attributes_if_different(file_args, False)
def read_certificate_fingerprint(self, cert_format='PEM'):
if self.ssl_backend == 'cryptography':
if cert_format == 'PEM':
cert_loader = load_pem_x509_certificate
else:
cert_loader = load_der_x509_certificate
try:
with open(self.certificate_path, 'rb') as cert_file:
cert = cert_loader(
cert_file.read(),
backend=backend
)
except (OSError, ValueError) as e:
self.module.fail_json(msg="Unable to read the provided certificate: %s" % to_native(e))
fp = hex_decode(cert.fingerprint(hashes.SHA256())).upper()
fingerprint = ':'.join([fp[i:i + 2] for i in range(0, len(fp), 2)])
else:
current_certificate_fingerprint_cmd = [
self.openssl_bin, "x509", "-noout", "-in", self.certificate_path, "-fingerprint", "-sha256"
]
(rc, current_certificate_fingerprint_out, current_certificate_fingerprint_err) = self.module.run_command(
current_certificate_fingerprint_cmd,
environ_update=None,
check_rc=False
)
if rc != 0:
return self.module.fail_json(
msg=current_certificate_fingerprint_out,
err=current_certificate_fingerprint_err,
cmd=current_certificate_fingerprint_cmd,
rc=rc
)
current_certificate_match = re.search(r"=([\w:]+)", current_certificate_fingerprint_out)
if not current_certificate_match:
return self.module.fail_json(
msg="Unable to find the current certificate fingerprint in %s" % (
current_certificate_fingerprint_out
),
cmd=current_certificate_fingerprint_cmd,
rc=rc
)
fingerprint = current_certificate_match.group(1)
return fingerprint
def read_stored_certificate_fingerprint(self):
stored_certificate_fingerprint_cmd = [
self.keytool_bin, "-list", "-alias", self.name, "-keystore",
self.keystore_path, "-storepass:env", "STOREPASS", "-v"
]
(rc, stored_certificate_fingerprint_out, stored_certificate_fingerprint_err) = self.module.run_command(
stored_certificate_fingerprint_cmd, environ_update=dict(STOREPASS=self.password), check_rc=False)
if rc != 0:
if "keytool error: java.lang.Exception: Alias <%s> does not exist" % self.name \
in stored_certificate_fingerprint_out:
return "alias mismatch"
if re.match(
r'keytool error: java\.io\.IOException: ' +
'[Kk]eystore( was tampered with, or)? password was incorrect',
stored_certificate_fingerprint_out
):
return "password mismatch"
return self.module.fail_json(
msg=stored_certificate_fingerprint_out,
err=stored_certificate_fingerprint_err,
cmd=stored_certificate_fingerprint_cmd,
rc=rc
)
stored_certificate_match = re.search(r"SHA256: ([\w:]+)", stored_certificate_fingerprint_out)
if not stored_certificate_match:
return self.module.fail_json(
msg="Unable to find the stored certificate fingerprint in %s" % stored_certificate_fingerprint_out,
cmd=stored_certificate_fingerprint_cmd,
rc=rc
)
return stored_certificate_match.group(1)
def cert_changed(self):
current_certificate_fingerprint = self.read_certificate_fingerprint()
stored_certificate_fingerprint = self.read_stored_certificate_fingerprint()
return current_certificate_fingerprint != stored_certificate_fingerprint
def cryptography_create_pkcs12_bundle(self, keystore_p12_path, key_format='PEM', cert_format='PEM'):
if key_format == 'PEM':
key_loader = load_pem_private_key
else:
key_loader = load_der_private_key
if cert_format == 'PEM':
cert_loader = load_pem_x509_certificate
else:
cert_loader = load_der_x509_certificate
try:
with open(self.private_key_path, 'rb') as key_file:
private_key = key_loader(
key_file.read(),
password=to_bytes(self.keypass),
backend=backend
)
except TypeError:
# Re-attempt with no password to match existing behavior
try:
with open(self.private_key_path, 'rb') as key_file:
private_key = key_loader(
key_file.read(),
password=None,
backend=backend
)
except (OSError, TypeError, ValueError, UnsupportedAlgorithm) as e:
self.module.fail_json(
msg="The following error occurred while loading the provided private_key: %s" % to_native(e)
)
except (OSError, ValueError, UnsupportedAlgorithm) as e:
self.module.fail_json(
msg="The following error occurred while loading the provided private_key: %s" % to_native(e)
)
try:
with open(self.certificate_path, 'rb') as cert_file:
cert = cert_loader(
cert_file.read(),
backend=backend
)
except (OSError, ValueError, UnsupportedAlgorithm) as e:
self.module.fail_json(
msg="The following error occurred while loading the provided certificate: %s" % to_native(e)
)
if self.password:
encryption = BestAvailableEncryption(to_bytes(self.password))
else:
encryption = NoEncryption()
pkcs12_bundle = serialize_key_and_certificates(
name=to_bytes(self.name),
key=private_key,
cert=cert,
cas=None,
encryption_algorithm=encryption
)
with open(keystore_p12_path, 'wb') as p12_file:
p12_file.write(pkcs12_bundle)
def openssl_create_pkcs12_bundle(self, keystore_p12_path):
export_p12_cmd = [self.openssl_bin, "pkcs12", "-export", "-name", self.name, "-in", self.certificate_path,
"-inkey", self.private_key_path, "-out", keystore_p12_path, "-passout", "stdin"]
# when keypass is provided, add -passin
cmd_stdin = ""
if self.keypass:
export_p12_cmd.append("-passin")
export_p12_cmd.append("stdin")
cmd_stdin = "%s\n" % self.keypass
cmd_stdin += "%s\n%s" % (self.password, self.password)
(rc, export_p12_out, dummy) = self.module.run_command(
export_p12_cmd, data=cmd_stdin, environ_update=None, check_rc=False
)
if rc != 0:
self.module.fail_json(msg=export_p12_out, cmd=export_p12_cmd, rc=rc)
def create(self):
if self.module.check_mode:
return {'changed': True}
if os.path.exists(self.keystore_path):
os.remove(self.keystore_path)
keystore_p12_path = create_path()
self.module.add_cleanup_file(keystore_p12_path)
if self.ssl_backend == 'cryptography':
self.cryptography_create_pkcs12_bundle(keystore_p12_path)
else:
self.openssl_create_pkcs12_bundle(keystore_p12_path)
import_keystore_cmd = [self.keytool_bin, "-importkeystore",
"-destkeystore", self.keystore_path,
"-srckeystore", keystore_p12_path,
"-srcstoretype", "pkcs12",
"-alias", self.name,
"-deststorepass:env", "STOREPASS",
"-srcstorepass:env", "STOREPASS",
"-noprompt"]
(rc, import_keystore_out, dummy) = self.module.run_command(
import_keystore_cmd, data=None, environ_update=dict(STOREPASS=self.password), check_rc=False
)
if rc != 0:
return self.module.fail_json(msg=import_keystore_out, cmd=import_keystore_cmd, rc=rc)
self.update_permissions()
return {
'changed': True,
'msg': import_keystore_out,
'cmd': import_keystore_cmd,
'rc': rc
}
def exists(self):
return os.path.exists(self.keystore_path)
# Utility functions
def create_path():
dummy, tmpfile = tempfile.mkstemp()
os.remove(tmpfile)
@ -226,123 +464,11 @@ def create_file(content):
return tmpfile
def create_tmp_certificate(module):
return create_file(module.params['certificate'])
def create_tmp_private_key(module):
return create_file(module.params['private_key'])
def cert_changed(module, openssl_bin, keytool_bin, keystore_path, keystore_pass, alias):
certificate_path = module.params['certificate_path']
if certificate_path is None:
certificate_path = create_tmp_certificate(module)
try:
current_certificate_fingerprint = read_certificate_fingerprint(module, openssl_bin, certificate_path)
stored_certificate_fingerprint = read_stored_certificate_fingerprint(module, keytool_bin, alias, keystore_path, keystore_pass)
return current_certificate_fingerprint != stored_certificate_fingerprint
finally:
if module.params['certificate_path'] is None:
os.remove(certificate_path)
def create_jks(module, name, openssl_bin, keytool_bin, keystore_path, password, keypass):
if module.check_mode:
return module.exit_json(changed=True)
certificate_path = module.params['certificate_path']
if certificate_path is None:
certificate_path = create_tmp_certificate(module)
private_key_path = module.params['private_key_path']
if private_key_path is None:
private_key_path = create_tmp_private_key(module)
keystore_p12_path = create_path()
try:
if os.path.exists(keystore_path):
os.remove(keystore_path)
export_p12_cmd = [openssl_bin, "pkcs12", "-export", "-name", name, "-in", certificate_path,
"-inkey", private_key_path, "-out", keystore_p12_path, "-passout", "stdin"]
# when keypass is provided, add -passin
cmd_stdin = ""
if keypass:
export_p12_cmd.append("-passin")
export_p12_cmd.append("stdin")
cmd_stdin = "%s\n" % keypass
cmd_stdin += "%s\n%s" % (password, password)
(rc, export_p12_out, dummy) = run_commands(module, export_p12_cmd, data=cmd_stdin)
if rc != 0:
return module.fail_json(msg=export_p12_out,
cmd=export_p12_cmd,
rc=rc)
import_keystore_cmd = [keytool_bin, "-importkeystore",
"-destkeystore", keystore_path,
"-srckeystore", keystore_p12_path,
"-srcstoretype", "pkcs12",
"-alias", name,
"-deststorepass:env", "STOREPASS",
"-srcstorepass:env", "STOREPASS",
"-noprompt"]
(rc, import_keystore_out, dummy) = run_commands(module, import_keystore_cmd, data=None,
environ_update=dict(STOREPASS=password))
if rc != 0:
return module.fail_json(msg=import_keystore_out,
cmd=import_keystore_cmd,
rc=rc)
update_jks_perm(module, keystore_path)
return module.exit_json(changed=True,
msg=import_keystore_out,
cmd=import_keystore_cmd,
rc=rc)
finally:
if module.params['certificate_path'] is None:
os.remove(certificate_path)
if module.params['private_key_path'] is None:
os.remove(private_key_path)
os.remove(keystore_p12_path)
def update_jks_perm(module, keystore_path):
try:
file_args = module.load_file_common_arguments(module.params, path=keystore_path)
except TypeError:
# The path argument is only supported in Ansible-base 2.10+. Fall back to
# pre-2.10 behavior for older Ansible versions.
module.params['path'] = keystore_path
file_args = module.load_file_common_arguments(module.params)
module.set_fs_attributes_if_different(file_args, False)
def process_jks(module):
name = module.params['name']
password = module.params['password']
keypass = module.params['private_key_passphrase']
keystore_path = module.params['dest']
force = module.params['force']
openssl_bin = module.get_bin_path('openssl', True)
keytool_bin = module.get_bin_path('keytool', True)
if os.path.exists(keystore_path):
if force:
create_jks(module, name, openssl_bin, keytool_bin, keystore_path, password, keypass)
else:
if cert_changed(module, openssl_bin, keytool_bin, keystore_path, password, name):
create_jks(module, name, openssl_bin, keytool_bin, keystore_path, password, keypass)
else:
if not module.check_mode:
update_jks_perm(module, keystore_path)
module.exit_json(changed=False)
def hex_decode(s):
if PY2:
return s.decode('hex')
else:
create_jks(module, name, openssl_bin, keytool_bin, keystore_path, password, keypass)
return s.hex()
class ArgumentSpec(object):
@ -358,6 +484,7 @@ class ArgumentSpec(object):
private_key_path=dict(type='path', no_log=False),
private_key_passphrase=dict(type='str', no_log=True),
password=dict(type='str', required=True, no_log=True),
ssl_backend=dict(type='str', default='openssl', choices=['openssl', 'cryptography']),
force=dict(type='bool', default=False),
)
choose_between = (
@ -379,7 +506,19 @@ def main():
add_file_common_args=spec.add_file_common_args,
)
module.run_command_environ_update = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C')
process_jks(module)
result = dict()
jks = JavaKeystore(module)
if jks.exists():
if module.params['force'] or jks.cert_changed():
result = jks.create()
else:
result['changed'] = jks.update_permissions()
else:
result = jks.create()
module.exit_json(**result)
if __name__ == '__main__':