From 5325d0cb02ea8496b84599cebf6b27fcfdf81f50 Mon Sep 17 00:00:00 2001 From: Laurent Indermuehle Date: Mon, 8 Apr 2024 17:23:20 +0200 Subject: [PATCH] Rewrite get_tls_requires using mysql.user table --- plugins/module_utils/user.py | 57 +++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/plugins/module_utils/user.py b/plugins/module_utils/user.py index f042c85..7833859 100644 --- a/plugins/module_utils/user.py +++ b/plugins/module_utils/user.py @@ -81,28 +81,45 @@ def do_not_mogrify_requires(query, params, tls_requires): def get_tls_requires(cursor, user, host): - if user: - if not impl.use_old_user_mgmt(cursor): - query = "SHOW CREATE USER '%s'@'%s'" % (user, host) - else: - query = "SHOW GRANTS for '%s'@'%s'" % (user, host) + """Get user TLS requirements. - cursor.execute(query) - require_list = [tuple[0] for tuple in filter(lambda x: "REQUIRE" in x[0], cursor.fetchall())] - require_line = require_list[0] if require_list else "" - pattern = r"(?<=\bREQUIRE\b)(.*?)(?=(?:\bPASSWORD\b|$))" - requires_match = re.search(pattern, require_line) - requires = requires_match.group().strip() if requires_match else "" - if any((requires.startswith(req) for req in ('SSL', 'X509', 'NONE'))): - requires = requires.split()[0] - if requires == 'NONE': - requires = None - else: - import shlex + Args: + cursor (cursor): DB driver cursor object. + user (str): User name. + host (str): User host name. - items = iter(shlex.split(requires)) - requires = dict(zip(items, items)) - return requires or None + Returns: Dictionary containing current TLS required + """ + tls_requires = dict() + + query = ('SELECT ssl_type, ssl_cipher, x509_issuer, x509_subject ' + 'FROM mysql.user WHERE User = %s AND Host = %s') + cursor.execute(query, (user, host)) + res = cursor.fetchone() + + # Mysql_info use a DictCursor so we must convert back to a list + # otherwise we get KeyError 0 + if isinstance(res, dict): + res = list(res.values()) + + if not res: + return None + + if res[0] == 'ANY': + return {'SSL': ''} + + if res[0] == 'X509': + return {'X509': ''} + + if res[1]: + tls_requires['CIPHER'] = res[1] + + if res[2]: + tls_requires['ISSUER'] = res[2] + + if res[3]: + tls_requires['SUBJECT'] = res[3] + return tls_requires def get_grants(cursor, user, host):