diff --git a/changelogs/fragments/107-mysql_user_fix_grant_on_col_handling.yml b/changelogs/fragments/107-mysql_user_fix_grant_on_col_handling.yml new file mode 100644 index 0000000..38dd297 --- /dev/null +++ b/changelogs/fragments/107-mysql_user_fix_grant_on_col_handling.yml @@ -0,0 +1,2 @@ +bugfixes: +- mysql_user - fix handling of INSERT, UPDATE, REFERENCES on columns (https://github.com/ansible-collections/community.mysql/issues/106). diff --git a/plugins/modules/mysql_user.py b/plugins/modules/mysql_user.py index 85d817c..c5a2011 100644 --- a/plugins/modules/mysql_user.py +++ b/plugins/modules/mysql_user.py @@ -769,15 +769,12 @@ def privileges_get(cursor, user, host): privileges = res.group(1).split(",") privileges = [pick(x.strip()) for x in privileges] - # Handle cases when there's GRANT SELECT (colA, ...) in privileges. + # Handle cases when there's privs like GRANT SELECT (colA, ...) in privs. # To this point, the privileges list can look like # ['SELECT (`A`', '`B`)', 'INSERT'] that is incorrect (SELECT statement is splitted). # Columns should also be sorted to compare it with desired privileges later. # Determine if there's a case similar to the above: - start, end = has_select_on_col(privileges) - # If not, either start and end will be None - if start is not None: - privileges = handle_select_on_col(privileges, start, end) + privileges = normalize_col_grants(privileges) if "WITH GRANT OPTION" in res.group(7): privileges.append('GRANT') @@ -788,7 +785,23 @@ def privileges_get(cursor, user, host): return output -def has_select_on_col(privileges): +def normalize_col_grants(privileges): + """Fix and sort grants on columns in privileges list + + Make ['SELECT (A, B)', 'INSERT (A, B)', 'DETELE'] + from ['SELECT (A', 'B)', 'INSERT (B', 'A)', 'DELETE']. + See unit tests in tests/unit/plugins/modules/test_mysql_user.py + """ + for grant in ('SELECT', 'UPDATE', 'INSERT', 'REFERENCES'): + start, end = has_grant_on_col(privileges, grant) + # If not, either start and end will be None + if start is not None: + privileges = handle_grant_on_col(privileges, start, end) + + return privileges + + +def has_grant_on_col(privileges, grant): """Check if there is a statement like SELECT (colA, colB) in the privilege list. @@ -799,7 +812,7 @@ def has_select_on_col(privileges): start = None end = None for n, priv in enumerate(privileges): - if 'SELECT (' in priv: + if '%s (' % grant in priv: # We found the start element start = n @@ -819,8 +832,8 @@ def has_select_on_col(privileges): return None, None -def handle_select_on_col(privileges, start, end): - """Handle cases when the SELECT (colA, ...) is in the privileges list.""" +def handle_grant_on_col(privileges, start, end): + """Handle cases when the privs like SELECT (colA, ...) is in the privileges list.""" # When the privileges list look like ['SELECT (colA,', 'colB)'] # (Notice that the statement is splitted) if start != end: @@ -844,7 +857,7 @@ def handle_select_on_col(privileges, start, end): def sort_column_order(statement): - """Sort column order in SELECT (colA, colB, ...) grants. + """Sort column order in grants like SELECT (colA, colB, ...). MySQL changes columns order like below: --------------------------------------- @@ -870,8 +883,10 @@ def sort_column_order(statement): # 3. Sort # 4. Put between () and return - # "SELECT (colA, colB) => "colA, colB" - columns = statement.split('(')[1].rstrip(')') + # "SELECT/UPDATE/.. (colA, colB) => "colA, colB" + tmp = statement.split('(') + priv_name = tmp[0] + columns = tmp[1].rstrip(')') # "colA, colB" => ["colA", "colB"] columns = columns.split(',') @@ -881,7 +896,7 @@ def sort_column_order(statement): columns[i] = col.strip('`') columns.sort() - return 'SELECT (%s)' % ', '.join(columns) + return '%s(%s)' % (priv_name, ', '.join(columns)) def privileges_unpack(priv, mode): @@ -927,10 +942,8 @@ def privileges_unpack(priv, mode): output[pieces[0]] = pieces[1].upper().split(',') privs = output[pieces[0]] - # Handle cases when there's GRANT SELECT (colA, ...) in privs. - start, end = has_select_on_col(output[pieces[0]]) - if start is not None: - output[pieces[0]] = handle_select_on_col(output[pieces[0]], start, end) + # Handle cases when there's privs like GRANT SELECT (colA, ...) in privs. + output[pieces[0]] = normalize_col_grants(output[pieces[0]]) new_privs = frozenset(privs) if not new_privs.issubset(VALID_PRIVS): diff --git a/tests/integration/targets/test_mysql_user/tasks/test_priv_dict.yml b/tests/integration/targets/test_mysql_user/tasks/test_priv_dict.yml index 938bf07..d54c946 100644 --- a/tests/integration/targets/test_mysql_user/tasks/test_priv_dict.yml +++ b/tests/integration/targets/test_mysql_user/tasks/test_priv_dict.yml @@ -93,6 +93,46 @@ that: - result is not changed + - name: Grant privs on columns + mysql_user: + <<: *mysql_params + name: '{{ user_name_3 }}' + priv: + 'data3.test_table_issue99': 'SELECT (a, b), INSERT (a, b), UPDATE' + register: result + + - assert: + that: + - result is changed + + - name: Grant same privs on columns again, note that the column order is different + mysql_user: + <<: *mysql_params + name: '{{ user_name_3 }}' + priv: + 'data3.test_table_issue99': 'SELECT (a, b), UPDATE, INSERT (b, a)' + register: result + + - assert: + that: + - result is not changed + + - name: Run command to show privileges for user (expect privileges in stdout) + command: "{{ mysql_command }} -e \"SHOW GRANTS FOR '{{ user_name_3 }}'@'localhost'\"" + register: result + + - name: Assert user has giving privileges + assert: + that: + - "'GRANT SELECT (`A`, `B`), INSERT (`A`, `B`), UPDATE' in result.stdout" + when: "'(`A`, `B`)' in result.stdout" + + - name: Assert user has giving privileges + assert: + that: + - "'GRANT SELECT (A, B), INSERT (A, B), UPDATE' in result.stdout" + when: "'(`A`, `B`)' not in result.stdout" + ########## # Clean up - name: Drop test databases diff --git a/tests/unit/plugins/modules/test_mysql_user.py b/tests/unit/plugins/modules/test_mysql_user.py index ee7c9d7..2ab9267 100644 --- a/tests/unit/plugins/modules/test_mysql_user.py +++ b/tests/unit/plugins/modules/test_mysql_user.py @@ -6,8 +6,9 @@ __metaclass__ = type import pytest from ansible_collections.community.mysql.plugins.modules.mysql_user import ( - handle_select_on_col, - has_select_on_col, + handle_grant_on_col, + has_grant_on_col, + normalize_col_grants, sort_column_order, supports_identified_by_password, ) @@ -41,22 +42,28 @@ def test_supports_identified_by_password(function_return, cursor_output, cursor_ @pytest.mark.parametrize( - 'input_list,output_tuple', + 'input_list,grant,output_tuple', [ - (['INSERT', 'DELETE'], (None, None)), - (['SELECT', 'UPDATE'], (None, None)), - (['INSERT', 'UPDATE', 'INSERT', 'DELETE'], (None, None)), - (['just', 'a', 'random', 'text'], (None, None)), - (['SELECT (A, B, C)'], (0, 0)), - (['UPDATE', 'SELECT (A, B, C)'], (1, 1)), - (['INSERT', 'SELECT (A', 'B)'], (1, 2)), - (['SELECT (A', 'B)', 'UPDATE'], (0, 1)), - (['INSERT', 'SELECT (A', 'B)', 'UPDATE'], (1, 2)), + (['INSERT', 'DELETE'], 'INSERT', (None, None)), + (['SELECT', 'UPDATE'], 'SELECT', (None, None)), + (['INSERT', 'UPDATE', 'INSERT', 'DELETE'], 'DELETE', (None, None)), + (['just', 'a', 'random', 'text'], 'blabla', (None, None)), + (['SELECT (A, B, C)'], 'SELECT', (0, 0)), + (['UPDATE', 'SELECT (A, B, C)'], 'SELECT', (1, 1)), + (['UPDATE', 'REFERENCES (A, B, C)'], 'REFERENCES', (1, 1)), + (['SELECT', 'UPDATE (A, B, C)'], 'UPDATE', (1, 1)), + (['INSERT', 'SELECT (A', 'B)'], 'SELECT', (1, 2)), + (['SELECT (A', 'B)', 'UPDATE'], 'SELECT', (0, 1)), + (['INSERT', 'SELECT (A', 'B)', 'UPDATE'], 'SELECT', (1, 2)), + (['INSERT (A, B)', 'SELECT (A', 'B)', 'UPDATE'], 'INSERT', (0, 0)), + (['INSERT (A', 'B)', 'SELECT (A', 'B)', 'UPDATE'], 'INSERT', (0, 1)), + (['INSERT (A', 'B)', 'SELECT (A', 'B)', 'UPDATE'], 'SELECT', (2, 3)), + (['INSERT (A', 'B)', 'SELECT (A', 'C', 'B)', 'UPDATE'], 'SELECT', (2, 4)), ] ) -def test_has_select_on_col(input_list, output_tuple): - """Tests has_select_on_col function.""" - assert has_select_on_col(input_list) == output_tuple +def test_has_grant_on_col(input_list, grant, output_tuple): + """Tests has_grant_on_col function.""" + assert has_grant_on_col(input_list, grant) == output_tuple @pytest.mark.parametrize( @@ -64,9 +71,9 @@ def test_has_select_on_col(input_list, output_tuple): [ ('SELECT (A)', 'SELECT (A)'), ('SELECT (`A`)', 'SELECT (A)'), - ('SELECT (A, B)', 'SELECT (A, B)'), - ('SELECT (`A`, `B`)', 'SELECT (A, B)'), - ('SELECT (B, A)', 'SELECT (A, B)'), + ('UPDATE (B, A)', 'UPDATE (A, B)'), + ('INSERT (`A`, `B`)', 'INSERT (A, B)'), + ('REFERENCES (B, A)', 'REFERENCES (A, B)'), ('SELECT (`B`, `A`)', 'SELECT (A, B)'), ('SELECT (`B`, `A`, C)', 'SELECT (A, B, C)'), ] @@ -81,10 +88,35 @@ def test_sort_column_order(input_, output): [ (['UPDATE', 'SELECT (C, B, A)'], 1, 1, ['UPDATE', 'SELECT (A, B, C)']), (['INSERT', 'SELECT (A', 'B)'], 1, 2, ['INSERT', 'SELECT (A, B)']), - (['SELECT (`A`', 'B)', 'UPDATE'], 0, 1, ['SELECT (A, B)', 'UPDATE']), - (['INSERT', 'SELECT (`B`', 'A', 'C)', 'UPDATE'], 1, 3, ['INSERT', 'SELECT (A, B, C)', 'UPDATE']), + ( + ['SELECT (`A`', 'B)', 'UPDATE', 'REFERENCES (B, A)'], 0, 1, + ['SELECT (A, B)', 'UPDATE', 'REFERENCES (B, A)']), + ( + ['INSERT', 'REFERENCES (`B`', 'A', 'C)', 'UPDATE (A', 'B)'], 1, 3, + ['INSERT', 'REFERENCES (A, B, C)', 'UPDATE (A', 'B)']), ] ) -def test_handle_select_on_col(privileges, start, end, output): - """Tests handle_select_on_col function.""" - assert handle_select_on_col(privileges, start, end) == output +def test_handle_grant_on_col(privileges, start, end, output): + """Tests handle_grant_on_col function.""" + assert handle_grant_on_col(privileges, start, end) == output + + +@pytest.mark.parametrize( + 'input_,expected', + [ + (['SELECT'], ['SELECT']), + (['SELECT (A, B)'], ['SELECT (A, B)']), + (['SELECT (B, A)'], ['SELECT (A, B)']), + (['UPDATE', 'SELECT (C, B, A)'], ['UPDATE', 'SELECT (A, B, C)']), + (['INSERT', 'SELECT (A', 'B)'], ['INSERT', 'SELECT (A, B)']), + ( + ['SELECT (`A`', 'B)', 'UPDATE', 'REFERENCES (B, A)'], + ['SELECT (A, B)', 'UPDATE', 'REFERENCES (A, B)']), + ( + ['INSERT', 'REFERENCES (`B`', 'A', 'C)', 'UPDATE (B', 'A)', 'DELETE'], + ['INSERT', 'REFERENCES (A, B, C)', 'UPDATE (A, B)', 'DELETE']), + ] +) +def test_normalize_col_grants(input_, expected): + """Tests normalize_col_grants function.""" + assert normalize_col_grants(input_) == expected