Acclerate improvements

* Added capability to support multiple keys, so clients from different
  machines can connect to a single daemon instance
* Any activity on the daemon will cause the timeout to extend, so that the
  daemon must be idle for the full number of minutes before it will auto-
  shutdown
* Various other small fixes to remove some redundancy

Fixes #5171
This commit is contained in:
James Cammarata 2014-03-19 12:59:06 -05:00
commit 3ea5d573aa
3 changed files with 340 additions and 108 deletions

View file

@ -22,10 +22,10 @@ import socket
import struct
import time
from ansible.callbacks import vvv, vvvv
from ansible.errors import AnsibleError, AnsibleFileNotFound
from ansible.runner.connection_plugins.ssh import Connection as SSHConnection
from ansible.runner.connection_plugins.paramiko_ssh import Connection as ParamikoConnection
from ansible import utils
from ansible import errors
from ansible import constants
# the chunk size to read and send, assuming mtu 1500 and
@ -85,7 +85,9 @@ class Connection(object):
utils.AES_KEYS = self.runner.aes_keys
def _execute_accelerate_module(self):
args = "password=%s port=%s debug=%d ipv6=%s" % (base64.b64encode(self.key.__str__()), str(self.accport), int(utils.VERBOSITY), self.runner.accelerate_ipv6)
args = "password=%s port=%s minutes=%d debug=%d ipv6=%s" % (base64.b64encode(self.key.__str__()), str(self.accport), constants.ACCELERATE_TIMEOUT, int(utils.VERBOSITY), self.runner.accelerate_ipv6)
if constants.ACCELERATE_MULTI_KEY:
args += " multi_key=yes"
inject = dict(password=self.key)
if getattr(self.runner, 'accelerate_inventory_host', False):
inject = utils.combine_vars(inject, self.runner.inventory.get_variables(self.runner.accelerate_inventory_host))
@ -109,33 +111,38 @@ class Connection(object):
while tries > 0:
try:
self.conn.connect((self.host,self.accport))
if not self.validate_user():
# the accelerated daemon was started with a
# different remote_user. The above command
# should have caused the accelerate daemon to
# shutdown, so we'll reconnect.
wrong_user = True
break
except:
vvvv("failed, retrying...")
except socket.error:
vvvv("connection to %s failed, retrying..." % self.host)
time.sleep(0.1)
tries -= 1
if tries == 0:
vvv("Could not connect via the accelerated connection, exceeded # of tries")
raise errors.AnsibleError("Failed to connect")
raise AnsibleError("FAILED")
elif wrong_user:
vvv("Restarting daemon with a different remote_user")
raise errors.AnsibleError("Wrong user")
raise AnsibleError("WRONG_USER")
self.conn.settimeout(constants.ACCELERATE_TIMEOUT)
except:
if not self.validate_user():
# the accelerated daemon was started with a
# different remote_user. The above command
# should have caused the accelerate daemon to
# shutdown, so we'll reconnect.
wrong_user = True
except AnsibleError, e:
if allow_ssh:
if "WRONG_USER" in e:
vvv("Switching users, waiting for the daemon on %s to shutdown completely..." % self.host)
time.sleep(5)
vvv("Falling back to ssh to startup accelerated mode")
res = self._execute_accelerate_module()
if not res.is_successful():
raise errors.AnsibleError("Failed to launch the accelerated daemon on %s (reason: %s)" % (self.host,res.result.get('msg')))
raise AnsibleError("Failed to launch the accelerated daemon on %s (reason: %s)" % (self.host,res.result.get('msg')))
return self.connect(allow_ssh=False)
else:
raise errors.AnsibleError("Failed to connect to %s:%s" % (self.host,self.accport))
raise AnsibleError("Failed to connect to %s:%s" % (self.host,self.accport))
self.is_connected = True
return self
@ -163,11 +170,12 @@ class Connection(object):
if not d:
vvvv("%s: received nothing, bailing out" % self.host)
return None
vvvv("%s: received %d bytes" % (self.host, len(d)))
data += d
vvvv("%s: received all of the data, returning" % self.host)
return data
except socket.timeout:
raise errors.AnsibleError("timed out while waiting to receive data")
raise AnsibleError("timed out while waiting to receive data")
def validate_user(self):
'''
@ -176,6 +184,7 @@ class Connection(object):
daemon to exit if they don't match
'''
vvvv("%s: sending request for validate_user" % self.host)
data = dict(
mode='validate_user',
username=self.user,
@ -183,15 +192,16 @@ class Connection(object):
data = utils.jsonify(data)
data = utils.encrypt(self.key, data)
if self.send_data(data):
raise errors.AnsibleError("Failed to send command to %s" % self.host)
raise AnsibleError("Failed to send command to %s" % self.host)
vvvv("%s: waiting for validate_user response" % self.host)
while True:
# we loop here while waiting for the response, because a
# long running command may cause us to receive keepalive packets
# ({"pong":"true"}) rather than the response we want.
response = self.recv_data()
if not response:
raise errors.AnsibleError("Failed to get a response from %s" % self.host)
raise AnsibleError("Failed to get a response from %s" % self.host)
response = utils.decrypt(self.key, response)
response = utils.parse_json(response)
if "pong" in response:
@ -199,11 +209,11 @@ class Connection(object):
vvvv("%s: received a keepalive packet" % self.host)
continue
else:
vvvv("%s: received the response" % self.host)
vvvv("%s: received the validate_user response: %s" % (self.host, response))
break
if response.get('failed'):
raise errors.AnsibleError("Error while validating user: %s" % response.get("msg"))
return False
else:
return response.get('rc') == 0
@ -211,10 +221,10 @@ class Connection(object):
''' run a command on the remote host '''
if su or su_user:
raise errors.AnsibleError("Internal Error: this module does not support running commands via su")
raise AnsibleError("Internal Error: this module does not support running commands via su")
if in_data:
raise errors.AnsibleError("Internal Error: this module does not support optimized module pipelining")
raise AnsibleError("Internal Error: this module does not support optimized module pipelining")
if executable == "":
executable = constants.DEFAULT_EXECUTABLE
@ -233,7 +243,7 @@ class Connection(object):
data = utils.jsonify(data)
data = utils.encrypt(self.key, data)
if self.send_data(data):
raise errors.AnsibleError("Failed to send command to %s" % self.host)
raise AnsibleError("Failed to send command to %s" % self.host)
while True:
# we loop here while waiting for the response, because a
@ -241,7 +251,7 @@ class Connection(object):
# ({"pong":"true"}) rather than the response we want.
response = self.recv_data()
if not response:
raise errors.AnsibleError("Failed to get a response from %s" % self.host)
raise AnsibleError("Failed to get a response from %s" % self.host)
response = utils.decrypt(self.key, response)
response = utils.parse_json(response)
if "pong" in response:
@ -260,7 +270,7 @@ class Connection(object):
vvv("PUT %s TO %s" % (in_path, out_path), host=self.host)
if not os.path.exists(in_path):
raise errors.AnsibleFileNotFound("file or module does not exist: %s" % in_path)
raise AnsibleFileNotFound("file or module does not exist: %s" % in_path)
fd = file(in_path, 'rb')
fstat = os.stat(in_path)
@ -279,27 +289,27 @@ class Connection(object):
data = utils.encrypt(self.key, data)
if self.send_data(data):
raise errors.AnsibleError("failed to send the file to %s" % self.host)
raise AnsibleError("failed to send the file to %s" % self.host)
response = self.recv_data()
if not response:
raise errors.AnsibleError("Failed to get a response from %s" % self.host)
raise AnsibleError("Failed to get a response from %s" % self.host)
response = utils.decrypt(self.key, response)
response = utils.parse_json(response)
if response.get('failed',False):
raise errors.AnsibleError("failed to put the file in the requested location")
raise AnsibleError("failed to put the file in the requested location")
finally:
fd.close()
vvvv("waiting for final response after PUT")
response = self.recv_data()
if not response:
raise errors.AnsibleError("Failed to get a response from %s" % self.host)
raise AnsibleError("Failed to get a response from %s" % self.host)
response = utils.decrypt(self.key, response)
response = utils.parse_json(response)
if response.get('failed',False):
raise errors.AnsibleError("failed to put the file in the requested location")
raise AnsibleError("failed to put the file in the requested location")
def fetch_file(self, in_path, out_path):
''' save a remote file to the specified path '''
@ -309,7 +319,7 @@ class Connection(object):
data = utils.jsonify(data)
data = utils.encrypt(self.key, data)
if self.send_data(data):
raise errors.AnsibleError("failed to initiate the file fetch with %s" % self.host)
raise AnsibleError("failed to initiate the file fetch with %s" % self.host)
fh = open(out_path, "w")
try:
@ -317,11 +327,11 @@ class Connection(object):
while True:
response = self.recv_data()
if not response:
raise errors.AnsibleError("Failed to get a response from %s" % self.host)
raise AnsibleError("Failed to get a response from %s" % self.host)
response = utils.decrypt(self.key, response)
response = utils.parse_json(response)
if response.get('failed', False):
raise errors.AnsibleError("Error during file fetch, aborting")
raise AnsibleError("Error during file fetch, aborting")
out = base64.b64decode(response['data'])
fh.write(out)
bytes += len(out)
@ -330,7 +340,7 @@ class Connection(object):
data = utils.jsonify(dict())
data = utils.encrypt(self.key, data)
if self.send_data(data):
raise errors.AnsibleError("failed to send ack during file fetch")
raise AnsibleError("failed to send ack during file fetch")
if response.get('last', False):
break
finally: