Add test to verify TLS requirements are removed (#26)

* Add test to verify TLS requirements are removed

* Fix cursor parsing

* small fixes

* Refactor TLS tests into their own file

* Fix TLS requirements parsing
This commit is contained in:
Jorge Rodriguez (A.K.A. Tiriel) 2020-09-21 07:39:08 +03:00 committed by GitHub
parent b3a22390f0
commit 6e040e1496
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 213 additions and 164 deletions

View file

@ -210,6 +210,11 @@ EXAMPLES = r'''
subject: '/CN=alice/O=MyDom, Inc./C=US/ST=Oregon/L=Portland'
cipher: 'ECDHE-ECDSA-AES256-SHA384'
- name: Modify user to no longer require SSL.
community.mysql.mysql_user:
name: bob
tls_requires:
- name: Ensure no user named 'sally'@'localhost' exists, also passing in the auth credentials.
community.mysql.mysql_user:
login_user: root
@ -424,12 +429,16 @@ def get_tls_requires(cursor, user, host):
query = "SHOW GRANTS for '%s'@'%s'" % (user, host)
cursor.execute(query)
require_list = list(filter(lambda x: "REQUIRE" in x, cursor.fetchall()))
require_list = [tuple[0] for tuple in filter(lambda x: "REQUIRE" in x[0], cursor.fetchall())]
require_line = require_list[0] if require_list else ""
pattern = r"(?<=\bREQUIRE\b)(.*?)(?=(?:\bPASSWORD\b|$))"
requires_match = re.search(pattern, require_line)
requires = requires_match.group().strip() if requires_match else ""
if len(requires.split()) > 1:
if any((requires.startswith(req) for req in ('SSL', 'X509', 'NONE'))):
requires = requires.split()[0]
if requires == 'NONE':
requires = None
else:
import shlex
items = iter(shlex.split(requires))
@ -514,6 +523,9 @@ def user_mod(cursor, user, host, host_all, password, encrypted,
msg = "User unchanged"
grant_option = False
# Determine what user management method server uses
old_user_mgmt = use_old_user_mgmt(cursor)
if host_all:
hostnames = user_get_hostnames(cursor, [user])
else:
@ -522,8 +534,6 @@ def user_mod(cursor, user, host, host_all, password, encrypted,
for host in hostnames:
# Handle clear text and hashed passwords.
if bool(password):
# Determine what user management method server uses
old_user_mgmt = use_old_user_mgmt(cursor)
# Get a list of valid columns in mysql.user table to check if Password and/or authentication_string exist
cursor.execute("""
@ -675,7 +685,7 @@ def user_mod(cursor, user, host, host_all, password, encrypted,
query = " ".join((pre_query, "%s@%s"))
cursor.execute(*mogrify_requires(query, (user, host), tls_requires))
else:
query = " ".join(pre_query, "%s@%s REQUIRE NONE")
query = " ".join((pre_query, "%s@%s REQUIRE NONE"))
cursor.execute(query, (user, host))
changed = True