diff --git a/lib/ansible/runner/connection_plugins/fireball2.py b/lib/ansible/runner/connection_plugins/fireball2.py index 1984f3425e..55da006540 100644 --- a/lib/ansible/runner/connection_plugins/fireball2.py +++ b/lib/ansible/runner/connection_plugins/fireball2.py @@ -26,6 +26,13 @@ from ansible import utils from ansible import errors from ansible import constants +# the chunk size to read and send, assuming mtu 1500 and +# leaving room for base64 (+33%) encoding and header (8 bytes) +# ((1400-8)/4)*3) = 1044 +# which leaves room for the TCP/IP header. We set this to a +# multiple of the value to speed up file reads. +CHUNK_SIZE=1044*20 + class Connection(object): ''' raw socket accelerated connection ''' @@ -148,27 +155,42 @@ class Connection(object): if not os.path.exists(in_path): raise errors.AnsibleFileNotFound("file or module does not exist: %s" % in_path) - data = file(in_path).read() - data = base64.b64encode(data) - data = dict(mode='put', data=data, out_path=out_path) + fd = file(in_path, 'rb') + fstat = os.stat(in_path) + try: + vvv("PUT file is %d bytes" % fstat.st_size) + while fd.tell() < fstat.st_size: + data = fd.read(CHUNK_SIZE) + last = False + if fd.tell() >= fstat.st_size: + last = True + data = dict(mode='put', data=base64.b64encode(data), out_path=out_path, last=last) + if self.runner.sudo: + data['user'] = self.runner.sudo_user + data = utils.jsonify(data) + data = utils.encrypt(self.key, data) - if self.runner.sudo: - data['user'] = self.runner.sudo_user + if self.send_data(data): + raise errors.AnsibleError("failed to send the file to %s" % self.host) - # TODO: support chunked file transfer - data = utils.jsonify(data) - data = utils.encrypt(self.key, data) - if self.send_data(data): - raise errors.AnsibleError("failed to send the file to %s" % self.host) + response = self.recv_data() + if not response: + raise errors.AnsibleError("Failed to get a response from %s" % self.host) + response = utils.decrypt(self.key, response) + response = utils.parse_json(response) - response = self.recv_data() - if not response: - raise errors.AnsibleError("Failed to get a response from %s" % self.host) - response = utils.decrypt(self.key, response) - response = utils.parse_json(response) + if response.get('failed',False): + raise errors.AnsibleError("failed to put the file in the requested location") + finally: + fd.close() + response = self.recv_data() + if not response: + raise errors.AnsibleError("Failed to get a response from %s" % self.host) + response = utils.decrypt(self.key, response) + response = utils.parse_json(response) - if response.get('failed',False): - raise errors.AnsibleError("failed to put the file in the requested location") + if response.get('failed',False): + raise errors.AnsibleError("failed to put the file in the requested location") def fetch_file(self, in_path, out_path): ''' save a remote file to the specified path ''' @@ -180,17 +202,36 @@ class Connection(object): if self.send_data(data): raise errors.AnsibleError("failed to initiate the file fetch with %s" % self.host) - response = self.recv_data() - if not response: - raise errors.AnsibleError("Failed to get a response from %s" % self.host) - response = utils.decrypt(self.key, response) - response = utils.parse_json(response) - response = response['data'] - response = base64.b64decode(response) - fh = open(out_path, "w") - fh.write(response) - fh.close() + try: + bytes = 0 + while True: + response = self.recv_data() + if not response: + raise errors.AnsibleError("Failed to get a response from %s" % self.host) + response = utils.decrypt(self.key, response) + response = utils.parse_json(response) + if response.get('failed', False): + raise errors.AnsibleError("Error during file fetch, aborting") + out = base64.b64decode(response['data']) + fh.write(out) + bytes += len(out) + # send an empty response back to signify we + # received the last chunk without errors + data = utils.jsonify(dict()) + data = utils.encrypt(self.key, data) + if self.send_data(data): + raise errors.AnsibleError("failed to send ack during file fetch") + if response.get('last', False): + break + finally: + # we don't currently care about this final response, + # we just receive it and drop it. It may be used at some + # point in the future or we may just have the put/fetch + # operations not send back a final response at all + response = self.recv_data() + vvv("FETCH wrote %d bytes to %s" % (bytes, out_path)) + fh.close() def close(self): ''' terminate the connection ''' diff --git a/library/utilities/fireball2 b/library/utilities/fireball2 index 0e11dbfe4b..3c95d2921b 100644 --- a/library/utilities/fireball2 +++ b/library/utilities/fireball2 @@ -80,6 +80,12 @@ import SocketServer syslog.openlog('ansible-%s' % os.path.basename(__file__)) PIDFILE = os.path.expanduser("~/.fireball2.pid") +# the chunk size to read and send, assuming mtu 1500 and +# leaving room for base64 (+33%) encoding and header (100 bytes) +# 4 * (975/3) + 100 = 1400 +# which leaves room for the TCP/IP header +CHUNK_SIZE=10240 + def log(msg): syslog.syslog(syslog.LOG_NOTICE, msg) @@ -227,13 +233,40 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): if 'in_path' not in data: return dict(failed=True, msg='internal error: in_path is required') - # FIXME: should probably support chunked file transfer for binary files - # at some point. For now, just base64 encodes the file - # so don't use it to move ISOs, use rsync. + try: + fd = file(data['in_path'], 'rb') + fstat = os.stat(data['in_path']) + log("FETCH file is %d bytes" % fstat.st_size) + while fd.tell() < fstat.st_size: + data = fd.read(CHUNK_SIZE) + last = False + if fd.tell() >= fstat.st_size: + last = True + data = dict(data=base64.b64encode(data), last=last) + data = json.dumps(data) + data = self.server.key.Encrypt(data) - fh = open(data['in_path']) - data = base64.b64encode(fh.read()) - return dict(data=data) + if self.send_data(data): + return dict(failed=True, stderr="failed to send data") + + response = self.recv_data() + if not response: + log("failed to get a response, aborting") + return dict(failed=True, stderr="Failed to get a response from %s" % self.host) + response = self.server.key.Decrypt(response) + response = json.loads(response) + + if response.get('failed',False): + log("got a failed response from the master") + return dict(failed=True, stderr="Master reported failure, aborting transfer") + except Exception, e: + tb = traceback.format_exc() + log("failed to fetch the file: %s" % tb) + return dict(failed=True, stderr="Could not fetch the file: %s" % str(e)) + finally: + fd.close() + + return dict() def put(self, data): if 'data' not in data: @@ -251,19 +284,33 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): out_path = data['out_path'] out_fd = open(out_path, 'w') - # FIXME: should probably support chunked file transfer for binary files - # at some point. For now, just base64 encodes the file - # so don't use it to move ISOs, use rsync. - try: - out_fd.write(base64.b64decode(data['data'])) - out_fd.close() + bytes=0 + while True: + out = base64.b64decode(data['data']) + bytes += len(out) + out_fd.write(out) + response = json.dumps(dict()) + response = self.server.key.Encrypt(response) + self.send_data(response) + if data['last']: + break + data = self.recv_data() + if not data: + raise "" + data = self.server.key.Decrypt(data) + data = json.loads(data) except: + tb = traceback.format_exc() + log("failed to put the file: %s" % tb) return dict(failed=True, stdout="Could not write the file") + finally: + log("wrote %d bytes" % bytes) + out_fd.close() if final_path: log("moving %s to %s" % (out_path, final_path)) - args = ['sudo','mv',out_path,final_path] + args = ['sudo','cp',out_path,final_path] rc, stdout, stderr = self.server.module.run_command(args, close_fds=True) if rc != 0: return dict(failed=True, stdout="failed to copy the file into position with sudo")