Allow multiple values per key in name fields in openssl_certificate/csr (#30338)

* allow multiple values per key in name fields in openssl_certificate

* check correct side of comparison

* trigger only on lists

* add subject parameter to openssl_csr

* fix key: value mapping not skipping None elements

* temporary fix for undefined "subject" field

* fix iteration over subject entries

* fix docs

* quote sample string

* allow csr with only subject defined

* fix integration test

* look up NIDs before comparing, add hidden _strict params

* deal with empty issuer/subject fields

* adapt integration tests

* also normalize output from pyopenssl

* fix issue with _sanitize_inputs

* don't convert empty lists

* workaround for pyopenssl limitations

* properly encode the input to the txt2nid function

* another to_bytes fix

* make subject, commonname and subjecAltName completely optional

* don't compare hashes of keys in openssl_csr integration tests

* add integration test for old API in openssl_csr

* compare keys directly in certificate and publickey integration tests

* fix typo
This commit is contained in:
MarkusTeufelberger 2017-12-12 13:35:22 +01:00 committed by John R Barker
commit 9ea1b18ff7
8 changed files with 141 additions and 63 deletions

View file

@ -109,11 +109,13 @@ options:
issuer:
description:
- Key/value pairs that must be present in the issuer name field of the certificate
- Key/value pairs that must be present in the issuer name field of the certificate.
If you need to specify more than one value with the same key, use a list as value.
subject:
description:
- Key/value pairs that must be present in the subject name field of the certificate
- Key/value pairs that must be present in the subject name field of the certificate.
If you need to specify more than one value with the same key, use a list as value.
has_expired:
default: False
@ -453,8 +455,16 @@ class AssertOnlyCertificate(Certificate):
def __init__(self, module):
super(AssertOnlyCertificate, self).__init__(module)
self.signature_algorithms = module.params['signature_algorithms']
self.subject = module.params['subject']
self.issuer = module.params['issuer']
if module.params['subject']:
self.subject = crypto_utils.parse_name_field(module.params['subject'])
else:
self.subject = []
self.subject_strict = False
if module.params['issuer']:
self.issuer = crypto_utils.parse_name_field(module.params['issuer'])
else:
self.issuer = []
self.issuer_strict = False
self.has_expired = module.params['has_expired']
self.version = module.params['version']
self.keyUsage = module.params['keyUsage']
@ -479,8 +489,11 @@ class AssertOnlyCertificate(Certificate):
'notAfter', 'valid_at', 'invalid_at']:
attr = getattr(self, param)
if isinstance(attr, list):
setattr(self, param, [to_bytes(item) for item in attr])
if isinstance(attr, list) and attr:
if isinstance(attr[0], str):
setattr(self, param, [to_bytes(item) for item in attr])
elif isinstance(attr[0], tuple):
setattr(self, param, [(to_bytes(item[0]), to_bytes(item[1])) for item in attr])
elif isinstance(attr, tuple):
setattr(self, param, dict((to_bytes(k), to_bytes(v)) for (k, v) in attr.items()))
elif isinstance(attr, dict):
@ -501,20 +514,26 @@ class AssertOnlyCertificate(Certificate):
def _validate_subject():
if self.subject:
expected_subject = [(OpenSSL._util.lib.OBJ_txt2nid(sub[0]), sub[1]) for sub in self.subject]
cert_subject = self.cert.get_subject().get_components()
diff = [item for item in self.subject.items() if item not in cert_subject]
if diff:
current_subject = [(OpenSSL._util.lib.OBJ_txt2nid(sub[0]), sub[1]) for sub in cert_subject]
if (not self.subject_strict and not all(x in current_subject for x in expected_subject)) or \
(self.subject_strict and not set(expected_subject) == set(current_subject)):
diff = [item for item in self.subject if item not in current_subject]
self.message.append(
'Invalid subject component (got %s, expected all of %s to be present)' % (cert_subject, self.subject.items())
'Invalid subject component (got %s, expected all of %s to be present)' % (cert_subject, self.subject)
)
def _validate_issuer():
if self.issuer:
expected_issuer = [(OpenSSL._util.lib.OBJ_txt2nid(iss[0]), iss[1]) for iss in self.issuer]
cert_issuer = self.cert.get_issuer().get_components()
diff = [item for item in self.issuer.items() if item not in cert_issuer]
if diff:
current_issuer = [(OpenSSL._util.lib.OBJ_txt2nid(iss[0]), iss[1]) for iss in cert_issuer]
if (not self.issuer_strict and not all(x in current_issuer for x in expected_issuer)) or \
(self.issuer_strict and not set(expected_issuer) == set(current_issuer)):
diff = [item for item in self.issuer if item not in current_issuer]
self.message.append(
'Invalid issuer component (got %s, expected all of %s to be present)' % (cert_issuer, self.issuer.items())
'Invalid issuer component (got %s, expected all of %s to be present)' % (cert_issuer, self.issuer)
)
def _validate_has_expired():

View file

@ -22,8 +22,7 @@ short_description: Generate OpenSSL Certificate Signing Request (CSR)
description:
- "This module allows one to (re)generate OpenSSL certificate signing requests.
It uses the pyOpenSSL python library to interact with openssl. This module supports
the subjectAltName as well as the keyUsage and extendedKeyUsage extensions.
Note: At least one of common_name or subject_alt_name must be specified."
the subjectAltName as well as the keyUsage and extendedKeyUsage extensions."
requirements:
- "python-pyOpenSSL >= 0.15"
options:
@ -61,6 +60,12 @@ options:
required: true
description:
- Name of the folder in which the generated OpenSSL certificate signing request will be written
subject:
required: false
description:
- Key/value pairs that will be present in the subject name field of the certificate signing request.
- If you need to specify more than one value with the same key, use a list as value.
version_added: '2.5'
country_name:
required: false
aliases: [ 'C', 'countryName' ]
@ -214,10 +219,10 @@ filename:
type: string
sample: /etc/ssl/csr/www.ansible.com.csr
subject:
description: A dictionnary of the subject attached to the CSR
description: A list of the subject tuples attached to the CSR
returned: changed or success
type: list
sample: {'CN': 'www.ansible.com', 'O': 'Ansible'}
sample: "[('CN', 'www.ansible.com'), ('O', 'Ansible')]"
subjectAltName:
description: The alternative names this CSR is valid for
returned: changed or success
@ -283,20 +288,25 @@ class CertificateSigningRequest(crypto_utils.OpenSSLObject):
self.request = None
self.privatekey = None
self.subject = {
'C': module.params['countryName'],
'ST': module.params['stateOrProvinceName'],
'L': module.params['localityName'],
'O': module.params['organizationName'],
'OU': module.params['organizationalUnitName'],
'CN': module.params['commonName'],
'emailAddress': module.params['emailAddress'],
}
self.subject = [
('C', module.params['countryName']),
('ST', module.params['stateOrProvinceName']),
('L', module.params['localityName']),
('O', module.params['organizationName']),
('OU', module.params['organizationalUnitName']),
('CN', module.params['commonName']),
('emailAddress', module.params['emailAddress']),
]
if module.params['subject']:
self.subject = self.subject + crypto_utils.parse_name_field(module.params['subject'])
self.subject = [(entry[0], entry[1]) for entry in self.subject if entry[1]]
if not self.subjectAltName:
self.subjectAltName = ['DNS:%s' % self.subject['CN']]
self.subject = dict((k, v) for k, v in self.subject.items() if v)
for sub in self.subject:
if OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])) == 13: # 13 is the NID for "commonName"
self.subjectAltName = ['DNS:%s' % sub[1]]
break
def generate(self, module):
'''Generate the certificate signing request.'''
@ -305,12 +315,16 @@ class CertificateSigningRequest(crypto_utils.OpenSSLObject):
req = crypto.X509Req()
req.set_version(self.version - 1)
subject = req.get_subject()
for (key, value) in self.subject.items():
if value is not None:
setattr(subject, key, value)
for entry in self.subject:
if entry[1] is not None:
# Workaround for https://github.com/pyca/pyopenssl/issues/165
nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(entry[0]))
OpenSSL._util.lib.X509_NAME_add_entry_by_NID(subject._name, nid, OpenSSL._util.lib.MBSTRING_UTF8, to_bytes(entry[1]), -1, -1, 0)
altnames = ', '.join(self.subjectAltName)
extensions = [crypto.X509Extension(b"subjectAltName", self.subjectAltName_critical, altnames.encode('ascii'))]
extensions = []
if self.subjectAltName:
altnames = ', '.join(self.subjectAltName)
extensions.append(crypto.X509Extension(b"subjectAltName", self.subjectAltName_critical, altnames.encode('ascii')))
if self.keyUsage:
usages = ', '.join(self.keyUsage)
@ -324,7 +338,8 @@ class CertificateSigningRequest(crypto_utils.OpenSSLObject):
usages = ', '.join(self.basicConstraints)
extensions.append(crypto.X509Extension(b"basicConstraints", self.basicConstraints_critical, usages.encode('ascii')))
req.add_extensions(extensions)
if extensions:
req.add_extensions(extensions)
req.set_pubkey(self.privatekey)
req.sign(self.privatekey, self.digest)
@ -350,10 +365,10 @@ class CertificateSigningRequest(crypto_utils.OpenSSLObject):
self.privatekey = crypto_utils.load_privatekey(self.privatekey_path, self.privatekey_passphrase)
def _check_subject(csr):
subject = csr.get_subject()
for (key, value) in self.subject.items():
if getattr(subject, key, None) != value:
return False
subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in self.subject]
current_subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in csr.get_subject().get_components()]
if not set(subject) == set(current_subject):
return False
return True
@ -437,6 +452,7 @@ def main():
version=dict(default='1', type='int'),
force=dict(default=False, type='bool'),
path=dict(required=True, type='path'),
subject=dict(type='dict'),
countryName=dict(aliases=['C', 'country_name'], type='str'),
stateOrProvinceName=dict(aliases=['ST', 'state_or_province_name'], type='str'),
localityName=dict(aliases=['L', 'locality_name'], type='str'),
@ -455,7 +471,6 @@ def main():
),
add_file_common_args=True,
supports_check_mode=True,
required_one_of=[['commonName', 'subjectAltName']],
)
if not pyopenssl_found: