Fix sanity errors

This commit is contained in:
Jorge Gallegos 2025-09-04 18:28:04 -07:00
commit 0296c92c00
No known key found for this signature in database

View file

@ -1,6 +1,8 @@
# Copyright (c) 2025 Red Hat # Copyright (c) 2025 Red Hat
# GNU General Public License v3.0+ https://www.gnu.org/licenses/gpl-3.0.txt # GNU General Public License v3.0+ https://www.gnu.org/licenses/gpl-3.0.txt
from __future__ import annotations
# I had to duplicate (almost?) all of the documentation found in the # I had to duplicate (almost?) all of the documentation found in the
# ansible.plugins.connection.ssh plugin, due to how ansible-doc and ansible-test sanity # ansible.plugins.connection.ssh plugin, due to how ansible-doc and ansible-test sanity
# work, they look at the lexical structure of the code. I had initially done: # work, they look at the lexical structure of the code. I had initially done:
@ -13,13 +15,14 @@
# 2. Make host_key_checking default to False # 2. Make host_key_checking default to False
# 3. Added known_hosts_file option pointing to ~/.ssh/google_compute_known_hosts # 3. Added known_hosts_file option pointing to ~/.ssh/google_compute_known_hosts
DOCUMENTATION = """ DOCUMENTATION = """
name: gcloud-iap name: iap
short_description: connect via SSH through Google Cloud's Identity Aware Proxy (IAP) short_description: connect via SSH through Google Cloud's Identity Aware Proxy (IAP)
description: description:
- This connection plugin behaves almost like the stock SSH plugin, but it creates - This connection plugin behaves almost like the stock SSH plugin, but it creates
a new IAP process per host in the inventory so connections are tunneled through a new IAP process per host in the inventory so connections are tunneled through
it. it.
author: Jorge A Gallegos (jgallego@redhat.com) author:
- Jorge A Gallegos (@thekad)
notes: notes:
- This plugin requires you to have configured gcloud authentication prior to using - This plugin requires you to have configured gcloud authentication prior to using
it. You can change the active configuration used, but the plugin won't auth it. You can change the active configuration used, but the plugin won't auth
@ -487,7 +490,6 @@ DOCUMENTATION = """
""" """
import os import os
import os.path as ospath
import re import re
import pty import pty
import shlex import shlex
@ -498,10 +500,11 @@ import threading
import time import time
import tempfile import tempfile
import typing as T import typing as T
from os import path as ospath
import ansible.plugins.connection.ssh as sshconn from ansible.plugins.connection import ssh as sshconn
import ansible.errors as errors from ansible import errors
import ansible.utils.display as display from ansible.utils import display
D = display.Display() D = display.Display()
DEFAULT_GCLOUD: T.Optional[str] = shutil.which("gcloud") DEFAULT_GCLOUD: T.Optional[str] = shutil.which("gcloud")
@ -607,6 +610,7 @@ class IAP:
"""Monitor the thread handling the IAP subprocess until it is 'up'""" """Monitor the thread handling the IAP subprocess until it is 'up'"""
while self.process is not None and self.process.poll() is None: while self.process is not None and self.process.poll() is None:
# pylint: disable=disallowed-name
rlist, _, _ = select.select([self.master_fd], [], [], 0.1) rlist, _, _ = select.select([self.master_fd], [], [], 0.1)
if rlist is not None: if rlist is not None:
try: try:
@ -692,6 +696,7 @@ class Connection(sshconn.Connection):
token_file: T.Optional[str] = self.get_option("gcloud_access_token_file") token_file: T.Optional[str] = self.get_option("gcloud_access_token_file")
config: T.Optional[str] = self.get_option("gcloud_configuration") config: T.Optional[str] = self.get_option("gcloud_configuration")
port: T.Optional[int] = self.get_option("port") port: T.Optional[int] = self.get_option("port")
timeout: T.Optional[int] = self.get_option("timeout")
# this shouldn't happen, but still. # this shouldn't happen, but still.
if host is None: if host is None:
@ -710,9 +715,9 @@ class Connection(sshconn.Connection):
token_file=token_file, token_file=token_file,
) )
success = self.iaps[host].ready.wait(timeout=5) success = self.iaps[host].ready.wait(timeout=timeout)
is_up: bool = False is_up: bool = False
for _ in range(3): for _ in range(3): # pylint: disable=disallowed-name
is_up = self.iaps[host].up is_up = self.iaps[host].up
if success and is_up: if success and is_up:
D.vvv("IAP: TUNNEL IS UP", host=host) D.vvv("IAP: TUNNEL IS UP", host=host)