openssl_privatekey: add ECC support (#49416)

* Add cryptography backend for openssl_privatekey.

* Adding ECC support.

No support for X25519 and X449, since they don't support serialization.

* Improve finterprint calculation to work with Python 3.

* Add fingerprint check.

* Fix typo.

* Use separate curve option for elliptic curves, and use type 'ECC'.

* Using curve names as defined in IANA registry.

* Bump minimal supported cryptography version. Older versions might work as well, but I couldn't test them.

* Improve documentation.
This commit is contained in:
Felix Fontein 2018-12-18 10:07:36 +01:00 committed by John R Barker
commit 92ef500185
6 changed files with 694 additions and 98 deletions

View file

@ -16,15 +16,24 @@ ANSIBLE_METADATA = {'metadata_version': '1.1',
DOCUMENTATION = '''
---
module: openssl_privatekey
author: "Yanis Guenane (@Spredzy)"
author:
- "Yanis Guenane (@Spredzy)"
- "Felix Fontein (@felixfontein)"
version_added: "2.3"
short_description: Generate OpenSSL private keys.
description:
- "This module allows one to (re)generate OpenSSL private keys. It uses
the pyOpenSSL python library to interact with openssl. One can generate
either RSA or DSA private keys. Keys are generated in PEM format."
- "This module allows one to (re)generate OpenSSL private keys. One can
generate L(RSA,https://en.wikipedia.org/wiki/RSA_(cryptosystem)),
L(DSA,https://en.wikipedia.org/wiki/Digital_Signature_Algorithm) or
L(ECC,https://en.wikipedia.org/wiki/Elliptic-curve_cryptography)
private keys. Keys are generated in PEM format."
- "The module can use the cryptography Python library, or the pyOpenSSL Python
library. By default, it tries to detect which one is available. This can be
overridden with the I(select_crypto_backend) option."
requirements:
- "python-pyOpenSSL"
- "One of the following Python libraries:"
- "cryptography >= 1.2.3 (older versions might work as well)"
- "pyOpenSSL"
options:
state:
required: false
@ -40,9 +49,43 @@ options:
type:
required: false
default: "RSA"
choices: [ RSA, DSA ]
choices:
- RSA
- DSA
- ECC
# - X448
# - X25519
description:
- The algorithm used to generate the TLS/SSL private key
- "Note that C(ECC) requires the C(cryptography) backend. Depending on the curve, you need a newer
version of the cryptography backend."
curve:
required: false
choices:
- secp384r1
- secp521r1
- secp224r1
- secp192r1
- secp256k1
- brainpoolP256r1
- brainpoolP384r1
- brainpoolP512r1
- sect571k1
- sect409k1
- sect283k1
- sect233k1
- sect163k1
- sect571r1
- sect409r1
- sect283r1
- sect233r1
- sect163r2
description:
- Note that not all curves are supported by all versions of C(cryptography).
- For maximal interoperability, C(secp384r1) or C(secp256k1) should be used.
- We use the curve names as defined in the
L(IANA registry for TLS,https://www.iana.org/assignments/tls-parameters/tls-parameters.xhtml#tls-parameters-8).
version_added: "2.8"
force:
required: false
default: False
@ -62,7 +105,24 @@ options:
required: false
description:
- The cipher to encrypt the private key. (cipher can be found by running `openssl list-cipher-algorithms`)
- When using the C(cryptography) backend, use C(auto).
version_added: "2.4"
select_crypto_backend:
description:
- "Determines which crypto backend to use. The default choice is C(auto),
which tries to use C(cryptography) if available, and falls back to
C(pyopenssl)."
- "If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/)
library."
- "If set to C(cryptography), will try to use the
L(cryptography,https://cryptography.io/) library."
type: str
default: 'auto'
choices:
- auto
- cryptography
- pyopenssl
version_added: "2.8"
extends_documentation_fragment: files
'''
@ -105,14 +165,20 @@ type:
returned: changed or success
type: string
sample: RSA
curve:
description: Elliptic curve used to generate the TLS/SSL private key
returned: changed or success, and I(type) is C(ECC)
type: string
sample: secp256k1
filename:
description: Path to the generated TLS/SSL private key file
returned: changed or success
type: string
sample: /etc/ssl/private/ansible.com.pem
fingerprint:
description: The fingerprint of the public key. Fingerprint will be generated for each hashlib.algorithms available.
Requires PyOpenSSL >= 16.0 for meaningful output.
description: The fingerprint of the public key. Fingerprint will be generated for
each C(hashlib.algorithms) available.
The PyOpenSSL backend requires PyOpenSSL >= 16.0 for meaningful output.
returned: changed or success
type: dict
sample:
@ -124,15 +190,46 @@ fingerprint:
sha512: "fd:ed:5e:39:48:5f:9f:fe:7f:25:06:3f:79:08:cd:ee:a5:e7:b3:3d:13:82:87:1f:84:e1:f5:c7:28:77:53:94:86:56:38:69:f0:d9:35:22:01:1e:a6:60:...:0f:9b"
'''
import abc
import os
import traceback
from distutils.version import LooseVersion
MINIMAL_PYOPENSSL_VERSION = '0.6'
MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3'
try:
import OpenSSL
from OpenSSL import crypto
PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
except ImportError:
pyopenssl_found = False
PYOPENSSL_FOUND = False
else:
pyopenssl_found = True
PYOPENSSL_FOUND = True
try:
import cryptography
import cryptography.exceptions
import cryptography.hazmat.backends
import cryptography.hazmat.primitives.serialization
import cryptography.hazmat.primitives.asymmetric.rsa
import cryptography.hazmat.primitives.asymmetric.dsa
import cryptography.hazmat.primitives.asymmetric.ec
import cryptography.hazmat.primitives.asymmetric.utils
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
except ImportError:
CRYPTOGRAPHY_FOUND = False
else:
CRYPTOGRAPHY_FOUND = True
try:
import cryptography.hazmat.primitives.asymmetric.x25519
CRYPTOGRAPHY_HAS_X25519 = True
except ImportError:
CRYPTOGRAPHY_HAS_X25519 = False
try:
import cryptography.hazmat.primitives.asymmetric.x448
CRYPTOGRAPHY_HAS_X448 = True
except ImportError:
CRYPTOGRAPHY_HAS_X448 = False
from ansible.module_utils import crypto as crypto_utils
from ansible.module_utils._text import to_native, to_bytes
@ -144,10 +241,10 @@ class PrivateKeyError(crypto_utils.OpenSSLObjectError):
pass
class PrivateKey(crypto_utils.OpenSSLObject):
class PrivateKeyBase(crypto_utils.OpenSSLObject):
def __init__(self, module):
super(PrivateKey, self).__init__(
super(PrivateKeyBase, self).__init__(
module.params['path'],
module.params['state'],
module.params['force'],
@ -163,21 +260,19 @@ class PrivateKey(crypto_utils.OpenSSLObject):
if self.mode is None:
self.mode = 0o600
self.type = crypto.TYPE_RSA
if module.params['type'] == 'DSA':
self.type = crypto.TYPE_DSA
@abc.abstractmethod
def _generate_private_key_data(self):
pass
@abc.abstractmethod
def _get_fingerprint(self):
pass
def generate(self, module):
"""Generate a keypair."""
if not self.check(module, perms_required=False) or self.force:
self.privatekey = crypto.PKey()
try:
self.privatekey.generate_key(self.type, self.size)
except (TypeError, ValueError) as exc:
raise PrivateKeyError(exc)
privatekey_data = self._generate_private_key_data()
try:
privatekey_file = os.open(self.path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
os.close(privatekey_file)
@ -192,46 +287,35 @@ class PrivateKey(crypto_utils.OpenSSLObject):
module.fail_json(msg="%s" % to_native(e), exception=traceback.format_exc())
os.chmod(self.path, self.mode)
privatekey_file = os.open(self.path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, self.mode)
if self.cipher and self.passphrase:
os.write(privatekey_file, crypto.dump_privatekey(crypto.FILETYPE_PEM, self.privatekey,
self.cipher, to_bytes(self.passphrase)))
else:
os.write(privatekey_file, crypto.dump_privatekey(crypto.FILETYPE_PEM, self.privatekey))
os.write(privatekey_file, privatekey_data)
os.close(privatekey_file)
self.changed = True
except IOError as exc:
self.remove()
raise PrivateKeyError(exc)
self.fingerprint = crypto_utils.get_fingerprint(self.path, self.passphrase)
self.fingerprint = self._get_fingerprint()
file_args = module.load_file_common_arguments(module.params)
if module.set_fs_attributes_if_different(file_args, False):
self.changed = True
@abc.abstractmethod
def _check_passphrase(self):
pass
@abc.abstractmethod
def _check_size_and_type(self):
pass
def check(self, module, perms_required=True):
"""Ensure the resource is in its desired state."""
state_and_perms = super(PrivateKey, self).check(module, perms_required)
state_and_perms = super(PrivateKeyBase, self).check(module, perms_required)
def _check_size(privatekey):
return self.size == privatekey.bits()
def _check_type(privatekey):
return self.type == privatekey.type()
def _check_passphrase():
try:
crypto_utils.load_privatekey(self.path, self.passphrase)
return True
except crypto.Error:
return False
if not state_and_perms or not _check_passphrase():
if not state_and_perms or not self._check_passphrase():
return False
privatekey = crypto_utils.load_privatekey(self.path, self.passphrase)
return _check_size(privatekey) and _check_type(privatekey)
return self._check_size_and_type()
def dump(self):
"""Serialize the object into a dictionary."""
@ -243,6 +327,62 @@ class PrivateKey(crypto_utils.OpenSSLObject):
'fingerprint': self.fingerprint,
}
return result
# Implementation with using pyOpenSSL
class PrivateKeyPyOpenSSL(PrivateKeyBase):
def __init__(self, module):
super(PrivateKeyPyOpenSSL, self).__init__(module)
if module.params['type'] == 'RSA':
self.type = crypto.TYPE_RSA
elif module.params['type'] == 'DSA':
self.type = crypto.TYPE_DSA
else:
module.fail_json(msg="PyOpenSSL backend only supports RSA and DSA keys.")
def _generate_private_key_data(self):
self.privatekey = crypto.PKey()
try:
self.privatekey.generate_key(self.type, self.size)
except (TypeError, ValueError) as exc:
raise PrivateKeyError(exc)
if self.cipher and self.passphrase:
return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.privatekey,
self.cipher, to_bytes(self.passphrase))
else:
return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.privatekey)
def _get_fingerprint(self):
return crypto_utils.get_fingerprint(self.path, self.passphrase)
def _check_passphrase(self):
try:
crypto_utils.load_privatekey(self.path, self.passphrase)
return True
except crypto.Error:
return False
def _check_size_and_type(self):
def _check_size(privatekey):
return self.size == privatekey.bits()
def _check_type(privatekey):
return self.type == privatekey.type()
privatekey = crypto_utils.load_privatekey(self.path, self.passphrase)
return _check_size(privatekey) and _check_type(privatekey)
def dump(self):
"""Serialize the object into a dictionary."""
result = super(PrivateKeyPyOpenSSL, self).dump()
if self.type == crypto.TYPE_RSA:
result['type'] = 'RSA'
else:
@ -251,26 +391,197 @@ class PrivateKey(crypto_utils.OpenSSLObject):
return result
# Implementation with using cryptography
class PrivateKeyCryptography(PrivateKeyBase):
def _get_ec_class(self, ectype):
ecclass = cryptography.hazmat.primitives.asymmetric.ec.__dict__.get(ectype)
if ecclass is None:
self.module.fail_json(msg='Your cryptography version does not support {0}'.format(ectype))
return ecclass
def _add_curve(self, name, ectype, deprecated=False):
def create(size):
ecclass = self._get_ec_class(ectype)
return ecclass()
def verify(privatekey):
ecclass = self._get_ec_class(ectype)
return isinstance(privatekey.private_numbers().public_numbers.curve, ecclass)
self.curves[name] = {
'create': create,
'verify': verify,
'deprecated': deprecated,
}
def __init__(self, module):
super(PrivateKeyCryptography, self).__init__(module)
self.curves = dict()
self._add_curve('secp384r1', 'SECP384R1')
self._add_curve('secp521r1', 'SECP521R1')
self._add_curve('secp224r1', 'SECP224R1')
self._add_curve('secp192r1', 'SECP192R1')
self._add_curve('secp256k1', 'SECP256K1')
self._add_curve('brainpoolP256r1', 'BrainpoolP256R1', deprecated=True)
self._add_curve('brainpoolP384r1', 'BrainpoolP384R1', deprecated=True)
self._add_curve('brainpoolP512r1', 'BrainpoolP512R1', deprecated=True)
self._add_curve('sect571k1', 'SECT571K1', deprecated=True)
self._add_curve('sect409k1', 'SECT409K1', deprecated=True)
self._add_curve('sect283k1', 'SECT283K1', deprecated=True)
self._add_curve('sect233k1', 'SECT233K1', deprecated=True)
self._add_curve('sect163k1', 'SECT163K1', deprecated=True)
self._add_curve('sect571r1', 'SECT571R1', deprecated=True)
self._add_curve('sect409r1', 'SECT409R1', deprecated=True)
self._add_curve('sect283r1', 'SECT283R1', deprecated=True)
self._add_curve('sect233r1', 'SECT233R1', deprecated=True)
self._add_curve('sect163r2', 'SECT163R2', deprecated=True)
self.module = module
self.cryptography_backend = cryptography.hazmat.backends.default_backend()
self.type = module.params['type']
self.curve = module.params['curve']
if not CRYPTOGRAPHY_HAS_X25519 and self.type == 'X25519':
self.module.fail_json(msg='Your cryptography version does not support X25519')
if not CRYPTOGRAPHY_HAS_X448 and self.type == 'X448':
self.module.fail_json(msg='Your cryptography version does not support X448')
def _generate_private_key_data(self):
try:
if self.type == 'RSA':
self.privatekey = cryptography.hazmat.primitives.asymmetric.rsa.generate_private_key(
public_exponent=65537, # OpenSSL always uses this
key_size=self.size,
backend=self.cryptography_backend
)
if self.type == 'DSA':
self.privatekey = cryptography.hazmat.primitives.asymmetric.dsa.generate_private_key(
key_size=self.size,
backend=self.cryptography_backend
)
if CRYPTOGRAPHY_HAS_X25519 and self.type == 'X25519':
self.privatekey = cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.generate()
if CRYPTOGRAPHY_HAS_X448 and self.type == 'X448':
self.privatekey = cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.generate()
if self.type == 'ECC' and self.curve in self.curves:
if self.curves[self.curve]['deprecated']:
self.module.warn('Elliptic curves of type {0} should not be used for new keys!'.format(self.curve))
self.privatekey = cryptography.hazmat.primitives.asymmetric.ec.generate_private_key(
curve=self.curves[self.curve]['create'](self.size),
backend=self.cryptography_backend
)
except cryptography.exceptions.UnsupportedAlgorithm as e:
self.module.fail_json(msg='Cryptography backend does not support the algorithm required for {0}'.format(self.type))
# Select key encryption
encryption_algorithm = cryptography.hazmat.primitives.serialization.NoEncryption()
if self.cipher and self.passphrase:
if self.cipher == 'auto':
encryption_algorithm = cryptography.hazmat.primitives.serialization.BestAvailableEncryption(to_bytes(self.passphrase))
else:
self.module.fail_json(msg='Cryptography backend can only use "auto" for cipher option.')
# Serialize key
return self.privatekey.private_bytes(
encoding=cryptography.hazmat.primitives.serialization.Encoding.PEM,
format=cryptography.hazmat.primitives.serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=encryption_algorithm
)
def _load_privatekey(self):
try:
with open(self.path, 'rb') as f:
return cryptography.hazmat.primitives.serialization.load_pem_private_key(
f.read(),
None if self.passphrase is None else to_bytes(self.passphrase),
backend=self.cryptography_backend
)
except Exception as e:
raise PrivateKeyError(e)
def _get_fingerprint(self):
# Get bytes of public key
private_key = self._load_privatekey()
public_key = private_key.public_key()
public_key_bytes = public_key.public_bytes(
cryptography.hazmat.primitives.serialization.Encoding.DER,
cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo
)
# Get fingerprints of public_key_bytes
return crypto_utils.get_fingerprint_of_bytes(public_key_bytes)
def _check_passphrase(self):
try:
self._load_privatekey()
return True
except crypto.Error:
return False
def _check_size_and_type(self):
privatekey = self._load_privatekey()
if isinstance(privatekey, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
return self.type == 'RSA' and self.size == privatekey.key_size
if isinstance(privatekey, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey):
return self.type == 'DSA' and self.size == privatekey.key_size
if CRYPTOGRAPHY_HAS_X25519 and isinstance(privatekey, cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey):
return self.type == 'X25519'
if CRYPTOGRAPHY_HAS_X448 and isinstance(privatekey, cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey):
return self.type == 'X448'
if isinstance(privatekey, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey):
if self.type != 'ECC':
return False
if self.curve not in self.curves:
return False
return self.curves[self.curve]['verify'](privatekey)
return False
def dump(self):
"""Serialize the object into a dictionary."""
result = super(PrivateKeyCryptography, self).dump()
result['type'] = self.type
if self.type == 'ECC':
result['curve'] = self.curve
return result
def main():
module = AnsibleModule(
argument_spec=dict(
state=dict(default='present', choices=['present', 'absent'], type='str'),
size=dict(default=4096, type='int'),
type=dict(default='RSA', choices=['RSA', 'DSA'], type='str'),
type=dict(default='RSA', choices=[
'RSA', 'DSA', 'ECC',
# x25519 is missing serialization functions: https://github.com/pyca/cryptography/issues/4386
# x448 is also missing it: https://github.com/pyca/cryptography/pull/4580#issuecomment-437913340
# 'X448', 'X25519',
], type='str'),
curve=dict(choices=[
'secp384r1', 'secp521r1', 'secp224r1', 'secp192r1', 'secp256k1',
'brainpoolP256r1', 'brainpoolP384r1', 'brainpoolP512r1',
'sect571k1', 'sect409k1', 'sect283k1', 'sect233k1', 'sect163k1',
'sect571r1', 'sect409r1', 'sect283r1', 'sect233r1', 'sect163r2',
], type='str'),
force=dict(default=False, type='bool'),
path=dict(required=True, type='path'),
passphrase=dict(type='str', no_log=True),
cipher=dict(type='str'),
select_crypto_backend=dict(required=False, choices=['auto', 'pyopenssl', 'cryptography'], default='auto', type='str'),
),
supports_check_mode=True,
add_file_common_args=True,
required_together=[['cipher', 'passphrase']],
required_together=[
['cipher', 'passphrase']
],
required_if=[
['type', 'ECC', ['curve']],
],
)
if not pyopenssl_found:
module.fail_json(msg='the python pyOpenSSL module is required')
base_dir = os.path.dirname(module.params['path'])
if not os.path.isdir(base_dir):
module.fail_json(
@ -278,7 +589,40 @@ def main():
msg='The directory %s does not exist or the file is not a directory' % base_dir
)
private_key = PrivateKey(module)
backend = module.params['select_crypto_backend']
if backend == 'auto':
# Detection what is possible
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
# Decision
if module.params['cipher'] and module.params['passphrase'] and module.params['cipher'] != 'auto':
# First try pyOpenSSL, then cryptography
if can_use_pyopenssl:
backend = 'pyopenssl'
elif can_use_cryptography:
backend = 'cryptography'
else:
# First try cryptography, then pyOpenSSL
if can_use_cryptography:
backend = 'cryptography'
elif can_use_pyopenssl:
backend = 'pyopenssl'
# Success?
if backend == 'auto':
module.fail_json(msg=('Can detect none of the Python libraries '
'cryptography (>= {0}) and pyOpenSSL (>= {1})').format(
MINIMAL_CRYPTOGRAPHY_VERSION,
MINIMAL_PYOPENSSL_VERSION))
if backend == 'pyopenssl':
if not PYOPENSSL_FOUND:
module.fail_json(msg='The Python pyOpenSSL library is required')
private_key = PrivateKeyPyOpenSSL(module)
elif backend == 'cryptography':
if not CRYPTOGRAPHY_FOUND:
module.fail_json(msg='The Python cryptography library is required')
private_key = PrivateKeyCryptography(module)
if private_key.state == 'present':