initial commit for password_expire support

This commit is contained in:
Tomas 2023-11-22 10:32:57 +02:00
parent 8dfab12bae
commit 314ec02955
4 changed files with 123 additions and 6 deletions

View file

@ -151,12 +151,13 @@ def get_existing_authentication(cursor, user, host):
def user_add(cursor, user, host, host_all, password, encrypted,
plugin, plugin_hash_string, plugin_auth_string, new_priv,
tls_requires, check_mode, reuse_existing_password):
tls_requires, module, reuse_existing_password,
password_expire, password_expire_interval):
# we cannot create users without a proper hostname
if host_all:
return {'changed': False, 'password_changed': False}
if check_mode:
if module.check_mode:
return {'changed': True, 'password_changed': None}
# Determine what user management method server uses
@ -200,6 +201,12 @@ def user_add(cursor, user, host, host_all, password, encrypted,
query_with_args_and_tls_requires = query_with_args + (tls_requires,)
cursor.execute(*mogrify(*query_with_args_and_tls_requires))
if password_expire and impl.supports_identified_by_password(cursor):
set_password_expire(cursor, user, host, password_expire, password_expire_interval)
else:
module.fail_json(msg="The server version does not match the requirements "
"for password_expire parameter. See module's documentation.")
if new_priv is not None:
for db_table, priv in iteritems(new_priv):
privileges_grant(cursor, user, host, db_table, priv, tls_requires)
@ -218,7 +225,8 @@ def is_hash(password):
def user_mod(cursor, user, host, host_all, password, encrypted,
plugin, plugin_hash_string, plugin_auth_string, new_priv,
append_privs, subtract_privs, tls_requires, module, role=False, maria_role=False):
append_privs, subtract_privs, tls_requires, module,
password_expire, password_expire_interval, role=False, maria_role=False):
changed = False
msg = "User unchanged"
grant_option = False
@ -301,6 +309,27 @@ def user_mod(cursor, user, host, host_all, password, encrypted,
raise e
changed = True
# Handle password expiration
if bool(password_expire):
if impl.server_supports_password_expire(cursor):
update = False
mariadb_role = True if "mariadb" in str(impl.__name__) else False
current_password_policy = get_password_expiration_policy(cursor, user, host, maria_role=mariadb_role)
if not (current_password_policy == -1 and password_expire == "default" or
current_password_policy == 0 and password_expire == "never" or
current_password_policy == password_expire_interval):
update = True
if module.check_mode:
return {'changed': True, 'msg': msg, 'password_changed': password_changed}
set_password_expire(cursor, user, host, password_expire, password_expire_interval)
password_changed = True
changed = True
else:
module.fail_json(msg="The server version does not match the requirements "
"for password_expire parameter. See module's documentation.")
# Handle plugin authentication
if plugin and not role:
cursor.execute("SELECT plugin, authentication_string FROM mysql.user "
@ -924,6 +953,57 @@ def limit_resources(module, cursor, user, host, resource_limits, check_mode):
return True
def set_password_expire(cursor, user, host, password_expire, password_expire_interval):
"""Fuction to set passowrd expiration for user.
Args:
cursor (cursor): DB driver cursor object.
user (str): User name.
host (str): User hostname.
password_expire (str): Password expiration mode.
password_expire_days (int): Invterval of days password expires.
"""
if password_expire.lower() == "never":
statment = "PASSWORD EXPIRE NEVER"
elif password_expire.lower() == "default":
statment = "PASSWORD EXPIRE DEFAULT"
elif password_expire.lower() == "interval":
if password_expire_interval > 0:
statment = "PASSWORD EXPIRE INTERVAL %d DAY" % (password_expire_interval)
else:
# expire password now if days <=0
if isinstance(password_expire_interval, int):
statment = "PASSWORD EXPIRE"
query = "ALTER USER %s@%s %s" % (user, host, statment)
cursor.execute(query)
def get_password_expiration_policy(cursor, user, host, maria_role=False):
"""Function to get password policy for user.
Args:
cursor (cursor): DB driver cursor object.
user (str): User name.
host (str): User hostname.
maria_role (bool, optional): mariadb or mysql. Defaults to False.
Returns:
policy (int): Current users password policy.
"""
if not maria_role:
statment = "SELECT password_lifetime FROM mysql.user \
WHERE User = %s AND Host = %s", (user, host)
else:
statment = "SELECT JSON_EXTRACT(Priv, '$.password_lifetime') AS password_lifetime \
FROM mysql.global_priv \
WHERE User = %s AND Host = %s", (user, host)
cursor.execute(*statment)
policy = cursor.fetchone()[0]
if not policy:
policy = -1
return int(policy)
def get_impl(cursor):
global impl
cursor.execute("SELECT VERSION()")