write integration tests

This commit is contained in:
n-cc 2024-01-11 12:54:18 -06:00 committed by n-cc
parent 7012a277de
commit 9a8b53dd5a
3 changed files with 271 additions and 15 deletions

View file

@ -153,18 +153,16 @@ def get_existing_authentication(cursor, user, host):
def user_add(cursor, user, host, host_all, password, encrypted,
plugin, plugin_hash_string, plugin_auth_string, new_priv,
attributes, tls_requires, reuse_existing_password, module):
# we cannot create users without a proper hostname
if host_all:
return {'changed': False, 'password_changed': False, 'attributes': {}}
if module.check_mode:
return {'changed': True, 'password_changed': None, 'attributes': {}}
# If attributes are set, perform a sanity check to ensure server supports user attributes before creating user
if attributes and not get_attribute_support(cursor):
module.fail_json(msg="user attributes were specified but the mysql server does not support user attributes")
final_attributes = {}
# we cannot create users without a proper hostname
if host_all:
return {'changed': False, 'password_changed': False, 'attributes': attributes}
if module.check_mode:
return {'changed': True, 'password_changed': None, 'attributes': attributes}
# Determine what user management method server uses
old_user_mgmt = impl.use_old_user_mgmt(cursor)
@ -210,12 +208,15 @@ def user_add(cursor, user, host, host_all, password, encrypted,
if new_priv is not None:
for db_table, priv in iteritems(new_priv):
privileges_grant(cursor, user, host, db_table, priv, tls_requires)
if attributes is not None:
cursor.execute("ALTER USER %s@%s ATTRIBUTE %s", (user, host, json.dumps(attributes)))
final_attributes = attributes_get(cursor, user, host)
if tls_requires is not None:
privileges_grant(cursor, user, host, "*.*", get_grants(cursor, user, host), tls_requires)
final_attributes = {}
if attributes:
cursor.execute("ALTER USER %s@%s ATTRIBUTE %s", (user, host, json.dumps(attributes)))
final_attributes = attributes_get(cursor, user, host)
return {'changed': True, 'password_changed': not used_existing_password, 'attributes': final_attributes}
@ -426,6 +427,7 @@ def user_mod(cursor, user, host, host_all, password, encrypted,
# Handle attributes
attribute_support = get_attribute_support(cursor)
final_attributes = {}
if attributes:
if not attribute_support:
@ -441,14 +443,21 @@ def user_mod(cursor, user, host, host_all, password, encrypted,
if attributes_to_change:
msg = "Attributes updated: %s" % (", ".join(["%s: %s" % (key, value) for key, value in attributes_to_change.items()]))
# Calculate final attributes by re-running attributes_get when not in check mode, and merge dictionaries when in check mode
if not module.check_mode:
cursor.execute("ALTER USER %s@%s ATTRIBUTE %s", (user, host, json.dumps(attributes_to_change)))
final_attributes = attributes_get(cursor, user, host)
else:
# Final if statements excludes items whose values are None in attributes_to_change, i.e. attributes that will be deleted
final_attributes = {k: v for d in (current_attributes, attributes_to_change) for k, v in d.items() if k not in attributes_to_change or
attributes_to_change[k]}
changed = True
if attribute_support:
final_attributes = attributes_get(cursor, user, host)
else:
final_attributes = current_attributes
else:
final_attributes = {}
if attribute_support:
final_attributes = attributes_get(cursor, user, host)
if role:
continue