Enable changed var with ufw check mode (#49948)

* Enable 'changed' var with ufw check mode

* Fix from comment of the PR + Unit Test

* Fix on ufw module after the second review

- delete rules change works in check mode
- simplify execute def & use it on every call process
- improved regexp
- rename vars defaults to current_default_values

* Add ignore error to execute() and use it in get_current_rules()

* Update after third code review (introduce change in changed status)

* Adjust tests and fix some problems (#1)

* 'active' also appears in 'inactive'.

* 'reject' is also a valid option here.

* For example for reloaded, changed will be set back to False here.

* Improve and adjust tests.

* Fix after merging integration test

* handle "disabled" on default routed

* Add /var/lib/ufw/.. rules files

* add unit test

* Fix pep8 formatting error

* Separate ipv6 and ipv4 rules process from checkmode

* fix non-ascii error on ci

* Some change after review

* Add unit test with sub network mask

* rename is_match function by is_starting

* add changelog fragment
This commit is contained in:
Jérôme BAROTIN 2019-02-11 12:05:35 +01:00 committed by John R Barker
parent 708f0b07ba
commit b99de25f32
5 changed files with 498 additions and 41 deletions

View file

@ -207,11 +207,37 @@ EXAMPLES = '''
'''
import re
from operator import itemgetter
from ansible.module_utils.basic import AnsibleModule
def compile_ipv4_regexp():
r = r"((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}"
r += r"(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])"
return re.compile(r)
def compile_ipv6_regexp():
"""
validation pattern provided by :
https://stackoverflow.com/questions/53497/regular-expression-that-matches-
valid-ipv6-addresses#answer-17871737
"""
r = r"(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:"
r += r"|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}"
r += r"(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4})"
r += r"{1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]"
r += r"{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]"
r += r"{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4})"
r += r"{0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]"
r += r"|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}"
r += r"[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}"
r += r"[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))"
return re.compile(r)
def main():
module = AnsibleModule(
argument_spec=dict(
@ -241,24 +267,62 @@ def main():
cmds = []
def execute(cmd):
ipv4_regexp = compile_ipv4_regexp()
ipv6_regexp = compile_ipv6_regexp()
def filter_line_that_not_start_with(pattern, content):
return ''.join([line for line in content.splitlines(True) if line.startswith(pattern)])
def filter_line_that_contains(pattern, content):
return [line for line in content.splitlines(True) if pattern in line]
def filter_line_that_not_contains(pattern, content):
return ''.join([line for line in content.splitlines(True) if not line.contains(pattern)])
def filter_line_that_match_func(match_func, content):
return ''.join([line for line in content.splitlines(True) if match_func(line) is not None])
def filter_line_that_contains_ipv4(content):
return filter_line_that_match_func(ipv4_regexp.search, content)
def filter_line_that_contains_ipv6(content):
return filter_line_that_match_func(ipv6_regexp.search, content)
def is_starting_by_ipv4(ip):
return ipv4_regexp.match(ip) is not None
def is_starting_by_ipv6(ip):
return ipv6_regexp.match(ip) is not None
def execute(cmd, ignore_error=False):
cmd = ' '.join(map(itemgetter(-1), filter(itemgetter(0), cmd)))
cmds.append(cmd)
(rc, out, err) = module.run_command(cmd)
(rc, out, err) = module.run_command(cmd, environ_update={"LANG": "C"})
if rc != 0:
module.fail_json(msg=err or out)
if rc != 0 and not ignore_error:
module.fail_json(msg=err or out, commands=cmds)
return out
def get_current_rules():
user_rules_files = ["/lib/ufw/user.rules",
"/lib/ufw/user6.rules",
"/etc/ufw/user.rules",
"/etc/ufw/user6.rules",
"/var/lib/ufw/user.rules",
"/var/lib/ufw/user6.rules"]
cmd = [[grep_bin], ["-h"], ["'^### tuple'"]]
cmd.extend([[f] for f in user_rules_files])
return execute(cmd, ignore_error=True)
def ufw_version():
"""
Returns the major and minor version of ufw installed on the system.
"""
rc, out, err = module.run_command("%s --version" % ufw_bin)
if rc != 0:
module.fail_json(
msg="Failed to get ufw version.", rc=rc, out=out, err=err
)
out = execute([[ufw_bin], ["--version"]])
lines = [x for x in out.split('\n') if x.strip() != '']
if len(lines) == 0:
@ -291,25 +355,65 @@ def main():
# Ensure ufw is available
ufw_bin = module.get_bin_path('ufw', True)
grep_bin = module.get_bin_path('grep', True)
# Save the pre state and rules in order to recognize changes
(_, pre_state, _) = module.run_command(ufw_bin + ' status verbose')
(_, pre_rules, _) = module.run_command("grep '^### tuple' /lib/ufw/user.rules /lib/ufw/user6.rules /etc/ufw/user.rules /etc/ufw/user6.rules")
pre_state = execute([[ufw_bin], ['status verbose']])
pre_rules = get_current_rules()
# Execute commands
changed = False
# Execute filter
for (command, value) in commands.items():
cmd = [[ufw_bin], [module.check_mode, '--dry-run']]
if command == 'state':
states = {'enabled': 'enable', 'disabled': 'disable',
'reloaded': 'reload', 'reset': 'reset'}
execute(cmd + [['-f'], [states[value]]])
if value in ['reloaded', 'reset']:
changed = True
if module.check_mode:
# "active" would also match "inactive", hence the space
ufw_enabled = pre_state.find(" active") != -1
if (value == 'disabled' and ufw_enabled) or (value == 'enabled' and not ufw_enabled):
changed = True
else:
execute(cmd + [['-f'], [states[value]]])
elif command == 'logging':
execute(cmd + [[command], [value]])
extract = re.search(r'Logging: (on|off) \(([a-z]+)\)', pre_state)
if extract:
current_level = extract.group(2)
current_on_off_value = extract.group(1)
if value != "off":
if value != "on" and (value != current_level or current_on_off_value == "off"):
changed = True
elif current_on_off_value != "off":
changed = True
else:
changed = True
if not module.check_mode:
execute(cmd + [[command], [value]])
elif command == 'default':
execute(cmd + [[command], [value], [params['direction']]])
if module.check_mode:
regexp = r'Default: (deny|allow|reject) \(incoming\), (deny|allow|reject) \(outgoing\), (deny|allow|reject|disabled) \(routed\)'
extract = re.search(regexp, pre_state)
if extract is not None:
current_default_values = {}
current_default_values["incoming"] = extract.group(1)
current_default_values["outgoing"] = extract.group(2)
current_default_values["routed"] = extract.group(3)
if current_default_values[params['direction']] != value:
changed = True
else:
changed = True
else:
execute(cmd + [[command], [value], [params['direction']]])
elif command == 'rule':
# Rules are constructed according to the long format
@ -336,14 +440,34 @@ def main():
if (ufw_major == 0 and ufw_minor >= 35) or ufw_major > 0:
cmd.append([params['comment'], "comment '%s'" % params['comment']])
execute(cmd)
rules_dry = execute(cmd)
if module.check_mode:
nb_skipping_line = len(filter_line_that_contains("Skipping", rules_dry))
if not (nb_skipping_line > 0 and nb_skipping_line == len(rules_dry.splitlines(True))):
rules_dry = filter_line_that_not_start_with("### tuple", rules_dry)
# ufw dry-run doesn't send all rules so have to compare ipv4 or ipv6 rules
if is_starting_by_ipv4(params['from_ip']) or is_starting_by_ipv4(params['to_ip']):
if filter_line_that_contains_ipv4(pre_rules) != filter_line_that_contains_ipv4(rules_dry):
changed = True
elif is_starting_by_ipv6(params['from_ip']) or is_starting_by_ipv6(params['to_ip']):
if filter_line_that_contains_ipv6(pre_rules) != filter_line_that_contains_ipv6(rules_dry):
changed = True
elif pre_rules != rules_dry:
changed = True
# Get the new state
(_, post_state, _) = module.run_command(ufw_bin + ' status verbose')
(_, post_rules, _) = module.run_command("grep '^### tuple' /lib/ufw/user.rules /lib/ufw/user6.rules /etc/ufw/user.rules /etc/ufw/user6.rules")
changed = (pre_state != post_state) or (pre_rules != post_rules)
return module.exit_json(changed=changed, commands=cmds, msg=post_state.rstrip())
if module.check_mode:
return module.exit_json(changed=changed, commands=cmds)
else:
post_state = execute([[ufw_bin], ['status'], ['verbose']])
if not changed:
post_rules = get_current_rules()
changed = (pre_state != post_state) or (pre_rules != post_rules)
return module.exit_json(changed=changed, commands=cmds, msg=post_state.rstrip())
if __name__ == '__main__':