Enhance support of tls_requires in mysql_user and mysql_info (#628)

* fix option name

* Add tests for users using SSL

* Rewrite get_tls_requires using mysql.user table

* Add tls_requires to users_info filter

* add more consistant test users

* Add tls tests users in cleanup task

* Fix tls_requires data structure inconsistencies between modules

* Refactor user implementation to host get_tls_requires

* fix MySQL tls_requires not removed from user passed as empty

* Fix wrong variable used to return a hashed password

* Fix sanity

* fix unit tests

* Add changelog fragment

* Add PR URI to the changelog

* Add more precise change log

* fix documentation using wrong variable as an example

* Document example returned value `tls_requires` from users_info filter

* Revert changes that will be in a separate PR

* Fix sanity
This commit is contained in:
Laurent Indermühle 2024-04-16 10:52:24 +02:00 committed by GitHub
parent 0618ff6c41
commit 47710cfb93
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 213 additions and 55 deletions

View file

@ -29,3 +29,48 @@ def server_supports_password_expire(cursor):
version = get_server_version(cursor)
return LooseVersion(version) >= LooseVersion("10.4.3")
def get_tls_requires(cursor, user, host):
"""Get user TLS requirements.
Reads directly from mysql.user table allowing for a more
readable code.
Args:
cursor (cursor): DB driver cursor object.
user (str): User name.
host (str): User host name.
Returns: Dictionary containing current TLS required
"""
tls_requires = dict()
query = ('SELECT ssl_type, ssl_cipher, x509_issuer, x509_subject '
'FROM mysql.user WHERE User = %s AND Host = %s')
cursor.execute(query, (user, host))
res = cursor.fetchone()
# Mysql_info use a DictCursor so we must convert back to a list
# otherwise we get KeyError 0
if isinstance(res, dict):
res = list(res.values())
# When user don't require SSL, res value is: ('', '', '', '')
if not any(res):
return None
if res[0] == 'ANY':
tls_requires['SSL'] = None
if res[0] == 'X509':
tls_requires['X509'] = None
if res[1]:
tls_requires['CIPHER'] = res[1]
if res[2]:
tls_requires['ISSUER'] = res[2]
if res[3]:
tls_requires['SUBJECT'] = res[3]
return tls_requires