Rewrite get_tls_requires using mysql.user table

This commit is contained in:
Laurent Indermuehle 2024-04-08 17:23:20 +02:00
parent 74903feada
commit 5325d0cb02
No known key found for this signature in database
GPG key ID: 93FA944C9F34DD09

View file

@ -81,28 +81,45 @@ def do_not_mogrify_requires(query, params, tls_requires):
def get_tls_requires(cursor, user, host): def get_tls_requires(cursor, user, host):
if user: """Get user TLS requirements.
if not impl.use_old_user_mgmt(cursor):
query = "SHOW CREATE USER '%s'@'%s'" % (user, host)
else:
query = "SHOW GRANTS for '%s'@'%s'" % (user, host)
cursor.execute(query) Args:
require_list = [tuple[0] for tuple in filter(lambda x: "REQUIRE" in x[0], cursor.fetchall())] cursor (cursor): DB driver cursor object.
require_line = require_list[0] if require_list else "" user (str): User name.
pattern = r"(?<=\bREQUIRE\b)(.*?)(?=(?:\bPASSWORD\b|$))" host (str): User host name.
requires_match = re.search(pattern, require_line)
requires = requires_match.group().strip() if requires_match else ""
if any((requires.startswith(req) for req in ('SSL', 'X509', 'NONE'))):
requires = requires.split()[0]
if requires == 'NONE':
requires = None
else:
import shlex
items = iter(shlex.split(requires)) Returns: Dictionary containing current TLS required
requires = dict(zip(items, items)) """
return requires or None tls_requires = dict()
query = ('SELECT ssl_type, ssl_cipher, x509_issuer, x509_subject '
'FROM mysql.user WHERE User = %s AND Host = %s')
cursor.execute(query, (user, host))
res = cursor.fetchone()
# Mysql_info use a DictCursor so we must convert back to a list
# otherwise we get KeyError 0
if isinstance(res, dict):
res = list(res.values())
if not res:
return None
if res[0] == 'ANY':
return {'SSL': ''}
if res[0] == 'X509':
return {'X509': ''}
if res[1]:
tls_requires['CIPHER'] = res[1]
if res[2]:
tls_requires['ISSUER'] = res[2]
if res[3]:
tls_requires['SUBJECT'] = res[3]
return tls_requires
def get_grants(cursor, user, host): def get_grants(cursor, user, host):