Move ssh and local connection plugins from using raw select to selectors

At the moment, this change will use EPoll on Linux, KQueue on *BSDs,
etc, so it should alleviate problems with too many open file
descriptors.

* Bundle a copy of selectors2 so that we have the selectors API everywhere.
* Add licensing information to selectors2 file so it's clear what the
  licensing terms and conditions are.
* Exclude the bundled copy of selectors2 from our boilerplate code-smell test
* Rewrite ssh_run tests to attempt to work around problem with mocking
  select on shippable

Fixes #14143
This commit is contained in:
Toshio Kuratomi 2017-02-01 08:48:23 -08:00
parent 2c70450e23
commit d1a6b07fe1
7 changed files with 1100 additions and 227 deletions

View file

@ -1,5 +1,6 @@
# (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
# Copyright 2015 Abhijit Menon-Sen <ams@2ndQuadrant.com>
# Copyright 2017 Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
@ -24,11 +25,11 @@ import fcntl
import hashlib
import os
import pty
import select
import subprocess
import time
from ansible import constants as C
from ansible.compat import selectors
from ansible.compat.six import PY3, text_type, binary_type
from ansible.compat.six.moves import shlex_quote
from ansible.errors import AnsibleError, AnsibleConnectionFailure, AnsibleFileNotFound
@ -443,148 +444,158 @@ class Connection(ConnectionBase):
# they will race each other when we can't connect, and the connect
# timeout usually fails
timeout = 2 + self._play_context.timeout
rpipes = [p.stdout, p.stderr]
for fd in rpipes:
for fd in (p.stdout, p.stderr):
fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
# If we can send initial data without waiting for anything, we do so
# before we call select.
### TODO: bcoca would like to use SelectSelector() when open
# filehandles is low, then switch to more efficient ones when higher.
# select is faster when filehandles is low.
selector = selectors.DefaultSelector()
selector.register(p.stdout, selectors.EVENT_READ)
selector.register(p.stderr, selectors.EVENT_READ)
# If we can send initial data without waiting for anything, we do so
# before we start polling
if states[state] == 'ready_to_send' and in_data:
self._send_initial_data(stdin, in_data)
state += 1
while True:
rfd, wfd, efd = select.select(rpipes, [], [], timeout)
try:
while True:
events = selector.select(timeout)
# We pay attention to timeouts only while negotiating a prompt.
# We pay attention to timeouts only while negotiating a prompt.
if not rfd:
if state <= states.index('awaiting_escalation'):
# If the process has already exited, then it's not really a
# timeout; we'll let the normal error handling deal with it.
if p.poll() is not None:
if not events:
# We timed out
if state <= states.index('awaiting_escalation'):
# If the process has already exited, then it's not really a
# timeout; we'll let the normal error handling deal with it.
if p.poll() is not None:
break
self._terminate_process(p)
raise AnsibleError('Timeout (%ds) waiting for privilege escalation prompt: %s' % (timeout, to_native(b_stdout)))
# Read whatever output is available on stdout and stderr, and stop
# listening to the pipe if it's been closed.
for key, event in events:
if key.fileobj == p.stdout:
b_chunk = p.stdout.read()
if b_chunk == b'':
# stdout has been closed, stop watching it
selector.unregister(p.stdout)
# When ssh has ControlMaster (+ControlPath/Persist) enabled, the
# first connection goes into the background and we never see EOF
# on stderr. If we see EOF on stdout, lower the select timeout
# to reduce the time wasted selecting on stderr if we observe
# that the process has not yet existed after this EOF. Otherwise
# we may spend a long timeout period waiting for an EOF that is
# not going to arrive until the persisted connection closes.
timeout = 1
b_tmp_stdout += b_chunk
display.debug("stdout chunk (state=%s):\n>>>%s<<<\n" % (state, to_text(b_chunk)))
elif key.fileobj == p.stderr:
b_chunk = p.stderr.read()
if b_chunk == b'':
# stderr has been closed, stop watching it
selector.unregister(p.stderr)
b_tmp_stderr += b_chunk
display.debug("stderr chunk (state=%s):\n>>>%s<<<\n" % (state, to_text(b_chunk)))
# We examine the output line-by-line until we have negotiated any
# privilege escalation prompt and subsequent success/error message.
# Afterwards, we can accumulate output without looking at it.
if state < states.index('ready_to_send'):
if b_tmp_stdout:
b_output, b_unprocessed = self._examine_output('stdout', states[state], b_tmp_stdout, sudoable)
b_stdout += b_output
b_tmp_stdout = b_unprocessed
if b_tmp_stderr:
b_output, b_unprocessed = self._examine_output('stderr', states[state], b_tmp_stderr, sudoable)
b_stderr += b_output
b_tmp_stderr = b_unprocessed
else:
b_stdout += b_tmp_stdout
b_stderr += b_tmp_stderr
b_tmp_stdout = b_tmp_stderr = b''
# If we see a privilege escalation prompt, we send the password.
# (If we're expecting a prompt but the escalation succeeds, we
# didn't need the password and can carry on regardless.)
if states[state] == 'awaiting_prompt':
if self._flags['become_prompt']:
display.debug('Sending become_pass in response to prompt')
stdin.write(to_bytes(self._play_context.become_pass) + b'\n')
self._flags['become_prompt'] = False
state += 1
elif self._flags['become_success']:
state += 1
# We've requested escalation (with or without a password), now we
# wait for an error message or a successful escalation.
if states[state] == 'awaiting_escalation':
if self._flags['become_success']:
display.debug('Escalation succeeded')
self._flags['become_success'] = False
state += 1
elif self._flags['become_error']:
display.debug('Escalation failed')
self._terminate_process(p)
self._flags['become_error'] = False
raise AnsibleError('Incorrect %s password' % self._play_context.become_method)
elif self._flags['become_nopasswd_error']:
display.debug('Escalation requires password')
self._terminate_process(p)
self._flags['become_nopasswd_error'] = False
raise AnsibleError('Missing %s password' % self._play_context.become_method)
elif self._flags['become_prompt']:
# This shouldn't happen, because we should see the "Sorry,
# try again" message first.
display.debug('Escalation prompt repeated')
self._terminate_process(p)
self._flags['become_prompt'] = False
raise AnsibleError('Incorrect %s password' % self._play_context.become_method)
# Once we're sure that the privilege escalation prompt, if any, has
# been dealt with, we can send any initial data and start waiting
# for output.
if states[state] == 'ready_to_send':
if in_data:
self._send_initial_data(stdin, in_data)
state += 1
# Now we're awaiting_exit: has the child process exited? If it has,
# and we've read all available output from it, we're done.
if p.poll() is not None:
if not selector.get_map() or not events:
break
self._terminate_process(p)
raise AnsibleError('Timeout (%ds) waiting for privilege escalation prompt: %s' % (timeout, to_native(b_stdout)))
# We should not see further writes to the stdout/stderr file
# descriptors after the process has closed, set the select
# timeout to gather any last writes we may have missed.
timeout = 0
continue
# Read whatever output is available on stdout and stderr, and stop
# listening to the pipe if it's been closed.
# If the process has not yet exited, but we've already read EOF from
# its stdout and stderr (and thus no longer watching any file
# descriptors), we can just wait for it to exit.
if p.stdout in rfd:
b_chunk = p.stdout.read()
if b_chunk == b'':
rpipes.remove(p.stdout)
# When ssh has ControlMaster (+ControlPath/Persist) enabled, the
# first connection goes into the background and we never see EOF
# on stderr. If we see EOF on stdout, lower the select timeout
# to reduce the time wasted selecting on stderr if we observe
# that the process has not yet existed after this EOF. Otherwise
# we may spend a long timeout period waiting for an EOF that is
# not going to arrive until the persisted connection closes.
timeout = 1
b_tmp_stdout += b_chunk
display.debug("stdout chunk (state=%s):\n>>>%s<<<\n" % (state, to_text(b_chunk)))
if p.stderr in rfd:
b_chunk = p.stderr.read()
if b_chunk == b'':
rpipes.remove(p.stderr)
b_tmp_stderr += b_chunk
display.debug("stderr chunk (state=%s):\n>>>%s<<<\n" % (state, to_text(b_chunk)))
# We examine the output line-by-line until we have negotiated any
# privilege escalation prompt and subsequent success/error message.
# Afterwards, we can accumulate output without looking at it.
if state < states.index('ready_to_send'):
if b_tmp_stdout:
b_output, b_unprocessed = self._examine_output('stdout', states[state], b_tmp_stdout, sudoable)
b_stdout += b_output
b_tmp_stdout = b_unprocessed
if b_tmp_stderr:
b_output, b_unprocessed = self._examine_output('stderr', states[state], b_tmp_stderr, sudoable)
b_stderr += b_output
b_tmp_stderr = b_unprocessed
else:
b_stdout += b_tmp_stdout
b_stderr += b_tmp_stderr
b_tmp_stdout = b_tmp_stderr = b''
# If we see a privilege escalation prompt, we send the password.
# (If we're expecting a prompt but the escalation succeeds, we
# didn't need the password and can carry on regardless.)
if states[state] == 'awaiting_prompt':
if self._flags['become_prompt']:
display.debug('Sending become_pass in response to prompt')
stdin.write(to_bytes(self._play_context.become_pass) + b'\n')
self._flags['become_prompt'] = False
state += 1
elif self._flags['become_success']:
state += 1
# We've requested escalation (with or without a password), now we
# wait for an error message or a successful escalation.
if states[state] == 'awaiting_escalation':
if self._flags['become_success']:
display.debug('Escalation succeeded')
self._flags['become_success'] = False
state += 1
elif self._flags['become_error']:
display.debug('Escalation failed')
self._terminate_process(p)
self._flags['become_error'] = False
raise AnsibleError('Incorrect %s password' % self._play_context.become_method)
elif self._flags['become_nopasswd_error']:
display.debug('Escalation requires password')
self._terminate_process(p)
self._flags['become_nopasswd_error'] = False
raise AnsibleError('Missing %s password' % self._play_context.become_method)
elif self._flags['become_prompt']:
# This shouldn't happen, because we should see the "Sorry,
# try again" message first.
display.debug('Escalation prompt repeated')
self._terminate_process(p)
self._flags['become_prompt'] = False
raise AnsibleError('Incorrect %s password' % self._play_context.become_method)
# Once we're sure that the privilege escalation prompt, if any, has
# been dealt with, we can send any initial data and start waiting
# for output.
if states[state] == 'ready_to_send':
if in_data:
self._send_initial_data(stdin, in_data)
state += 1
# Now we're awaiting_exit: has the child process exited? If it has,
# and we've read all available output from it, we're done.
if p.poll() is not None:
if not rpipes or not rfd:
elif not selector.get_map():
p.wait()
break
# We should not see further writes to the stdout/stderr file
# descriptors after the process has closed, set the select
# timeout to gather any last writes we may have missed.
timeout = 0
continue
# If the process has not yet exited, but we've already read EOF from
# its stdout and stderr (and thus removed both from rpipes), we can
# just wait for it to exit.
elif not rpipes:
p.wait()
break
# Otherwise there may still be outstanding data to read.
# close stdin after process is terminated and stdout/stderr are read
# completely (see also issue #848)
stdin.close()
# Otherwise there may still be outstanding data to read.
finally:
selector.close()
# close stdin after process is terminated and stdout/stderr are read
# completely (see also issue #848)
stdin.close()
if C.HOST_KEY_CHECKING:
if cmd[0] == b"sshpass" and p.returncode == 6: