mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-07-22 04:40:22 -07:00
refactoring async
- centralized skipping - also fixed module name broken by previous refactor - let action modules handle async processing - moved async into base action class's module exec - action plugins can now run final action as async - actually skip copy if base skips - fixed normal for new paths - ensure internal stat is never async - default poll to 10 as per docs - added hint for callback fix on poll - restructured late tmp, now a pipeline query - moving action handler to connection as networking does - fixed network assumption invocation is always passed - centralized key cleanup, normalized internal var - _supress_tmpdir_delete now in _ansible_xxx and gets removed from results - delay internal key removal till after we use em - nicer tmp removing, using existing methods - moved cleanup tmp flag to mking tmp func
This commit is contained in:
parent
22db51f15c
commit
c86a17b7a0
22 changed files with 254 additions and 371 deletions
|
@ -67,9 +67,10 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
self._shared_loader_obj = shared_loader_obj
|
||||
# Backwards compat: self._display isn't really needed, just import the global display and use that.
|
||||
self._display = display
|
||||
|
||||
self._cleanup_remote_tmp = False
|
||||
|
||||
self._supports_check_mode = True
|
||||
self._supports_async = False
|
||||
|
||||
@abstractmethod
|
||||
def run(self, tmp=None, task_vars=None):
|
||||
|
@ -88,14 +89,20 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
|
||||
* Module parameters. These are stored in self._task.args
|
||||
"""
|
||||
# store the module invocation details into the results
|
||||
results = {}
|
||||
if self._task.async == 0:
|
||||
results['invocation'] = dict(
|
||||
module_name = self._task.action,
|
||||
module_args = self._task.args,
|
||||
)
|
||||
return results
|
||||
|
||||
result = {'skipped': True}
|
||||
|
||||
if self._task.async and not self._supports_async:
|
||||
result['msg'] = 'async is not supported for this task.'
|
||||
elif self._play_context.check_mode and not self._supports_check_mode:
|
||||
result['msg'] = 'check mode is not supported for this task.'
|
||||
elif self._task.async and self._play_context.check_mode:
|
||||
result['msg'] = 'check mode and async cannot be used on same task.'
|
||||
else:
|
||||
# we run!
|
||||
del result['skipped']
|
||||
|
||||
return result
|
||||
|
||||
def _remote_file_exists(self, path):
|
||||
cmd = self._connection._shell.exists(path)
|
||||
|
@ -189,27 +196,33 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
|
||||
return getattr(self, 'TRANSFERS_FILES', False)
|
||||
|
||||
def _late_needs_tmp_path(self, tmp, module_style):
|
||||
def _is_pipelining_enabled(self, module_style, wrap_async=False):
|
||||
'''
|
||||
Determines if a temp path is required after some early actions have already taken place.
|
||||
Determines if we are required and can do pipelining
|
||||
'''
|
||||
if tmp and "tmp" in tmp:
|
||||
# tmp has already been created
|
||||
return False
|
||||
if not self._connection.has_pipelining or not self._play_context.pipelining or C.DEFAULT_KEEP_REMOTE_FILES or self._play_context.become_method == 'su':
|
||||
# tmp is necessary to store the module source code
|
||||
# or we want to keep the files on the target system
|
||||
return True
|
||||
if module_style != "new":
|
||||
# even when conn has pipelining, old style modules need tmp to store arguments
|
||||
return True
|
||||
return False
|
||||
# any of these require a true
|
||||
for condition in [
|
||||
self._connection.has_pipelining,
|
||||
self._play_context.pipelining,
|
||||
module_style == "new", # old style modules do not support pipelining
|
||||
not C.DEFAULT_KEEP_REMOTE_FILES, # user wants remote files
|
||||
not wrap_async, # async does not support pipelining
|
||||
self._play_context.become_method != 'su', # su does not work with pipelining,
|
||||
# FIXME: we might need to make become_method exclusion a configurable list
|
||||
]:
|
||||
if not condition:
|
||||
return False
|
||||
|
||||
def _make_tmp_path(self, remote_user):
|
||||
return True
|
||||
|
||||
def _make_tmp_path(self, remote_user=None):
|
||||
'''
|
||||
Create and return a temporary path on a remote box.
|
||||
'''
|
||||
|
||||
if remote_user is None:
|
||||
remote_user = self._play_context.remote_user
|
||||
|
||||
basefile = 'ansible-tmp-%s-%s' % (time.time(), random.randint(0, 2**48))
|
||||
use_system_tmp = False
|
||||
|
||||
|
@ -248,6 +261,8 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
if 'stdout' in result and result['stdout'] != u'':
|
||||
output = output + u": %s" % result['stdout']
|
||||
raise AnsibleConnectionFailure(output)
|
||||
else:
|
||||
self._cleanup_remote_tmp = True
|
||||
|
||||
try:
|
||||
stdout_parts = result['stdout'].strip().split('%s=' % basefile, 1)
|
||||
|
@ -275,7 +290,12 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
cmd = self._connection._shell.remove(tmp_path, recurse=True)
|
||||
# If we have gotten here we have a working ssh configuration.
|
||||
# If ssh breaks we could leave tmp directories out on the remote system.
|
||||
self._low_level_execute_command(cmd, sudoable=False)
|
||||
tmp_rm_res = self._low_level_execute_command(cmd, sudoable=False)
|
||||
|
||||
tmp_rm_data = self._parse_returned_data(tmp_rm_res)
|
||||
if tmp_rm_data.get('rc', 0) != 0:
|
||||
display.warning('Error deleting remote temporary files (rc: {0}, stderr: {1})'.format(tmp_rm_res.get('rc'),
|
||||
tmp_rm_res.get('stderr', 'No error string available.')))
|
||||
|
||||
def _transfer_file(self, local_path, remote_path):
|
||||
self._connection.put_file(local_path, remote_path)
|
||||
|
@ -307,7 +327,7 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
|
||||
return remote_path
|
||||
|
||||
def _fixup_perms(self, remote_path, remote_user, execute=True, recursive=True):
|
||||
def _fixup_perms(self, remote_path, remote_user=None, execute=True, recursive=True):
|
||||
"""
|
||||
We need the files we upload to be readable (and sometimes executable)
|
||||
by the user being sudo'd to but we want to limit other people's access
|
||||
|
@ -319,6 +339,8 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
for custom actions (non-recursive mode only).
|
||||
|
||||
"""
|
||||
if remote_user is None:
|
||||
remote_user = self._play_context.remote_user
|
||||
|
||||
display.deprecated('_fixup_perms is deprecated. Use _fixup_perms2 instead.', version='2.4', removed=False)
|
||||
|
||||
|
@ -329,7 +351,7 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
|
||||
return self._fixup_perms2([remote_path], remote_user, execute)
|
||||
|
||||
def _fixup_perms2(self, remote_paths, remote_user, execute=True):
|
||||
def _fixup_perms2(self, remote_paths, remote_user=None, execute=True):
|
||||
"""
|
||||
We need the files we upload to be readable (and sometimes executable)
|
||||
by the user being sudo'd to but we want to limit other people's access
|
||||
|
@ -352,6 +374,9 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
information we only do this ansible is configured with
|
||||
"allow_world_readable_tmpfiles" in the ansible.cfg
|
||||
"""
|
||||
if remote_user is None:
|
||||
remote_user = self._play_context.remote_user
|
||||
|
||||
if self._connection._shell.SHELL_FAMILY == 'powershell':
|
||||
# This won't work on Powershell as-is, so we'll just completely skip until
|
||||
# we have a need for it, at which point we'll have to do something different.
|
||||
|
@ -403,12 +428,11 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
' (rc: {0}, err: {1}). For information on working around this,'
|
||||
' see https://docs.ansible.com/ansible/become.html#becoming-an-unprivileged-user'.format(res['rc'], to_native(res['stderr'])))
|
||||
elif execute:
|
||||
# Can't depend on the file being transferred with execute
|
||||
# permissions. Only need user perms because no become was
|
||||
# used here
|
||||
# Can't depend on the file being transferred with execute permissions.
|
||||
# Only need user perms because no become was used here
|
||||
res = self._remote_chmod(remote_paths, 'u+x')
|
||||
if res['rc'] != 0:
|
||||
raise AnsibleError('Failed to set file mode on remote files (rc: {0}, err: {1})'.format(res['rc'], to_native(res['stderr'])))
|
||||
raise AnsibleError('Failed to set execute bit on remote files (rc: {0}, err: {1})'.format(res['rc'], to_native(res['stderr'])))
|
||||
|
||||
return remote_paths
|
||||
|
||||
|
@ -447,7 +471,7 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
get_checksum=True,
|
||||
checksum_algo='sha1',
|
||||
)
|
||||
mystat = self._execute_module(module_name='stat', module_args=module_args, task_vars=all_vars, tmp=tmp, delete_remote_tmp=(tmp is None))
|
||||
mystat = self._execute_module(module_name='stat', module_args=module_args, task_vars=all_vars, tmp=tmp, delete_remote_tmp=(tmp is None), wrap_async=False)
|
||||
|
||||
if mystat.get('failed'):
|
||||
msg = mystat.get('module_stderr')
|
||||
|
@ -479,7 +503,7 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
3 = its a directory, not a file
|
||||
4 = stat module failed, likely due to not finding python
|
||||
'''
|
||||
x = "0" # unknown error has occurred
|
||||
x = "0" # unknown error has occured
|
||||
try:
|
||||
remote_stat = self._execute_remote_stat(path, all_vars, follow=follow)
|
||||
if remote_stat['exists'] and remote_stat['isdir']:
|
||||
|
@ -562,23 +586,25 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
# let module know about filesystems that selinux treats specially
|
||||
module_args['_ansible_selinux_special_fs'] = C.DEFAULT_SELINUX_SPECIAL_FS
|
||||
|
||||
def _execute_module(self, module_name=None, module_args=None, tmp=None, task_vars=None, persist_files=False, delete_remote_tmp=True):
|
||||
|
||||
|
||||
def _execute_module(self, module_name=None, module_args=None, tmp=None, task_vars=None, persist_files=False, delete_remote_tmp=True, wrap_async=False):
|
||||
'''
|
||||
Transfer and run a module along with its arguments.
|
||||
'''
|
||||
if task_vars is None:
|
||||
task_vars = dict()
|
||||
|
||||
# if a module name was not specified for this execution, use
|
||||
# the action from the task
|
||||
remote_module_path = None
|
||||
args_file_path = None
|
||||
remote_files = []
|
||||
|
||||
# if a module name was not specified for this execution, use the action from the task
|
||||
if module_name is None:
|
||||
module_name = self._task.action
|
||||
if module_args is None:
|
||||
module_args = self._task.args
|
||||
|
||||
# Get the connection user for permission checks
|
||||
remote_user = task_vars.get('ansible_ssh_user') or self._play_context.remote_user
|
||||
|
||||
self._update_module_args(module_name, module_args, task_vars)
|
||||
|
||||
(module_style, shebang, module_data, module_path) = self._configure_module(module_name=module_name, module_args=module_args, task_vars=task_vars)
|
||||
|
@ -586,23 +612,18 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
if not shebang and module_style != 'binary':
|
||||
raise AnsibleError("module (%s) is missing interpreter line" % module_name)
|
||||
|
||||
# a remote tmp path may be necessary and not already created
|
||||
remote_module_path = None
|
||||
args_file_path = None
|
||||
if not tmp and self._late_needs_tmp_path(tmp, module_style):
|
||||
tmp = self._make_tmp_path(remote_user)
|
||||
if not self._is_pipelining_enabled(module_style, wrap_async):
|
||||
|
||||
# we might need remote tmp dir
|
||||
if not tmp or not 'tmp' in tmp:
|
||||
tmp = self._make_tmp_path()
|
||||
|
||||
if tmp and \
|
||||
(module_style != 'new' or \
|
||||
not self._connection.has_pipelining or \
|
||||
not self._play_context.pipelining or \
|
||||
C.DEFAULT_KEEP_REMOTE_FILES or \
|
||||
self._play_context.become_method == 'su'):
|
||||
remote_module_filename = self._connection._shell.get_remote_filename(module_path)
|
||||
remote_module_path = self._connection._shell.join_path(tmp, remote_module_filename)
|
||||
if module_style in ('old', 'non_native_want_json', 'binary'):
|
||||
# we'll also need a temp file to hold our module arguments
|
||||
args_file_path = self._connection._shell.join_path(tmp, 'args')
|
||||
|
||||
if module_style in ('old', 'non_native_want_json', 'binary'):
|
||||
# we'll also need a temp file to hold our module arguments
|
||||
args_file_path = self._connection._shell.join_path(tmp, 'args')
|
||||
|
||||
if remote_module_path or module_style != 'new':
|
||||
display.debug("transferring module to remote %s" % remote_module_path)
|
||||
|
@ -623,67 +644,101 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
|
||||
environment_string = self._compute_environment_string()
|
||||
|
||||
remote_files = None
|
||||
if tmp and remote_module_path:
|
||||
remote_files = [tmp, remote_module_path]
|
||||
|
||||
if args_file_path:
|
||||
remote_files = tmp, remote_module_path, args_file_path
|
||||
elif remote_module_path:
|
||||
remote_files = tmp, remote_module_path
|
||||
|
||||
# Fix permissions of the tmp path and tmp files. This should be
|
||||
# called after all files have been transferred.
|
||||
if remote_files:
|
||||
self._fixup_perms2(remote_files, remote_user)
|
||||
|
||||
cmd = ""
|
||||
in_data = None
|
||||
|
||||
if self._connection.has_pipelining and self._play_context.pipelining and not C.DEFAULT_KEEP_REMOTE_FILES and module_style == 'new':
|
||||
in_data = module_data
|
||||
else:
|
||||
if remote_module_path:
|
||||
cmd = remote_module_path
|
||||
|
||||
rm_tmp = None
|
||||
if tmp and "tmp" in tmp and not C.DEFAULT_KEEP_REMOTE_FILES and not persist_files and delete_remote_tmp:
|
||||
if not self._play_context.become or self._play_context.become_user == 'root':
|
||||
# not sudoing or sudoing to root, so can cleanup files in the same step
|
||||
rm_tmp = tmp
|
||||
|
||||
cmd = self._connection._shell.build_module_command(environment_string, shebang, cmd, arg_path=args_file_path, rm_tmp=rm_tmp)
|
||||
cmd = cmd.strip()
|
||||
remote_files.append(args_file_path)
|
||||
|
||||
sudoable = True
|
||||
if module_name == "accelerate":
|
||||
# always run the accelerate module as the user
|
||||
# specified in the play, not the sudo_user
|
||||
sudoable = False
|
||||
in_data = None
|
||||
cmd = ""
|
||||
|
||||
if wrap_async:
|
||||
# configure, upload, and chmod the async_wrapper module
|
||||
(async_module_style, shebang, async_module_data, async_module_path) = self._configure_module(module_name='async_wrapper', module_args=dict(), task_vars=task_vars)
|
||||
async_module_remote_filename = self._connection._shell.get_remote_filename(async_module_path)
|
||||
remote_async_module_path = self._connection._shell.join_path(tmp, async_module_remote_filename)
|
||||
self._transfer_data(remote_async_module_path, async_module_data)
|
||||
remote_files.append(remote_async_module_path)
|
||||
|
||||
async_limit = self._task.async
|
||||
async_jid = str(random.randint(0, 999999999999))
|
||||
|
||||
# call the interpreter for async_wrapper directly
|
||||
# this permits use of a script for an interpreter on non-Linux platforms
|
||||
# TODO: re-implement async_wrapper as a regular module to avoid this special case
|
||||
interpreter = shebang.replace('#!', '').strip()
|
||||
async_cmd = [interpreter, remote_async_module_path, async_jid, async_limit, remote_module_path]
|
||||
|
||||
if environment_string:
|
||||
async_cmd.insert(0, environment_string)
|
||||
|
||||
if args_file_path:
|
||||
async_cmd.append(args_file_path)
|
||||
else:
|
||||
# maintain a fixed number of positional parameters for async_wrapper
|
||||
async_cmd.append('_')
|
||||
|
||||
if not self._should_remove_tmp_path(tmp):
|
||||
async_cmd.append("-preserve_tmp")
|
||||
|
||||
cmd= " ".join(to_text(x) for x in async_cmd)
|
||||
|
||||
else:
|
||||
|
||||
if self._is_pipelining_enabled(module_style):
|
||||
in_data = module_data
|
||||
else:
|
||||
cmd = remote_module_path
|
||||
|
||||
rm_tmp = None
|
||||
|
||||
if self._should_remove_tmp_path(tmp) and not persist_files and delete_remote_tmp:
|
||||
if not self._play_context.become or self._play_context.become_user == 'root':
|
||||
# not sudoing or sudoing to root, so can cleanup files in the same step
|
||||
rm_tmp = tmp
|
||||
|
||||
cmd = self._connection._shell.build_module_command(environment_string, shebang, cmd, arg_path=args_file_path, rm_tmp=rm_tmp).strip()
|
||||
|
||||
if module_name == "accelerate":
|
||||
# always run the accelerate module as the user
|
||||
# specified in the play, not the sudo_user
|
||||
sudoable = False
|
||||
|
||||
# Fix permissions of the tmp path and tmp files. This should be called after all files have been transferred.
|
||||
if remote_files:
|
||||
# remove none/empty
|
||||
remote_files = [ x for x in remote_files if x]
|
||||
self._fixup_perms2(remote_files, self._play_context.remote_user)
|
||||
|
||||
# actually execute
|
||||
res = self._low_level_execute_command(cmd, sudoable=sudoable, in_data=in_data)
|
||||
|
||||
if tmp and "tmp" in tmp and not C.DEFAULT_KEEP_REMOTE_FILES and not persist_files and delete_remote_tmp:
|
||||
if self._play_context.become and self._play_context.become_user != 'root':
|
||||
# not sudoing to root, so maybe can't delete files as that other user
|
||||
# have to clean up temp files as original user in a second step
|
||||
tmp_rm_cmd = self._connection._shell.remove(tmp, recurse=True)
|
||||
tmp_rm_res = self._low_level_execute_command(tmp_rm_cmd, sudoable=False)
|
||||
tmp_rm_data = self._parse_returned_data(tmp_rm_res)
|
||||
if tmp_rm_data.get('rc', 0) != 0:
|
||||
display.warning('Error deleting remote temporary files (rc: {0}, stderr: {1})'.format(tmp_rm_res.get('rc'),
|
||||
tmp_rm_res.get('stderr', 'No error string available.')))
|
||||
|
||||
# parse the main result
|
||||
# parse the main result, also cleans up internal keys
|
||||
data = self._parse_returned_data(res)
|
||||
|
||||
# pre-split stdout into lines, if stdout is in the data and there
|
||||
# isn't already a stdout_lines value there
|
||||
#NOTE: INTERNAL KEYS ONLY ACCESSIBLE HERE
|
||||
# get internal info before cleaning
|
||||
tmpdir_delete = (not data.pop("_ansible_suppress_tmpdir_delete", False) and wrap_async)
|
||||
|
||||
# remove internal keys
|
||||
self._remove_internal_keys(data)
|
||||
data['_ansible_parsed'] = True
|
||||
|
||||
# cleanup tmp?
|
||||
if (self._play_context.become and self._play_context.become_user != 'root') and not persist_files and delete_remote_tmp or tmpdir_delete:
|
||||
self._remove_tmp_path(tmp)
|
||||
|
||||
#FIXME: for backwards compat, figure out if still makes sense
|
||||
if wrap_async:
|
||||
data['changed'] = True
|
||||
|
||||
# pre-split stdout/stderr into lines if needed
|
||||
if 'stdout' in data and 'stdout_lines' not in data:
|
||||
data['stdout_lines'] = data.get('stdout', u'').splitlines()
|
||||
|
||||
# remove bad/empty internal keys
|
||||
for key in ['warnings', 'deprecations']:
|
||||
if key in data and not data[key]:
|
||||
del data[key]
|
||||
if 'stderr' in data and 'stderr_lines' not in data:
|
||||
data['stderr_lines'] = data.get('stderr', u'').splitlines()
|
||||
|
||||
display.debug("done with _execute_module (%s, %s)" % (module_name, module_args))
|
||||
return data
|
||||
|
@ -694,6 +749,12 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
display.warning("Removed unexpected internal key in module return: %s = %s" % (key, data[key]))
|
||||
del data[key]
|
||||
|
||||
# remove bad/empty internal keys
|
||||
for key in ['warnings', 'deprecations']:
|
||||
if key in data and not data[key]:
|
||||
del data[key]
|
||||
|
||||
|
||||
def _clean_returned_data(self, data):
|
||||
remove_keys = set()
|
||||
fact_keys = set(data.keys())
|
||||
|
@ -737,8 +798,6 @@ class ActionBase(with_metaclass(ABCMeta, object)):
|
|||
display.warning(w)
|
||||
|
||||
data = json.loads(filtered_output)
|
||||
self._remove_internal_keys(data)
|
||||
data['_ansible_parsed'] = True
|
||||
|
||||
if 'ansible_facts' in data and isinstance(data['ansible_facts'], dict):
|
||||
self._clean_returned_data(data['ansible_facts'])
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue