[PR #10642/f6e1d908 backport][stable-11] parted: command args as list rather than string (#10774)

parted: command args as list rather than string (#10642)

* parted: command args as list rather than string

* add changelog frag

* add missing command line dash args

* make scripts as lists as well

* Apply suggestions from code review



---------


(cherry picked from commit f6e1d90870)

Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
Co-authored-by: Felix Fontein <felix@fontein.de>
This commit is contained in:
patchback[bot] 2025-08-31 12:11:16 +02:00 committed by GitHub
commit 144945894f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 41 additions and 46 deletions

View file

@ -0,0 +1,2 @@
minor_changes:
- parted - using safer mechanism to run external command (https://github.com/ansible-collections/community.general/pull/10642).

View file

@ -552,9 +552,9 @@ def parted(script, device, align):
""" """
global module, parted_exec # pylint: disable=global-variable-not-assigned global module, parted_exec # pylint: disable=global-variable-not-assigned
align_option = '-a %s' % align align_option = ['-a', align]
if align == 'undefined': if align == 'undefined':
align_option = '' align_option = []
""" """
Use option --fix (-f) if available. Versions prior Use option --fix (-f) if available. Versions prior
@ -562,13 +562,12 @@ def parted(script, device, align):
http://savannah.gnu.org/news/?id=10114 http://savannah.gnu.org/news/?id=10114
""" """
if parted_version() >= (3, 4, 64): if parted_version() >= (3, 4, 64):
script_option = '-s -f' script_option = ['-s', '-f']
else: else:
script_option = '-s' script_option = ['-s']
if script and not module.check_mode: if script and not module.check_mode:
# TODO: convert run_comand() argument to list! command = [parted_exec] + script_option + ['-m'] + align_option + [device, '--'] + script
command = "%s %s -m %s %s -- %s" % (parted_exec, script_option, align_option, device, script)
rc, out, err = module.run_command(command) rc, out, err = module.run_command(command)
if rc != 0: if rc != 0:
@ -594,10 +593,7 @@ def part_exists(partitions, attribute, number):
Looks if a partition that has a specific value for a specific attribute Looks if a partition that has a specific value for a specific attribute
actually exists. actually exists.
""" """
return any( return any(part.get(attribute) == number for part in partitions)
part[attribute] and
part[attribute] == number for part in partitions
)
def check_size_format(size_str): def check_size_format(size_str):
@ -612,8 +608,8 @@ def main():
global module, units_si, units_iec, parted_exec # pylint: disable=global-variable-not-assigned global module, units_si, units_iec, parted_exec # pylint: disable=global-variable-not-assigned
changed = False changed = False
output_script = "" output_script = []
script = "" script = []
module = AnsibleModule( module = AnsibleModule(
argument_spec=dict( argument_spec=dict(
device=dict(type='str', required=True), device=dict(type='str', required=True),
@ -695,20 +691,19 @@ def main():
# Assign label if required # Assign label if required
mklabel_needed = current_device['generic'].get('table', None) != label mklabel_needed = current_device['generic'].get('table', None) != label
if mklabel_needed: if mklabel_needed:
script += "mklabel %s " % label script += ["mklabel", label]
# Create partition if required # Create partition if required
if part_type and (mklabel_needed or not part_exists(current_parts, 'num', number)): if part_type and (mklabel_needed or not part_exists(current_parts, 'num', number)):
script += "mkpart %s %s%s %s " % ( script += ["mkpart"]
part_type, script += [part_type]
'%s ' % fs_type if fs_type is not None else '', if fs_type is not None:
part_start, script += [fs_type]
part_end script += [part_start, part_end]
)
# Set the unit of the run # Set the unit of the run
if unit and script: if unit and script:
script = "unit %s %s" % (unit, script) script = ["unit", unit] + script
# If partition exists, try to resize # If partition exists, try to resize
if resize and part_exists(current_parts, 'num', number): if resize and part_exists(current_parts, 'num', number):
@ -724,10 +719,7 @@ def main():
desired_part_end = convert_to_bytes(size, parsed_unit) desired_part_end = convert_to_bytes(size, parsed_unit)
if current_part_end != desired_part_end: if current_part_end != desired_part_end:
script += "resizepart %s %s " % ( script += ["resizepart", str(number), part_end]
number,
part_end
)
# Execute the script and update the data structure. # Execute the script and update the data structure.
# This will create the partition for the next steps # This will create the partition for the next steps
@ -735,7 +727,7 @@ def main():
output_script += script output_script += script
parted(script, device, align) parted(script, device, align)
changed = True changed = True
script = "" script = []
if not module.check_mode: if not module.check_mode:
current_parts = get_device_info(device, unit)['partitions'] current_parts = get_device_info(device, unit)['partitions']
@ -748,10 +740,8 @@ def main():
# Assign name to the partition # Assign name to the partition
if name is not None and partition.get('name', None) != name: if name is not None and partition.get('name', None) != name:
# Wrap double quotes in single quotes so the shell doesn't strip # The double quotes need to be included in the arg passed to parted
# the double quotes as those need to be included in the arg script += ['name', str(number), '"%s"' % name]
# passed to parted
script += 'name %s \'"%s"\' ' % (number, name)
# Manage flags # Manage flags
if flags: if flags:
@ -765,14 +755,14 @@ def main():
flags_on = list(set(flags) - set(partition['flags'])) flags_on = list(set(flags) - set(partition['flags']))
for f in flags_on: for f in flags_on:
script += "set %s %s on " % (number, f) script += ["set", str(number), f, "on"]
for f in flags_off: for f in flags_off:
script += "set %s %s off " % (number, f) script += ["set", str(number), f, "off"]
# Set the unit of the run # Set the unit of the run
if unit and script: if unit and script:
script = "unit %s %s" % (unit, script) script = ["unit", unit] + script
# Execute the script # Execute the script
if script: if script:
@ -783,21 +773,20 @@ def main():
elif state == 'absent': elif state == 'absent':
# Remove the partition # Remove the partition
if part_exists(current_parts, 'num', number) or module.check_mode: if part_exists(current_parts, 'num', number) or module.check_mode:
script = "rm %s " % number script = ["rm", str(number)]
output_script += script output_script += script
changed = True changed = True
parted(script, device, align) parted(script, device, align)
elif state == 'info': elif state == 'info':
output_script = "unit '%s' print " % unit output_script = ["unit", unit, "print"]
# Final status of the device # Final status of the device
final_device_status = get_device_info(device, unit) final_device_status = get_device_info(device, unit)
module.exit_json( module.exit_json(
changed=changed, changed=changed,
disk=final_device_status['generic'], disk=final_device_status['generic'],
partitions=final_device_status['partitions'], partitions=final_device_status['partitions'],
script=output_script.strip() script=output_script
) )

View file

@ -202,7 +202,7 @@ class TestParted(ModuleTestCase):
'state': 'present', 'state': 'present',
}): }):
with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1): with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1):
self.execute_module(changed=True, script='unit KiB mkpart primary 0% 100%') self.execute_module(changed=True, script=['unit', 'KiB', 'mkpart', 'primary', '0%', '100%'])
def test_create_new_partition_1G(self): def test_create_new_partition_1G(self):
with set_module_args({ with set_module_args({
@ -212,7 +212,7 @@ class TestParted(ModuleTestCase):
'part_end': '1GiB', 'part_end': '1GiB',
}): }):
with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1): with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1):
self.execute_module(changed=True, script='unit KiB mkpart primary 0% 1GiB') self.execute_module(changed=True, script=['unit', 'KiB', 'mkpart', 'primary', '0%', '1GiB'])
def test_create_new_partition_minus_1G(self): def test_create_new_partition_minus_1G(self):
with set_module_args({ with set_module_args({
@ -223,7 +223,7 @@ class TestParted(ModuleTestCase):
'part_start': '-1GiB', 'part_start': '-1GiB',
}): }):
with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1): with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1):
self.execute_module(changed=True, script='unit KiB mkpart primary ext2 -1GiB 100%') self.execute_module(changed=True, script=['unit', 'KiB', 'mkpart', 'primary', 'ext2', '-1GiB', '100%'])
def test_remove_partition_number_1(self): def test_remove_partition_number_1(self):
with set_module_args({ with set_module_args({
@ -232,7 +232,7 @@ class TestParted(ModuleTestCase):
'state': 'absent', 'state': 'absent',
}): }):
with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1): with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1):
self.execute_module(changed=True, script='rm 1') self.execute_module(changed=True, script=['rm', '1'])
def test_resize_partition(self): def test_resize_partition(self):
with set_module_args({ with set_module_args({
@ -243,7 +243,7 @@ class TestParted(ModuleTestCase):
'resize': True 'resize': True
}): }):
with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1): with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1):
self.execute_module(changed=True, script='resizepart 3 100%') self.execute_module(changed=True, script=['resizepart', '3', '100%'])
def test_change_flag(self): def test_change_flag(self):
# Flags are set in a second run of parted(). # Flags are set in a second run of parted().
@ -264,9 +264,9 @@ class TestParted(ModuleTestCase):
# When using multiple flags: # When using multiple flags:
# order of execution is non deterministic, because set() operations are used in # order of execution is non deterministic, because set() operations are used in
# the current implementation. # the current implementation.
expected_calls_order1 = [call('unit KiB set 3 lvm on set 3 boot on ', expected_calls_order1 = [call(['unit', 'KiB', 'set', '3', 'lvm', 'on', 'set', '3', 'boot', 'on'],
'/dev/sdb', 'optimal')] '/dev/sdb', 'optimal')]
expected_calls_order2 = [call('unit KiB set 3 boot on set 3 lvm on ', expected_calls_order2 = [call(['unit', 'KiB', 'set', '3', 'boot', 'on', 'set', '3', 'lvm', 'on'],
'/dev/sdb', 'optimal')] '/dev/sdb', 'optimal')]
self.assertTrue(self.parted.mock_calls == expected_calls_order1 or self.assertTrue(self.parted.mock_calls == expected_calls_order1 or
self.parted.mock_calls == expected_calls_order2) self.parted.mock_calls == expected_calls_order2)
@ -283,7 +283,8 @@ class TestParted(ModuleTestCase):
'_ansible_check_mode': True, '_ansible_check_mode': True,
}): }):
with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1): with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1):
self.execute_module(changed=True, script='unit KiB mkpart primary ext3 257GiB 100% unit KiB set 4 boot on') self.execute_module(changed=True,
script=['unit', 'KiB', 'mkpart', 'primary', 'ext3', '257GiB', '100%', 'unit', 'KiB', 'set', '4', 'boot', 'on'])
def test_create_label_gpt(self): def test_create_label_gpt(self):
# Like previous test, current implementation use parted to create the partition and # Like previous test, current implementation use parted to create the partition and
@ -299,7 +300,10 @@ class TestParted(ModuleTestCase):
'_ansible_check_mode': True, '_ansible_check_mode': True,
}): }):
with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict2): with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict2):
self.execute_module(changed=True, script='unit KiB mklabel gpt mkpart primary 0% 100% unit KiB name 1 \'"lvmpartition"\' set 1 lvm on') self.execute_module(
changed=True,
script=['unit', 'KiB', 'mklabel', 'gpt', 'mkpart', 'primary', '0%', '100%',
'unit', 'KiB', 'name', '1', '"lvmpartition"', 'set', '1', 'lvm', 'on'])
def test_change_label_gpt(self): def test_change_label_gpt(self):
# When partitions already exists and label is changed, mkpart should be called even when partition already exists, # When partitions already exists and label is changed, mkpart should be called even when partition already exists,
@ -312,7 +316,7 @@ class TestParted(ModuleTestCase):
'_ansible_check_mode': True, '_ansible_check_mode': True,
}): }):
with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1): with patch('ansible_collections.community.general.plugins.modules.parted.get_device_info', return_value=parted_dict1):
self.execute_module(changed=True, script='unit KiB mklabel gpt mkpart primary 0% 100%') self.execute_module(changed=True, script=['unit', 'KiB', 'mklabel', 'gpt', 'mkpart', 'primary', '0%', '100%'])
def test_check_mode_unchanged(self): def test_check_mode_unchanged(self):
# Test that get_device_info result is checked in check mode too # Test that get_device_info result is checked in check mode too