mirror of
https://github.com/ansible-collections/community.general.git
synced 2025-04-25 03:41:25 -07:00
We couldn't copy to_unicode, to_bytes, to_str into module_utils because of licensing. So once created it we had two sets of functions that did the same things but had different implementations. To remedy that, this change removes the ansible.utils.unicode versions of those functions.
256 lines
9.9 KiB
Python
256 lines
9.9 KiB
Python
# (c) 2014, Chris Church <chris@ninemoreminutes.com>
|
|
#
|
|
# This file is part of Ansible.
|
|
#
|
|
# Ansible is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Ansible is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
|
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
import base64
|
|
import os
|
|
import re
|
|
import shlex
|
|
|
|
from ansible.errors import AnsibleError
|
|
from ansible.module_utils._text import to_bytes, to_text
|
|
|
|
|
|
_common_args = ['PowerShell', '-NoProfile', '-NonInteractive', '-ExecutionPolicy', 'Unrestricted']
|
|
|
|
# Primarily for testing, allow explicitly specifying PowerShell version via
|
|
# an environment variable.
|
|
_powershell_version = os.environ.get('POWERSHELL_VERSION', None)
|
|
if _powershell_version:
|
|
_common_args = ['PowerShell', '-Version', _powershell_version] + _common_args[1:]
|
|
|
|
|
|
class ShellModule(object):
|
|
|
|
# Common shell filenames that this plugin handles
|
|
# Powershell is handled differently. It's selected when winrm is the
|
|
# connection
|
|
COMPATIBLE_SHELLS = frozenset()
|
|
# Family of shells this has. Must match the filename without extension
|
|
SHELL_FAMILY = 'powershell'
|
|
|
|
env = dict()
|
|
|
|
# We're being overly cautious about which keys to accept (more so than
|
|
# the Windows environment is capable of doing), since the powershell
|
|
# env provider's limitations don't appear to be documented.
|
|
safe_envkey = re.compile(r'^[\d\w_]{1,255}$')
|
|
|
|
def assert_safe_env_key(self, key):
|
|
if not self.safe_envkey.match(key):
|
|
raise AnsibleError("Invalid PowerShell environment key: %s" % key)
|
|
return key
|
|
|
|
def safe_env_value(self, key, value):
|
|
if len(value) > 32767:
|
|
raise AnsibleError("PowerShell environment value for key '%s' exceeds 32767 characters in length" % key)
|
|
# powershell single quoted literals need single-quote doubling as their only escaping
|
|
value = value.replace("'", "''")
|
|
return to_text(value, errors='surrogate_or_strict')
|
|
|
|
def env_prefix(self, **kwargs):
|
|
env = self.env.copy()
|
|
env.update(kwargs)
|
|
return ';'.join(["$env:%s='%s'" % (self.assert_safe_env_key(k), self.safe_env_value(k,v)) for k,v in env.items()])
|
|
|
|
def join_path(self, *args):
|
|
parts = []
|
|
for arg in args:
|
|
arg = self._unquote(arg).replace('/', '\\')
|
|
parts.extend([a for a in arg.split('\\') if a])
|
|
path = '\\'.join(parts)
|
|
if path.startswith('~'):
|
|
return path
|
|
return '\'%s\'' % path
|
|
|
|
def get_remote_filename(self, pathname):
|
|
# powershell requires that script files end with .ps1
|
|
base_name = os.path.basename(pathname.strip())
|
|
name, ext = os.path.splitext(base_name.strip())
|
|
if ext.lower() not in ['.ps1', '.exe']:
|
|
return name + '.ps1'
|
|
|
|
return base_name.strip()
|
|
|
|
def path_has_trailing_slash(self, path):
|
|
# Allow Windows paths to be specified using either slash.
|
|
path = self._unquote(path)
|
|
return path.endswith('/') or path.endswith('\\')
|
|
|
|
def chmod(self, paths, mode):
|
|
raise NotImplementedError('chmod is not implemented for Powershell')
|
|
|
|
def chown(self, paths, user):
|
|
raise NotImplementedError('chown is not implemented for Powershell')
|
|
|
|
def set_user_facl(self, paths, user, mode):
|
|
raise NotImplementedError('set_user_facl is not implemented for Powershell')
|
|
|
|
def remove(self, path, recurse=False):
|
|
path = self._escape(self._unquote(path))
|
|
if recurse:
|
|
return self._encode_script('''Remove-Item "%s" -Force -Recurse;''' % path)
|
|
else:
|
|
return self._encode_script('''Remove-Item "%s" -Force;''' % path)
|
|
|
|
def mkdtemp(self, basefile, system=False, mode=None):
|
|
basefile = self._escape(self._unquote(basefile))
|
|
# FIXME: Support system temp path!
|
|
return self._encode_script('''(New-Item -Type Directory -Path $env:temp -Name "%s").FullName | Write-Host -Separator '';''' % basefile)
|
|
|
|
def expand_user(self, user_home_path):
|
|
# PowerShell only supports "~" (not "~username"). Resolve-Path ~ does
|
|
# not seem to work remotely, though by default we are always starting
|
|
# in the user's home directory.
|
|
user_home_path = self._unquote(user_home_path)
|
|
if user_home_path == '~':
|
|
script = 'Write-Host (Get-Location).Path'
|
|
elif user_home_path.startswith('~\\'):
|
|
script = 'Write-Host ((Get-Location).Path + "%s")' % self._escape(user_home_path[1:])
|
|
else:
|
|
script = 'Write-Host "%s"' % self._escape(user_home_path)
|
|
return self._encode_script(script)
|
|
|
|
def exists(self, path):
|
|
path = self._escape(self._unquote(path))
|
|
script = '''
|
|
If (Test-Path "%s")
|
|
{
|
|
$res = 0;
|
|
}
|
|
Else
|
|
{
|
|
$res = 1;
|
|
}
|
|
Write-Host "$res";
|
|
Exit $res;
|
|
''' % path
|
|
return self._encode_script(script)
|
|
|
|
def checksum(self, path, *args, **kwargs):
|
|
path = self._escape(self._unquote(path))
|
|
script = '''
|
|
If (Test-Path -PathType Leaf "%(path)s")
|
|
{
|
|
$sp = new-object -TypeName System.Security.Cryptography.SHA1CryptoServiceProvider;
|
|
$fp = [System.IO.File]::Open("%(path)s", [System.IO.Filemode]::Open, [System.IO.FileAccess]::Read);
|
|
[System.BitConverter]::ToString($sp.ComputeHash($fp)).Replace("-", "").ToLower();
|
|
$fp.Dispose();
|
|
}
|
|
ElseIf (Test-Path -PathType Container "%(path)s")
|
|
{
|
|
Write-Host "3";
|
|
}
|
|
Else
|
|
{
|
|
Write-Host "1";
|
|
}
|
|
''' % dict(path=path)
|
|
return self._encode_script(script)
|
|
|
|
def build_module_command(self, env_string, shebang, cmd, arg_path=None, rm_tmp=None):
|
|
cmd_parts = shlex.split(to_bytes(cmd), posix=False)
|
|
cmd_parts = map(to_text, cmd_parts)
|
|
if shebang and shebang.lower() == '#!powershell':
|
|
if not self._unquote(cmd_parts[0]).lower().endswith('.ps1'):
|
|
cmd_parts[0] = '"%s.ps1"' % self._unquote(cmd_parts[0])
|
|
cmd_parts.insert(0, '&')
|
|
elif shebang and shebang.startswith('#!'):
|
|
cmd_parts.insert(0, shebang[2:])
|
|
elif not shebang:
|
|
# The module is assumed to be a binary
|
|
cmd_parts[0] = self._unquote(cmd_parts[0])
|
|
cmd_parts.append(arg_path)
|
|
script = '''
|
|
Try
|
|
{
|
|
%s
|
|
%s
|
|
}
|
|
Catch
|
|
{
|
|
$_obj = @{ failed = $true }
|
|
If ($_.Exception.GetType)
|
|
{
|
|
$_obj.Add('msg', $_.Exception.Message)
|
|
}
|
|
Else
|
|
{
|
|
$_obj.Add('msg', $_.ToString())
|
|
}
|
|
If ($_.InvocationInfo.PositionMessage)
|
|
{
|
|
$_obj.Add('exception', $_.InvocationInfo.PositionMessage)
|
|
}
|
|
ElseIf ($_.ScriptStackTrace)
|
|
{
|
|
$_obj.Add('exception', $_.ScriptStackTrace)
|
|
}
|
|
Try
|
|
{
|
|
$_obj.Add('error_record', ($_ | ConvertTo-Json | ConvertFrom-Json))
|
|
}
|
|
Catch
|
|
{
|
|
}
|
|
Echo $_obj | ConvertTo-Json -Compress -Depth 99
|
|
Exit 1
|
|
}
|
|
''' % (env_string, ' '.join(cmd_parts))
|
|
if rm_tmp:
|
|
rm_tmp = self._escape(self._unquote(rm_tmp))
|
|
rm_cmd = 'Remove-Item "%s" -Force -Recurse -ErrorAction SilentlyContinue' % rm_tmp
|
|
script = '%s\nFinally { %s }' % (script, rm_cmd)
|
|
return self._encode_script(script)
|
|
|
|
def _unquote(self, value):
|
|
'''Remove any matching quotes that wrap the given value.'''
|
|
value = to_text(value or '')
|
|
m = re.match(r'^\s*?\'(.*?)\'\s*?$', value)
|
|
if m:
|
|
return m.group(1)
|
|
m = re.match(r'^\s*?"(.*?)"\s*?$', value)
|
|
if m:
|
|
return m.group(1)
|
|
return value
|
|
|
|
def _escape(self, value, include_vars=False):
|
|
'''Return value escaped for use in PowerShell command.'''
|
|
# http://www.techotopia.com/index.php/Windows_PowerShell_1.0_String_Quoting_and_Escape_Sequences
|
|
# http://stackoverflow.com/questions/764360/a-list-of-string-replacements-in-python
|
|
subs = [('\n', '`n'), ('\r', '`r'), ('\t', '`t'), ('\a', '`a'),
|
|
('\b', '`b'), ('\f', '`f'), ('\v', '`v'), ('"', '`"'),
|
|
('\'', '`\''), ('`', '``'), ('\x00', '`0')]
|
|
if include_vars:
|
|
subs.append(('$', '`$'))
|
|
pattern = '|'.join('(%s)' % re.escape(p) for p, s in subs)
|
|
substs = [s for p, s in subs]
|
|
replace = lambda m: substs[m.lastindex - 1]
|
|
return re.sub(pattern, replace, value)
|
|
|
|
def _encode_script(self, script, as_list=False, strict_mode=True):
|
|
'''Convert a PowerShell script to a single base64-encoded command.'''
|
|
script = to_text(script)
|
|
if strict_mode:
|
|
script = u'Set-StrictMode -Version Latest\r\n%s' % script
|
|
script = '\n'.join([x.strip() for x in script.splitlines() if x.strip()])
|
|
encoded_script = base64.b64encode(script.encode('utf-16-le'))
|
|
cmd_parts = _common_args + ['-EncodedCommand', encoded_script]
|
|
if as_list:
|
|
return cmd_parts
|
|
return ' '.join(cmd_parts)
|