community.mysql/plugins/module_utils/implementations/mysql/user.py
Laurent Indermühle 47710cfb93
Enhance support of tls_requires in mysql_user and mysql_info (#628)
* fix option name

* Add tests for users using SSL

* Rewrite get_tls_requires using mysql.user table

* Add tls_requires to users_info filter

* Add more consistent test users

* Add tls tests users in cleanup task

* Fix tls_requires data structure inconsistencies between modules

* Refactor user implementation to host get_tls_requires

* Fix MySQL tls_requires not being removed from a user when passed as empty

* Fix wrong variable used to return a hashed password

* Fix sanity

* fix unit tests

* Add changelog fragment

* Add PR URI to the changelog

* Add more precise change log

* fix documentation using wrong variable as an example

* Document example returned value `tls_requires` from users_info filter

* Revert changes that will be in a separate PR

* Fix sanity
2024-04-16 10:52:24 +02:00

78 lines
2.2 KiB
Python

# -*- coding: utf-8 -*-
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible_collections.community.mysql.plugins.module_utils.version import LooseVersion
from ansible_collections.community.mysql.plugins.module_utils.mysql import get_server_version
import re
import shlex
def use_old_user_mgmt(cursor):
    """Report whether the server is older than version 5.7.

    Args:
        cursor (cursor): DB driver cursor object, handed to
            get_server_version() to read the server version.

    Returns:
        bool: True when the server version is lower than 5.7.
    """
    server_version = LooseVersion(get_server_version(cursor))
    return server_version < LooseVersion("5.7")
def supports_identified_by_password(cursor):
    """Report whether the server is older than version 8.

    Args:
        cursor (cursor): DB driver cursor object, handed to
            get_server_version() to read the server version.

    Returns:
        bool: True when the server version is lower than 8.
    """
    server_version = LooseVersion(get_server_version(cursor))
    return server_version < LooseVersion("8")
def server_supports_alter_user(cursor):
    """Report whether the server is at least version 5.6.

    Args:
        cursor (cursor): DB driver cursor object, handed to
            get_server_version() to read the server version.

    Returns:
        bool: True when the server version is 5.6 or higher.
    """
    server_version = LooseVersion(get_server_version(cursor))
    return server_version >= LooseVersion("5.6")
def server_supports_password_expire(cursor):
    """Report whether the server is at least version 5.7.

    Args:
        cursor (cursor): DB driver cursor object, handed to
            get_server_version() to read the server version.

    Returns:
        bool: True when the server version is 5.7 or higher.
    """
    server_version = LooseVersion(get_server_version(cursor))
    return server_version >= LooseVersion("5.7")
def get_tls_requires(cursor, user, host):
    """Get the TLS requirements currently set on a user account.

    The requirements are parsed from the ``REQUIRE`` clause of the
    statement returned by ``SHOW CREATE USER`` or, on servers for which
    use_old_user_mgmt() is true, by ``SHOW GRANTS``.

    Args:
        cursor (cursor): DB driver cursor object.
        user (str): User name.
        host (str): User host name.

    Returns:
        dict or None: ``None`` when no row is returned or when the
        ``REQUIRE`` clause is missing, empty or ``NONE``;
        ``{'SSL': None}`` or ``{'X509': None}`` for those requirements;
        otherwise a dictionary pairing each option found in the clause
        with the value that follows it.
    """
    # Hand user and host to the driver as parameters instead of
    # interpolating them into the statement: a quote character in either
    # value would otherwise break (or alter) the query.
    if not use_old_user_mgmt(cursor):
        query = "SHOW CREATE USER %s@%s"
    else:
        query = "SHOW GRANTS for %s@%s"
    cursor.execute(query, (user, host))
    grants = cursor.fetchone()

    # Without a row there is nothing to parse.
    if not grants:
        return None

    # mysql_info uses a DictCursor, which returns the row as a dict.
    # Keep only its values so the statement text, not the column name,
    # is what gets parsed.
    if isinstance(grants, dict):
        grants = list(grants.values())
    grants_str = ''.join(grants)

    # Capture what follows REQUIRE, up to the PASSWORD keyword or the
    # end of the statement.
    pattern = r"(?<=\bREQUIRE\b)(.*?)(?=(?:\bPASSWORD\b|$))"
    requires_match = re.search(pattern, grants_str)
    requires = requires_match.group().strip() if requires_match else ""

    if requires.startswith('NONE'):
        return None
    if requires.startswith('SSL'):
        return {'SSL': None}
    if requires.startswith('X509'):
        return {'X509': None}

    # Remaining forms are "OPTION 'value'" sequences. shlex strips the
    # quotes, and zipping the same iterator with itself pairs each
    # option with the token that follows it.
    items = iter(shlex.split(requires))
    requires = dict(zip(items, items))
    return requires or None