community.general/tests/unit/plugins/modules/test_statsd.py
Felix Fontein 7df07f31a6
Some checks failed
EOL CI / EOL Sanity (Ⓐ2.13) (push) Has been cancelled
EOL CI / EOL Sanity (Ⓐ2.14) (push) Has been cancelled
EOL CI / EOL Sanity (Ⓐ2.15) (push) Has been cancelled
EOL CI / EOL Units (Ⓐ2.13+py2.7) (push) Has been cancelled
EOL CI / EOL Units (Ⓐ2.13+py3.8) (push) Has been cancelled
EOL CI / EOL Units (Ⓐ2.14+py3.9) (push) Has been cancelled
EOL CI / EOL Units (Ⓐ2.15+py3.10) (push) Has been cancelled
EOL CI / EOL Units (Ⓐ2.15+py3.5) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.13+alpine3+py:azp/posix/1/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.13+alpine3+py:azp/posix/2/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.13+alpine3+py:azp/posix/3/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.13+fedora35+py:azp/posix/1/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.13+fedora35+py:azp/posix/2/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.13+fedora35+py:azp/posix/3/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.13+opensuse15py2+py:azp/posix/1/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.13+opensuse15py2+py:azp/posix/2/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.13+opensuse15py2+py:azp/posix/3/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.14+alpine3+py:azp/posix/1/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.14+alpine3+py:azp/posix/2/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.14+alpine3+py:azp/posix/3/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.15+fedora37+py:azp/posix/1/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.15+fedora37+py:azp/posix/2/) (push) Has been cancelled
EOL CI / EOL I (Ⓐ2.15+fedora37+py:azp/posix/3/) (push) Has been cancelled
import-galaxy / Test to import built collection artifact with Galaxy importer (push) Has been cancelled
Verify REUSE / check (push) Has been cancelled
[stable-9] Unit tests: replace mock and compat with code from community.internal_test_tools (#9923)
* Unit tests: replace mock and compat with code from community.internal_test_tools (#9921)

* Replace compat with equivalent from community.internal_test_tools.

* Replace mock with equivalent from community.internal_test_tools.

* Remove ignore.txt entries.

* Add test that's no longer present in later versions.
2025-03-22 14:32:01 +01:00

101 lines
4.3 KiB
Python

# Copyright (c) Ansible project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible_collections.community.general.plugins.modules import statsd
from ansible_collections.community.internal_test_tools.tests.unit.compat.mock import patch, MagicMock
from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
class FakeStatsD(MagicMock):
    """``MagicMock`` subclass whose ``incr``, ``gauge`` and ``close`` are plain no-ops.

    The tests pass this class as the ``side_effect`` of the patched client
    lookups, so each call to the patched name yields one of these objects.
    """

    def incr(self, *args, **kwargs):
        """Accept any arguments, do nothing, and return ``None``."""

    def gauge(self, *args, **kwargs):
        """Accept any arguments, do nothing, and return ``None``."""

    def close(self, *args, **kwargs):
        """Accept any arguments, do nothing, and return ``None``."""
class TestStatsDModule(ModuleTestCase):
    """Exercise ``statsd.main()`` with the UDP and TCP client lookups patched out."""

    def setUp(self):
        """Force ``statsd.HAS_STATSD`` to ``True`` for the duration of one test."""
        super(TestStatsDModule, self).setUp()
        # Patch the flag instead of assigning it, so the previous value is
        # restored once the test finishes; a bare assignment would leak into
        # every later test that uses the same module object. ``create=True``
        # keeps plain assignment's tolerance of a missing attribute.
        flag_patcher = patch.object(statsd, 'HAS_STATSD', True, create=True)
        flag_patcher.start()
        self.addCleanup(flag_patcher.stop)
        self.module = statsd

    def tearDown(self):
        """Defer to the base class; the ``HAS_STATSD`` patch is undone via ``addCleanup``."""
        super(TestStatsDModule, self).tearDown()

    def patch_udp_statsd_client(self, **kwargs):
        """Return an autospec'd patcher for ``udp_statsd_client`` in the statsd module.

        Any keyword arguments are forwarded unchanged to ``patch``.
        """
        return patch(
            'ansible_collections.community.general.plugins.modules.statsd.udp_statsd_client',
            autospec=True,
            **kwargs
        )

    def patch_tcp_statsd_client(self, **kwargs):
        """Return an autospec'd patcher for ``tcp_statsd_client`` in the statsd module.

        Any keyword arguments are forwarded unchanged to ``patch``.
        """
        return patch(
            'ansible_collections.community.general.plugins.modules.statsd.tcp_statsd_client',
            autospec=True,
            **kwargs
        )

    def _run_main(self, module_args, expected_exception):
        """Run ``main()`` under ``module_args`` and return the exception it raised.

        Fails the test if ``expected_exception`` is not raised.
        """
        with self.assertRaises(expected_exception) as caught:
            with set_module_args(module_args):
                self.module.main()
        return caught.exception

    def test_udp_without_parameters(self):
        """Test udp without parameters"""
        with self.patch_udp_statsd_client(side_effect=FakeStatsD):
            self._run_main({}, AnsibleFailJson)

    def test_tcp_without_parameters(self):
        """Test tcp without parameters"""
        with self.patch_tcp_statsd_client(side_effect=FakeStatsD):
            self._run_main({}, AnsibleFailJson)

    def test_udp_with_parameters(self):
        """Test udp with parameters"""
        with self.patch_udp_statsd_client(side_effect=FakeStatsD):
            outcome = self._run_main({
                'metric': 'my_counter',
                'metric_type': 'counter',
                'value': 1,
            }, AnsibleExitJson).args[0]
            self.assertEqual(outcome['msg'], 'Sent counter my_counter -> 1 to StatsD')
            self.assertEqual(outcome['changed'], True)
        with self.patch_udp_statsd_client(side_effect=FakeStatsD):
            outcome = self._run_main({
                'metric': 'my_gauge',
                'metric_type': 'gauge',
                'value': 3,
            }, AnsibleExitJson).args[0]
            self.assertEqual(outcome['msg'], 'Sent gauge my_gauge -> 3 (delta=False) to StatsD')
            self.assertEqual(outcome['changed'], True)

    def test_tcp_with_parameters(self):
        """Test tcp with parameters"""
        with self.patch_tcp_statsd_client(side_effect=FakeStatsD):
            outcome = self._run_main({
                'protocol': 'tcp',
                'metric': 'my_counter',
                'metric_type': 'counter',
                'value': 1,
            }, AnsibleExitJson).args[0]
            self.assertEqual(outcome['msg'], 'Sent counter my_counter -> 1 to StatsD')
            self.assertEqual(outcome['changed'], True)
        with self.patch_tcp_statsd_client(side_effect=FakeStatsD):
            outcome = self._run_main({
                'protocol': 'tcp',
                'metric': 'my_gauge',
                'metric_type': 'gauge',
                'value': 3,
            }, AnsibleExitJson).args[0]
            self.assertEqual(outcome['msg'], 'Sent gauge my_gauge -> 3 (delta=False) to StatsD')
            self.assertEqual(outcome['changed'], True)