Module ssh2net

ssh2net network ssh client library

Expand source code
"""ssh2net network ssh client library"""
import logging
from logging import NullHandler

from ssh2net.base import SSH2Net
from ssh2net.channel import SSH2NetChannel
from ssh2net.session import SSH2NetSession
from ssh2net.netmiko_compatibility import connect_handler as ConnectHandler
from ssh2net.ssh_config import SSH2NetSSHConfig
from ssh2net.core.driver import BaseNetworkDriver
from ssh2net.core.cisco_iosxe.driver import IOSXEDriver
from ssh2net.core.cisco_nxos.driver import NXOSDriver
from ssh2net.core.cisco_iosxr.driver import IOSXRDriver
from ssh2net.core.arista_eos.driver import EOSDriver
from ssh2net.core.juniper_junos.driver import JunosDriver

__version__ = "2019.10.27"
__all__ = (
    "SSH2Net",
    "SSH2NetSession",
    "SSH2NetChannel",
    "SSH2NetSSHConfig",
    "ConnectHandler",
    "BaseNetworkDriver",
    "IOSXEDriver",
    "NXOSDriver",
    "IOSXRDriver",
    "EOSDriver",
    "JunosDriver",
)


# Class to filter duplicate log entries for the channel logger
# Stolen from: https://stackoverflow.com/questions/44691558/ \
# suppress-multiple-messages-with-same-content-in-python-logging-module-aka-log-co
class DuplicateFilter(logging.Filter):
    def filter(self, record):
        # Fields to compare to previous log entry if these fields match; skip log entry
        current_log = (record.module, record.levelno, record.msg)
        if current_log != getattr(self, "last_log", None):
            self.last_log = current_log
            return True
        return False


# Setup session logger
session_log = logging.getLogger(f"{__name__}_session")
logging.getLogger(f"{__name__}_session").addHandler(NullHandler())

# Setup channel logger
channel_log = logging.getLogger(f"{__name__}_channel")
# Add duplicate filter to channel log
channel_log.addFilter(DuplicateFilter())
logging.getLogger(f"{__name__}_channel").addHandler(NullHandler())

Sub-modules

ssh2net.base

ssh2net.base

ssh2net.channel

ssh2net.channel

ssh2net.community

ssh2net community platform drivers

ssh2net.core

ssh2net core platform drivers

ssh2net.decorators

ssh2net.decorators

ssh2net.exceptions

ssh2net.exceptions

ssh2net.helper

ssh2net.helper

ssh2net.netmiko_compatibility

ssh2net.netmiko_compatibility

ssh2net.session

ssh2net.session

ssh2net.session_miko

ssh2net.session_miko

ssh2net.session_ssh2

ssh2net.session_ssh2

ssh2net.ssh_config

ssh2net.ssh_config

Functions

def ConnectHandler(auto_open=True, **kwargs)

Convert netmiko style "ConnectHandler" device creation to SSH2Net style

Args

auto_open
auto open connection or not (primarily for testing purposes)
**kwargs
keyword arguments

Returns

driver
SSH2Net connection object for specified device-type

Raises

TypeError
if unsupported netmiko device type is provided
Expand source code
def connect_handler(auto_open=True, **kwargs):
    """
    Convert netmiko style "ConnectHandler" device creation to SSH2Net style

    Args:
        auto_open: auto open connection or not (primarily for testing purposes)
        **kwargs: keyword arguments

    Returns:
        driver: SSH2Net connection object for specified device-type

    Raises:
        TypeError: if unsupported netmiko device type is provided

    """
    if kwargs["device_type"] not in NETMIKO_DEVICE_TYPE_MAPPER.keys():
        raise TypeError(f"Unsupported netmiko device type for ssh2net: {kwargs['device_type']}")

    driver_info = NETMIKO_DEVICE_TYPE_MAPPER.get(kwargs["device_type"])
    driver_class = driver_info["driver"]
    driver_args = driver_info["arg_mapper"]
    kwargs.pop("device_type")

    transformed_kwargs = transform_netmiko_kwargs(kwargs)
    final_kwargs = {**transformed_kwargs, **driver_args}

    driver = driver_class(**final_kwargs)

    if auto_open:
        driver.open_shell()

    return driver

Classes

class BaseNetworkDriver (auth_secondary=None, **kwargs)

Initialize SSH2Net BaseNetworkDriver Object

Args

auth_secondary
password to use for secondary authentication (enable)

Returns

N/A # noqa
 

Raises

N/A # noqa
 
Expand source code
class BaseNetworkDriver(SSH2Net):
    def __init__(self, auth_secondary: Optional[Union[str]] = None, **kwargs: Dict[str, Any]):
        """
        Initialize SSH2Net BaseNetworkDriver Object

        Args:
            auth_secondary: password to use for secondary authentication (enable)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa
        """
        self.auth_secondary = auth_secondary
        super().__init__(**kwargs)
        self.privs = PRIVS
        self.default_desired_priv = None
        self.textfsm_platform = None

    def _determine_current_priv(self, current_prompt: str):
        """
        Determine current privilege level from prompt string

        Args:
            current_prompt: string of current prompt

        Returns:
            priv_level: NamedTuple of current privilege level

        Raises:
            UnknownPrivLevel: if privilege level cannot be determined  # noqa
            # darglint raises DAR401 for some reason hence the noqa...

        """
        for priv_level in self.privs.values():
            if re.search(priv_level.pattern, current_prompt):
                return priv_level
        raise UnknownPrivLevel

    def _escalate(self) -> None:
        """
        Escalate to the next privilege level up

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        current_priv = self._determine_current_priv(self.get_prompt())
        if current_priv.escalate:
            if current_priv.escalate_auth:
                self.send_inputs_interact(
                    (
                        current_priv.escalate,
                        current_priv.escalate_prompt,
                        self.auth_enable,
                        self.privs.get("escalate_priv"),
                    ),
                    hidden_response=True,
                )
            else:
                self.send_inputs(current_priv.escalate)

    def _deescalate(self) -> None:
        """
        Deescalate to the next privilege level down

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        current_priv = self._determine_current_priv(self.get_prompt())
        if current_priv.deescalate:
            self.send_inputs(current_priv.deescalate)

    def attain_priv(self, desired_priv) -> None:
        """
        Attain desired priv level

        Args:
            desired_priv: string name of desired privilege level
                (see ssh2net.core.<device_type>.driver for levels)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        while True:
            current_priv = self._determine_current_priv(self.get_prompt())
            if current_priv == self.privs[desired_priv]:
                return
            if current_priv.level > self.privs[desired_priv].level:
                self._deescalate()
            else:
                self._escalate()

    def send_command(self, commands):
        """
        Send command(s)

        Args:
            commands: string or list of strings to send to device in privilege exec mode

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa
        """
        self.attain_priv(self.default_desired_priv)
        result = self.send_inputs(commands)
        return result

    def send_config_set(self, configs):
        """
        Send configuration(s)

        Args:
            configs: string or list of strings to send to device in config mode

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa
        """
        self.attain_priv("configuration")
        result = self.send_inputs(configs)
        self.attain_priv(self.default_desired_priv)
        return result

    def textfsm_parse_output(self, command: str, output: str) -> str:
        """
        Parse output with TextFSM and ntc-templates

        Args:
            command: command used to get output
            output: output from command

        Returns:
            output: parsed output

        Raises:
            N/A  # noqa
        """
        template = _textfsm_get_template(self.textfsm_platform, command)
        if template:
            output = textfsm_parse(template, output)
        return output

Ancestors

Subclasses

Methods

def attain_priv(self, desired_priv)

Attain desired priv level

Args

desired_priv
string name of desired privilege level (see ssh2net.core..driver for levels)

Returns

N/A # noqa
 

Raises

N/A # noqa
 
Expand source code
def attain_priv(self, desired_priv) -> None:
    """
    Attain desired priv level

    Args:
        desired_priv: string name of desired privilege level
            (see ssh2net.core.<device_type>.driver for levels)

    Returns:
        N/A  # noqa

    Raises:
        N/A  # noqa

    """
    while True:
        current_priv = self._determine_current_priv(self.get_prompt())
        if current_priv == self.privs[desired_priv]:
            return
        if current_priv.level > self.privs[desired_priv].level:
            self._deescalate()
        else:
            self._escalate()
def send_command(self, commands)

Send command(s)

Args

commands
string or list of strings to send to device in privilege exec mode

Returns

N/A # noqa
 

Raises

N/A # noqa
 
Expand source code
def send_command(self, commands):
    """
    Send command(s)

    Args:
        commands: string or list of strings to send to device in privilege exec mode

    Returns:
        N/A  # noqa

    Raises:
        N/A  # noqa
    """
    self.attain_priv(self.default_desired_priv)
    result = self.send_inputs(commands)
    return result
def send_config_set(self, configs)

Send configuration(s)

Args

configs
string or list of strings to send to device in config mode

Returns

N/A # noqa
 

Raises

N/A # noqa
 
Expand source code
def send_config_set(self, configs):
    """
    Send configuration(s)

    Args:
        configs: string or list of strings to send to device in config mode

    Returns:
        N/A  # noqa

    Raises:
        N/A  # noqa
    """
    self.attain_priv("configuration")
    result = self.send_inputs(configs)
    self.attain_priv(self.default_desired_priv)
    return result
def textfsm_parse_output(self, command, output)

Parse output with TextFSM and ntc-templates

Args

command
command used to get output
output
output from command

Returns

output
parsed output

Raises

N/A # noqa
 
Expand source code
def textfsm_parse_output(self, command: str, output: str) -> str:
    """
    Parse output with TextFSM and ntc-templates

    Args:
        command: command used to get output
        output: output from command

    Returns:
        output: parsed output

    Raises:
        N/A  # noqa
    """
    template = _textfsm_get_template(self.textfsm_platform, command)
    if template:
        output = textfsm_parse(template, output)
    return output

Inherited members

class EOSDriver (**kwargs)

Initialize SSH2Net EOSDriver Object

Args

**kwargs
keyword args to pass to inherited class(es)

Returns

N/A # noqa
 

Raises

N/A # noqa
 
Expand source code
class EOSDriver(BaseNetworkDriver):
    def __init__(self, **kwargs: Dict[str, Any]):
        """
        Initialize SSH2Net EOSDriver Object

        Args:
            **kwargs: keyword args to pass to inherited class(es)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa
        """
        super().__init__(**kwargs)
        self.privs = PRIVS
        self.default_desired_priv = "privilege_exec"
        self.textfsm_platform = "arista_eos"

Ancestors

Inherited members

class IOSXEDriver (**kwargs)

Initialize SSH2Net IOSXEDriver Object

Args

**kwargs
keyword args to pass to inherited class(es)

Returns

N/A # noqa
 

Raises

N/A # noqa
 
Expand source code
class IOSXEDriver(BaseNetworkDriver):
    def __init__(self, **kwargs: Dict[str, Any]):
        """
        Initialize SSH2Net IOSXEDriver Object

        Args:
            **kwargs: keyword args to pass to inherited class(es)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa
        """
        super().__init__(**kwargs)
        self.privs = PRIVS
        self.default_desired_priv = "privilege_exec"
        self.textfsm_platform = "cisco_ios"

Ancestors

Inherited members

class IOSXRDriver (**kwargs)

Initialize SSH2Net IOSXRDriver Object

Args

**kwargs
keyword args to pass to inherited class(es)

Returns

N/A # noqa
 

Raises

N/A # noqa
 
Expand source code
class IOSXRDriver(BaseNetworkDriver):
    def __init__(self, **kwargs: Dict[str, Any]):
        """
        Initialize SSH2Net IOSXRDriver Object

        Args:
            **kwargs: keyword args to pass to inherited class(es)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa
        """
        super().__init__(**kwargs)
        self.privs = PRIVS
        self.default_desired_priv = "privilege_exec"
        self.textfsm_platform = "cisco_xr"

Ancestors

Inherited members

class JunosDriver (**kwargs)

Initialize SSH2Net IOSXEDriver Object

Args

**kwargs
keyword args to pass to inherited class(es)

Returns

N/A # noqa
 

Raises

N/A # noqa
 
Expand source code
class JunosDriver(BaseNetworkDriver):
    def __init__(self, **kwargs: Dict[str, Any]):
        """
        Initialize SSH2Net IOSXEDriver Object

        Args:
            **kwargs: keyword args to pass to inherited class(es)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa
        """
        super().__init__(**kwargs)
        self.privs = PRIVS
        self.default_desired_priv = "exec"

Ancestors

Inherited members

class NXOSDriver (**kwargs)

Initialize SSH2Net NXOSDriver Object

Args

**kwargs
keyword args to pass to inherited class(es)

Returns

N/A # noqa
 

Raises

N/A # noqa
 
Expand source code
class NXOSDriver(BaseNetworkDriver):
    def __init__(self, **kwargs: Dict[str, Any]):
        """
        Initialize SSH2Net NXOSDriver Object

        Args:
            **kwargs: keyword args to pass to inherited class(es)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa
        """
        super().__init__(**kwargs)
        self.privs = PRIVS
        self.default_desired_priv = "privilege_exec"

Ancestors

Inherited members

class SSH2Net (setup_host='', setup_validate_host=False, setup_port=22, setup_timeout=5, setup_ssh_config_file=False, setup_use_paramiko=False, session_timeout=5000, session_keepalive=False, session_keepalive_interval=10, session_keepalive_type='network', session_keepalive_pattern='\x05', auth_user='', auth_password=None, auth_public_key=None, comms_strip_ansi=False, comms_prompt_regex='^[a-z0-9.\\-@()/:]{1,32}[#>$]$', comms_operation_timeout=10, comms_return_char='\n', comms_pre_login_handler='', comms_disable_paging='terminal length 0')

Initialize SSH2Net Object

Setup basic parameters required to connect to devices via ssh. Pay extra attention to the "comms_prompt_regex" as this is highly critical to this tool working well!

Args

setup_host
ip address or hostname to connect to
setup_validate_host
whether or not to validate ip address is valid or dns is resolvable
setup_port
port to open ssh session to
setup_timeout
timeout in seconds for opening underlying socket to host
setup_ssh_config_file
ssh config file to use or True to try system default files
setup_use_paramiko
use paramiko instead of ssh2-python
session_timeout
time in ms for session read operations; 0 is "forever" and will block
session_keepalive
whether or not to try to keep session alive
session_keepalive_interval
interval to use for session keepalives
session_keepalive_type
network|standard – "network" sends actual characters over the channel as "normal" ssh keepalive doesn't keep sessions open. "standard" sends "normal" ssh keepalives via ssh2 library. In both cases a thread is spawned in which the keepalives are sent. This introduces a locking mechanism which in theory will slow things down slightly, however provides the ability to keep the session alive indefinitely.
session_keepalive_pattern
pattern to send to keep network channel alive. Default is u"\005" which is equivalent to "ctrl+e". This pattern moves cursor to end of the line which should be an innocuous pattern. This will only be entered if a lock can be acquired.
auth_user
username to use to connect to host
auth_password
password to use to connect to host
auth_public_key
path to ssh public key to use to connect to host
comms_strip_ansi
whether or not to strip ansi characters from channel
comms_prompt_regex
regex pattern to use for prompt matching. this is the single most important attribute here! if this does not match a prompt, ssh2net will not work! IMPORTANT: regex search uses multi-line + case insensitive flags. multi-line allows for highly reliably matching for prompts after stripping trailing white space, case insensitive is just a convenience factor so i can be lazy.
comms_operation_timeout
timeout in seconds for waiting for channel operations. this is NOT the "read" timeout. this is the timeout for the entire operation sent to send_inputs/send_inputs_interact
comms_return_char
character to use to send returns to host
comms_pre_login_handler
callable or string that resolves to an importable function to handle pre-login (pre disable paging) operations
comms_disable_paging
callable, string that resolves to an importable function, or string to send to device to disable paging

Returns

N/A # noqa
 

Raises

ValueError
in the following situations: - setup_port is not an integer - setup_timeout is not an integer - setup_use_paramiko is not a bool - session_timeout is not an integer - session_keepalive is not a bool - session_keepalive_interval is not an integer - session_keepalive_type is not "network" or "standard" - comms_operation_timeout is not an integer - comms_return_char is not a string
Expand source code
class SSH2Net(SSH2NetSession):
    def __init__(
        self,
        setup_host: str = "",
        setup_validate_host: Optional[bool] = False,
        setup_port: Optional[int] = 22,
        setup_timeout: Optional[int] = 5,
        setup_ssh_config_file: Optional[Union[str, bool]] = False,
        setup_use_paramiko: Optional[bool] = False,
        session_timeout: Optional[int] = 5000,
        session_keepalive: Optional[bool] = False,
        session_keepalive_interval: Optional[int] = 10,
        session_keepalive_type: Optional[str] = "network",
        session_keepalive_pattern: Optional[str] = "\005",
        auth_user: str = "",
        auth_password: Optional[Union[str]] = None,
        auth_public_key: Optional[Union[str]] = None,
        comms_strip_ansi: Optional[bool] = False,
        comms_prompt_regex: Optional[str] = r"^[a-z0-9.\-@()/:]{1,32}[#>$]$",
        comms_operation_timeout: Optional[int] = 10,
        comms_return_char: Optional[str] = "\n",
        comms_pre_login_handler: Optional[Union[str, Callable]] = "",
        comms_disable_paging: Optional[Union[str, Callable]] = "terminal length 0",
    ):
        r"""
        Initialize SSH2Net Object

        Setup basic parameters required to connect to devices via ssh. Pay extra attention
        to the "comms_prompt_regex" as this is highly critical to this tool working well!

        Args:
            setup_host: ip address or hostname to connect to
            setup_validate_host: whether or not to validate ip address is valid or dns is resolvable
            setup_port: port to open ssh session to
            setup_timeout: timeout in seconds for opening underlying socket to host
            setup_ssh_config_file: ssh config file to use or True to try system default files
            setup_use_paramiko: use paramiko instead of ssh2-python
            session_timeout: time in ms for session read operations; 0 is "forever" and will block
            session_keepalive: whether or not to try to keep session alive
            session_keepalive_interval: interval to use for session keepalives
            session_keepalive_type: network|standard -- "network" sends actual characters over the
                channel as "normal" ssh keepalive doesn't keep sessions open. "standard" sends
                "normal" ssh keepalives via ssh2 library. In both cases a thread is spawned in
                which the keepalives are sent. This introduces a locking mechanism which in
                theory will slow things down slightly, however provides the ability to keep the
                session alive indefinitely.
            session_keepalive_pattern: pattern to send to keep network channel alive. Default is
                u"\005" which is equivalent to "ctrl+e". This pattern moves cursor to end of the
                line which should be an innocuous pattern. This will only be entered *if* a lock
                can be acquired.
            auth_user: username to use to connect to host
            auth_password: password to use to connect to host
            auth_public_key: path to ssh public key to use to connect to host
            comms_strip_ansi: whether or not to strip ansi characters from channel
            comms_prompt_regex: regex pattern to use for prompt matching.
                this is the single most important attribute here! if this does not match a prompt,
                ssh2net will not work!
                IMPORTANT: regex search uses multi-line + case insensitive flags. multi-line allows
                for highly reliably matching for prompts after stripping trailing white space,
                case insensitive is just a convenience factor so i can be lazy.
            comms_operation_timeout: timeout in seconds for waiting for channel operations.
                this is NOT the "read" timeout. this is the timeout for the entire operation
                sent to send_inputs/send_inputs_interact
            comms_return_char: character to use to send returns to host
            comms_pre_login_handler: callable or string that resolves to an importable function to
                handle pre-login (pre disable paging) operations
            comms_disable_paging: callable, string that resolves to an importable function, or
                string to send to device to disable paging

        Returns:
            N/A  # noqa

        Raises:
            ValueError: in the following situations:
                - setup_port is not an integer
                - setup_timeout is not an integer
                - setup_use_paramiko is not a bool
                - session_timeout is not an integer
                - session_keepalive is not a bool
                - session_keepalive_interval is not an integer
                - session_keepalive_type is not "network" or "standard"
                - comms_operation_timeout is not an integer
                - comms_return_char is not a string

        """
        # set a flag to indicate if a shell has been invoked
        self._shell: bool = False

        # setup setup args
        self._setup_setup_args(
            setup_host, setup_validate_host, setup_port, setup_timeout, setup_use_paramiko
        )

        # setup session args
        self._setup_session_args(
            session_timeout,
            session_keepalive,
            session_keepalive_interval,
            session_keepalive_type,
            session_keepalive_pattern,
        )

        # auth setup
        self._setup_auth_args(auth_user, auth_public_key, auth_password)

        # comms setup
        self._setup_comms_args(
            comms_strip_ansi,
            comms_prompt_regex,
            comms_operation_timeout,
            comms_return_char,
            comms_pre_login_handler,
            comms_disable_paging,
        )

        if setup_ssh_config_file:
            if isinstance(setup_ssh_config_file, bool) and setup_ssh_config_file:
                setup_ssh_config_file = ""
            self._setup_ssh_config_args(setup_ssh_config_file)

        session_log.info(f"{str(self)}; {repr(self)}")

    def __enter__(self):
        """
        Enter method for context manager

        Args:
            N/A  # noqa

        Returns:
            self: instance of self

        Raises:
            N/A  # noqa

        """
        self.open_shell()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        """
        Exit method to cleanup for context manager

        Args:
            exception_type: exception type being raised
            exception_value: message from exception being raised
            traceback: traceback from exception being raised

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        self.close()

    def __str__(self):
        """
        Magic str method for SSH2Net class

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        return f"SSH2Net Connection Object for host {self.host}"

    def __repr__(self):
        """
        Magic repr method for SSH2Net class

        Args:
            N/A  # noqa

        Returns:
            repr: repr for class object

        Raises:
            N/A  # noqa

        """
        class_dict = self.__dict__.copy()
        class_dict["auth_password"] = "********"
        return f"SSH2Net {class_dict}"

    def __bool__(self):
        """
        Magic bool method based on result of session_alive

        Args:
            N/A  # noqa

        Returns:
            bool: True/False if session is alive or not

        Raises:
            N/A  # noqa

        """
        return self._session_alive()

    @staticmethod
    def _set_comms_pre_login_handler(
        comms_pre_login_handler: Union[Callable, str]
    ) -> Union[Callable, str]:
        """
        Return comms_pre_login_handler argument

        Args:
            comms_pre_login_handler: callable function, or string representing a path to
                a callable

        Returns:
            comms_pre_login_handler: callable or default empty string value

        Raises:
            ValueError: if provided string does not result in a callable

        """
        if comms_pre_login_handler:
            if callable(comms_pre_login_handler):
                return comms_pre_login_handler
            ext_func = validate_external_function(comms_pre_login_handler)
            if ext_func:
                return ext_func
            session_log.critical(f"Invalid comms_pre_login_handler: {comms_pre_login_handler}")
            raise ValueError(
                f"{comms_pre_login_handler} is an invalid comms_pre_login_handler function "
                "or path to a function."
            )
        return comms_pre_login_handler

    @staticmethod
    def _set_comms_disable_paging(
        comms_disable_paging: Union[Callable, str]
    ) -> Union[Callable, str]:
        """
        Return comms_disable_paging argument

        Args:
            comms_disable_paging: callable function, string representing a path to
                a callable, or a string to send to device to disable paging

        Returns:
            comms_disable_paging: callable or string to use to disable paging

        Raises:
            ValueError: if provided string does not result in a callable

        """
        if comms_disable_paging != "terminal length 0":
            if callable(comms_disable_paging):
                return comms_disable_paging
            ext_func = validate_external_function(comms_disable_paging)
            if ext_func:
                return ext_func
            if isinstance(comms_disable_paging, str):
                return comms_disable_paging
            session_log.critical(f"Invalid comms_disable_paging: {comms_disable_paging}")
            raise ValueError(
                f"{comms_disable_paging} is an invalid comms_disable_paging function, "
                "path to a function, or is not a string."
            )
        return comms_disable_paging

    @staticmethod
    def _invalid_arg_type(target_type, arg_name, arg) -> None:
        """
        Handle invalid argument types for SSH2Net constructor

        Args:
            target_type: expected type of argument
            arg_name: argument name
            arg: value of provided argument

        Returns:
            N/A  # noqa

        Raises:
            ValueError

        """
        session_log.critical(f"Invalid '{arg_name}': {arg}")
        raise TypeError(f"'{arg_name}' must be {target_type}, got: {type(arg)}'")

    def _setup_setup_args(
        self, setup_host, setup_validate_host, setup_port, setup_timeout, setup_use_paramiko
    ) -> None:
        """
        Process and set "setup" args

        Note: setup_ssh_config_file is processed after auth setup

        Args:
            setup_host: ip address or hostname to connect to
            setup_validate_host: whether or not to validate ip address is valid or dns is resolvable
            setup_port: port to open ssh session to
            setup_timeout: timeout in seconds for opening underlying socket to host
            setup_use_paramiko: use paramiko instead of ssh2-python

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        self.host = setup_host.strip()
        if setup_validate_host:
            self._validate_host()
        self.port = int(setup_port)
        self.setup_timeout = int(setup_timeout)
        if isinstance(setup_use_paramiko, bool):
            self.setup_use_paramiko = setup_use_paramiko
        else:
            self._invalid_arg_type(bool, "setup_use_paramiko", setup_use_paramiko)

    def _setup_session_args(
        self,
        session_timeout,
        session_keepalive,
        session_keepalive_interval,
        session_keepalive_type,
        session_keepalive_pattern,
    ) -> None:
        r"""
        Process and set "session" args

        Args:
            session_timeout: time in ms for session read operations; 0 is "forever" and will block
            session_keepalive: whether or not to try to keep session alive
            session_keepalive_interval: interval to use for session keepalives
            session_keepalive_type: network|standard -- "network" sends actual characters over the
                channel as "normal" ssh keepalive doesn't keep sessions open. "standard" sends
                "normal" ssh keepalives via ssh2 library. In both cases a thread is spawned in
                which the keepalives are sent. This introduces a locking mechanism which in
                theory will slow things down slightly, however provides the ability to keep the
                session alive indefinitely.
            session_keepalive_pattern: pattern to send to keep network channel alive. Default is
                u"\005" which is equivalent to "ctrl+e". This pattern moves cursor to end of the
                line which should be an innocuous pattern. This will only be entered *if* a lock
                can be acquired.

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        self.session_timeout = int(session_timeout)
        if isinstance(session_keepalive, bool):
            self.session_keepalive = session_keepalive
        else:
            self._invalid_arg_type(bool, "session_keepalive", session_keepalive)
        self.session_keepalive_interval = int(session_keepalive_interval)
        if session_keepalive_type not in ["network", "standard"]:
            raise ValueError(
                f"{session_keepalive_type} is an invalid session_keepalive_type; must be "
                "'network' or 'standard'."
            )
        self.session_keepalive_type = session_keepalive_type
        self.session_keepalive_pattern = session_keepalive_pattern

    def _setup_auth_args(self, auth_user, auth_public_key, auth_password) -> None:
        """
        Process and set "auth" args

        Args:
            auth_user: username to use to connect to host
            auth_password: password to use to connect to host
            auth_public_key: path to ssh public key to use to connect to host

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        self.auth_user = auth_user.strip()
        if auth_public_key:
            self.auth_public_key = os.path.expanduser(auth_public_key.strip().encode())
        else:
            self.auth_public_key = auth_public_key
        if auth_password:
            self.auth_password = auth_password.strip()
        else:
            self.auth_password = auth_password

    def _setup_comms_args(
        self,
        comms_strip_ansi,
        comms_prompt_regex,
        comms_operation_timeout,
        comms_return_char,
        comms_pre_login_handler,
        comms_disable_paging,
    ):
        """
        Process and set "comms" args

        Args:
            comms_strip_ansi: whether or not to strip ansi characters from channel
            comms_prompt_regex: regex pattern to use for prompt matching.
                this is the single most important attribute here! if this does not match a prompt,
                ssh2net will not work!
                IMPORTANT: regex search uses multi-line + case insensitive flags. multi-line allows
                for highly reliably matching for prompts after stripping trailing white space,
                case insensitive is just a convenience factor so i can be lazy.
            comms_operation_timeout: timeout in seconds for waiting for channel operations.
                this is NOT the "read" timeout. this is the timeout for the entire operation
                sent to send_inputs/send_inputs_interact
            comms_return_char: character to use to send returns to host
            comms_pre_login_handler: callable or string that resolves to an importable function to
                handle pre-login (pre disable paging) operations
            comms_disable_paging: callable, string that resolves to an importable function, or
                string to send to device to disable paging

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if isinstance(comms_strip_ansi, bool):
            self.comms_strip_ansi = comms_strip_ansi
        else:
            self._invalid_arg_type(bool, "comms_strip_ansi", comms_strip_ansi)

        # try to compile prompt to raise TypeError before opening any connections
        re.compile(comms_prompt_regex, flags=re.M | re.I)
        self.comms_prompt_regex = comms_prompt_regex
        self.comms_operation_timeout = int(comms_operation_timeout)

        # validate that the return character set is a string
        # do this to ensure provided value is a string; this prevents an int being cast to string
        # making it look like things are ok
        if isinstance(comms_return_char, str):
            self.comms_return_char = comms_return_char
        else:
            self._invalid_arg_type(str, "comms_return_char", comms_return_char)
        self.comms_pre_login_handler = self._set_comms_pre_login_handler(comms_pre_login_handler)
        self.comms_disable_paging = self._set_comms_disable_paging(comms_disable_paging)

    def _setup_ssh_config_args(self, setup_ssh_config_file) -> None:
        """
        Set any args from ssh config file to override existing settings

        Args:
            setup_ssh_config_file: string of path to ssh config file, or bool True

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        ssh_config = SSH2NetSSHConfig(setup_ssh_config_file)
        host_config = ssh_config.lookup(self.host)
        if host_config.port:
            self.setup_port = host_config.port
        if host_config.user:
            self.auth_user = host_config.user
        if host_config.identity_file:
            self.auth_public_key = os.path.expanduser(host_config.identity_file.strip().encode())

    """ pre socket setup """  # noqa

    def _validate_host(self) -> None:
        """
        Validate host is valid IP or resolvable DNS name

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            ValidationError: if host is invalid IP and is non resolvable

        """
        try:
            ipaddress.ip_address(self.host)
            return
        except ValueError:
            session_log.info(f"Failed to validate host {self.host} as an ip address")
        try:
            socket.gethostbyname(self.host)
            return
        except socket.gaierror:
            session_log.info(f"Failed to validate host {self.host} as a resolvable dns name")
        raise ValidationError(f"Host {self.host} is not an IP or resolvable DNS name.")

    """ socket setup """  # noqa

    def _socket_alive(self) -> bool:
        """
        Check if underlying socket is alive

        Args:
            N/A  # noqa

        Returns:
            bool True/False if socket is alive

        Raises:
            N/A  # noqa

        """
        try:
            self.sock.send(b"")
            return True
        except OSError:
            # socket is not alive
            session_log.debug(f"Socket to host {self.host} is not alive")
            return False
        except AttributeError:
            # socket never created yet
            session_log.debug(f"Socket to host {self.host} has never been created")
            return False

    def _socket_open(self) -> None:
        """
        Open underlying socket

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            SetupTimeout: if socket connection times out

        """
        if not self._socket_alive():
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.settimeout(self.setup_timeout)
            try:
                self.sock.connect((self.host, self.port))
            except socket.timeout:
                session_log.critical(
                    f"Timed out trying to open socket to {self.host} on port {self.port}"
                )
                raise SetupTimeout(
                    f"Timed out trying to open socket to {self.host} on port {self.port}"
                )
            session_log.debug(f"Socket to host {self.host} opened")

    def _socket_close(self) -> None:
        """
        Close underlying socket

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if self._socket_alive():
            self.sock.close()
            session_log.debug(f"Socket to host {self.host} closed")

    def close(self) -> None:
        """
        Fully close socket, session, and channel

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        self._channel_close()
        self._session_close()
        self._socket_close()
        session_log.info(f"{str(self)}; Closed")

Ancestors

Subclasses

Methods

def close(self)

Fully close socket, session, and channel

Args

N/A # noqa

Returns

N/A # noqa
 

Raises

N/A # noqa
 
Expand source code
def close(self) -> None:
    """
    Fully close socket, session, and channel

    Args:
        N/A  # noqa

    Returns:
        N/A  # noqa

    Raises:
        N/A  # noqa

    """
    self._channel_close()
    self._session_close()
    self._socket_close()
    session_log.info(f"{str(self)}; Closed")

Inherited members

class SSH2NetChannel (*args, **kwargs)
Expand source code
class SSH2NetChannel:
    @staticmethod
    def _rstrip_all_lines(output: bytes) -> str:
        """
        Right strip all lines in provided output

        Args:
            output: bytes object to handle

        Returns:
            output: bytes object with each line right stripped

        Raises:
            N/A  # noqa

        """
        output = output.decode("unicode_escape").strip().splitlines()
        output = [line.rstrip() for line in output]
        return "\n".join(output)

    @staticmethod
    def _restructure_output(output: str, strip_prompt: bool = False) -> str:
        """
        Clean up preceding empty lines, and strip prompt if desired

        Args:
            output: list of strings to parse
            strip_prompt: bool True/False whether to strip prompt or not

        Returns:
            output: string of joined output lines

        Raises:
            N/A  # noqa

        """
        output = output.splitlines()
        # purge empty rows before actual output
        for row in output.copy():
            if row == "":
                output = output[1:]
            else:
                break
        # should improve -- simply peels the last line out of the list...
        if strip_prompt:
            output = output[:-1]
        output = "\n".join(output)
        return output

    @staticmethod
    def _strip_ansi(output: bytes) -> bytes:
        """
        Strip ansi characters from bytes string

        Args:
            output: bytes from channel that need ansi stripped

        Returns:
            output: bytes string with ansi stripped

        Raises:
            N/A  # noqa

        """
        ansi_escape_pattern = re.compile(rb"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
        output = re.sub(ansi_escape_pattern, b"", output)
        return output

    @channel_timeout(Timeout)
    def _read_until_input(self, channel_input: str) -> None:
        """
        Read until all input has been entered, then send return.

        strip off everything before and including the command

        Args:
            channel_input: string to write to channel

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        output = b""
        while channel_input.encode() not in output:
            if not self.comms_strip_ansi:
                output += self.channel.read()[1]
            else:
                output += self._strip_ansi(self.channel.read()[1])
        channel_log.debug(f"Read: {repr(output)}")
        # once the input has been fully written to channel; flush it and send return char
        self.channel.flush()
        self.channel.write(self.comms_return_char)
        channel_log.debug(f"Write (sending return character): {repr(self.comms_return_char)}")

    @channel_timeout(Timeout)
    def _read_until_prompt(self, output=None, prompt=None):
        """
        Read the channel until the desired prompt is seen

        Args:
            output: bytes of previously seen output if any
            prompt: string of prompt to look for; refactor to prefer regex

        Returns:
            output: bytes of any channel reads after prompt has been seen

        Raises:
            N/A  # noqa

        """
        if not output:
            output = b""

        # prefer to use regex match where possible; assume pattern is regex if starting with
        # ^ or ending with $ -- this works as we always use multi line search
        if not prompt:
            prompt_pattern = re.compile(self.comms_prompt_regex, flags=re.M | re.I)
            prompt_regex = True
        else:
            if prompt.startswith("^") or prompt.endswith("$"):
                prompt_pattern = re.compile(prompt, flags=re.M | re.I)
                prompt_regex = True
            else:
                prompt_pattern = prompt
                prompt_regex = False

        # disabling session blocking means the while loop will actually iterate
        # without this iteration we can never properly check for prompts
        self.session.set_blocking(False)
        while True:
            if not self.comms_strip_ansi:
                output += self.channel.read()[1]
            else:
                output += self._strip_ansi(self.channel.read()[1])
            channel_log.debug(f"Read: {repr(output)}")
            # we do not need to deal w/ line replacement for the actual output, only for
            # parsing if a prompt-like thing is at the end of the output
            output_copy = output
            output_copy = re.sub("\r", "\n", output_copy.decode("unicode_escape").strip())
            if prompt_regex:
                channel_match = re.search(prompt_pattern, output_copy)
            elif prompt in output_copy:
                channel_match = True
            else:
                channel_match = False
            if channel_match:
                output = self._rstrip_all_lines(output)
                self.session.set_blocking(True)
                return output

    @operation_timeout("comms_operation_timeout")
    def _send_input(self, channel_input: str, strip_prompt: bool):
        """
        Send input to device and return results

        Args:
            channel_input: string input to write to channel
            strip_prompt: bool True/False for whether or not to strip prompt

        Returns:
            output: string of cleaned channel data

        Raises:
            N/A  # noqa

        """
        self._acquire_session_lock()
        session_log.debug(
            f"Attempting to send input: {channel_input}; strip_prompt: {strip_prompt}"
        )
        self.channel.flush()
        self.channel.write(channel_input)
        channel_log.debug(f"Write: {repr(channel_input)}")
        self._read_until_input(channel_input)
        output = self._read_until_prompt()
        self.session_lock.release_lock()
        return self._restructure_output(output, strip_prompt=strip_prompt)

    @operation_timeout("comms_operation_timeout")
    def _send_input_interact(
        self,
        channel_input: str,
        expectation: str,
        response: str,
        finale: str,
        hidden_response: bool = False,
    ) -> str:
        """
        Respond to a single "staged" prompt and return results

        Args:
            channel_input: string input to write to channel
            expectation: string of what to expect from channel
            response: string what to respond to the "expectation"
            finale: string of prompt to look for to know when "done"
            hidden_response: True/False response is hidden (i.e. password input)

        Returns:
            output: string of cleaned channel data

        Raises:
            N/A  # noqa

        """
        self._acquire_session_lock()
        session_log.debug(
            f"Attempting to send input interact: {channel_input}; "
            f"expecting: {expectation}; responding: {response}; "
            f"with a finale: {finale}; hidden_response: {hidden_response}"
        )
        self.channel.flush()
        self.channel.write(channel_input)
        channel_log.debug(f"Write: {repr(channel_input)}")
        self._read_until_input(channel_input)
        output = self._read_until_prompt(prompt=expectation)
        # if response is simply a return; add that so it shows in output
        # likewise if response is "hidden" (i.e. password input), add return
        # otherwise, skip
        if not response:
            output += self.comms_return_char
        elif hidden_response is True:
            output += self.comms_return_char
        self.channel.write(response)
        channel_log.debug(f"Write: {repr(response)}")
        self.channel.write(self.comms_return_char)
        channel_log.debug(f"Write (sending return character): {repr(self.comms_return_char)}")
        output += self._read_until_prompt(prompt=finale)
        self.session_lock.release_lock()
        return self._restructure_output(output)

    def open_and_execute(self, command: str):
        """
        Open ssh channel and execute a command; closes channel when done.

        "one time use" method -- best for running one command then moving on; otherwise
        use "open_shell" instead, though this will likely be substantially faster for "single"
        operations.

        Args:
            command: string input to write to channel

        Returns:
            result: output from command sent over the channel

        Raises:
            N/A  # noqa

        """
        session_log.info(f"Attempting to open channel for command execution")
        if self._shell:
            self._channel_close()
        self._channel_open()
        output = b""
        channel_buff = 1
        session_log.debug(f"Channel open, executing command: {command}")
        self.channel.execute(command)
        while channel_buff > 0:
            try:
                channel_buff, data = self.channel.read()
                output += data
            except SocketRecvError:
                break
        output = self._rstrip_all_lines(output)
        result = self._restructure_output(output)
        self.close()
        session_log.info(f"Command executed, channel closed")
        return result

    def open_shell(self) -> None:
        """
        Open and prepare interactive SSH shell

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        session_log.info(f"Attempting to open interactive shell")
        # open the channel itself
        self._channel_open()
        # invoke a shell on the channel
        self._channel_invoke_shell()
        # pre-login handling if needed for things like wlc
        if self.comms_pre_login_handler:
            self.comms_pre_login_handler(self)
        # send disable paging if needed
        if self.comms_disable_paging:
            if callable(self.comms_disable_paging):
                self.comms_disable_paging(self)
            else:
                self.send_inputs(self.comms_disable_paging)
        self._session_keepalive()
        session_log.info("Interactive shell opened")

    @channel_timeout(Timeout)
    def get_prompt(self) -> bool:
        """
        Read from shell and get the current shell prompt

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        pattern = re.compile(self.comms_prompt_regex, flags=re.M | re.I)
        self.session.set_timeout(1000)
        self.channel.flush()
        self.channel.write(self.comms_return_char)
        channel_log.debug(f"Write (sending return character): {repr(self.comms_return_char)}")
        while True:
            output = self.channel.read()[1].rstrip(b"\\")
            output = output.decode("unicode_escape").strip()
            channel_match = re.search(pattern, output)
            if channel_match:
                self.session.set_timeout(self.session_timeout)
                current_prompt = channel_match.group(0)
                return current_prompt

    def send_inputs(self, inputs, strip_prompt: Optional[bool] = True) -> List[bytes]:
        """
        Primary entry point to send data to devices in shell mode; accept inputs and return results

        Args:
            inputs: list of strings or string of inputs to send to channel
            strip_prompt: strip prompt or not, defaults to True (yes, strip the prompt)

        Returns:
            result: list of output from the input command(s)

        Raises:
            N/A  # noqa

        """
        if isinstance(inputs, str):
            inputs = [inputs]
        results = []
        for channel_input in inputs:
            output = self._send_input(channel_input, strip_prompt)
            results.append(output)
        return results

    def send_inputs_interact(self, inputs, hidden_response=False) -> List[Tuple[str, bytes]]:
        """
        Primary entry point to interact with devices in shell mode; used to handle prompts

        accepts inputs and looks for expected prompt;
        sends the appropriate response, then waits for the "finale"
        returns the results of the interaction

        could be "chained" together to respond to more than a "single" staged prompt

        Args:
            inputs: tuple containing strings representing:
                initial input
                expectation (what should ssh2net expect after input)
                response (response to expectation)
                finale (what should ssh2net expect when "done")
            hidden_response: True/False response is hidden (i.e. password input)

        Returns:
            result: list of output from the input command(s)

        Raises:
            N/A  # noqa

        """
        if isinstance(inputs, tuple):
            inputs = [inputs]
        results = []
        for channel_input, expectation, response, finale in inputs:
            output = self._send_input_interact(
                channel_input, expectation, response, finale, hidden_response
            )
            results.append(output)
        return results

Subclasses

Methods

def get_prompt(self, *args, **kwargs)
Expand source code
def retry_wrapper(self, *args, **kwargs):
    attempt, delay = attempts, starting_delay
    while attempt > 1:
        try:
            return wrapped_func(self, *args, **kwargs)
        except exception_to_check:
            channel_log.info(f"Retrying read operation in {delay} seconds...")
            time.sleep(delay)
            attempt -= 1
            delay *= backoff
    return wrapped_func(self, *args, **kwargs)
def open_and_execute(self, command)

Open ssh channel and execute a command; closes channel when done.

"one time use" method – best for running one command then moving on; otherwise use "open_shell" instead, though this will likely be substantially faster for "single" operations.

Args

command
string input to write to channel

Returns

result
output from command sent over the channel

Raises

N/A # noqa
 
Expand source code
def open_and_execute(self, command: str):
    """
    Open ssh channel and execute a command; closes channel when done.

    "one time use" method -- best for running one command then moving on; otherwise
    use "open_shell" instead, though this will likely be substantially faster for "single"
    operations.

    Args:
        command: string input to write to channel

    Returns:
        result: output from command sent over the channel

    Raises:
        N/A  # noqa

    """
    session_log.info(f"Attempting to open channel for command execution")
    if self._shell:
        self._channel_close()
    self._channel_open()
    output = b""
    channel_buff = 1
    session_log.debug(f"Channel open, executing command: {command}")
    self.channel.execute(command)
    while channel_buff > 0:
        try:
            channel_buff, data = self.channel.read()
            output += data
        except SocketRecvError:
            break
    output = self._rstrip_all_lines(output)
    result = self._restructure_output(output)
    self.close()
    session_log.info(f"Command executed, channel closed")
    return result
def open_shell(self)

Open and prepare interactive SSH shell

Args

N/A # noqa

Returns

N/A # noqa
 

Raises

N/A # noqa
 
Expand source code
def open_shell(self) -> None:
    """
    Open and prepare interactive SSH shell

    Args:
        N/A  # noqa

    Returns:
        N/A  # noqa

    Raises:
        N/A  # noqa

    """
    session_log.info(f"Attempting to open interactive shell")
    # open the channel itself
    self._channel_open()
    # invoke a shell on the channel
    self._channel_invoke_shell()
    # pre-login handling if needed for things like wlc
    if self.comms_pre_login_handler:
        self.comms_pre_login_handler(self)
    # send disable paging if needed
    if self.comms_disable_paging:
        if callable(self.comms_disable_paging):
            self.comms_disable_paging(self)
        else:
            self.send_inputs(self.comms_disable_paging)
    self._session_keepalive()
    session_log.info("Interactive shell opened")
def send_inputs(self, inputs, strip_prompt=True)

Primary entry point to send data to devices in shell mode; accept inputs and return results

Args

inputs
list of strings or string of inputs to send to channel
strip_prompt
strip prompt or not, defaults to True (yes, strip the prompt)

Returns

result
list of output from the input command(s)

Raises

N/A # noqa
 
Expand source code
def send_inputs(self, inputs, strip_prompt: Optional[bool] = True) -> List[bytes]:
    """
    Primary entry point to send data to devices in shell mode; accept inputs and return results

    Args:
        inputs: list of strings or string of inputs to send to channel
        strip_prompt: strip prompt or not, defaults to True (yes, strip the prompt)

    Returns:
        result: list of output from the input command(s)

    Raises:
        N/A  # noqa

    """
    if isinstance(inputs, str):
        inputs = [inputs]
    results = []
    for channel_input in inputs:
        output = self._send_input(channel_input, strip_prompt)
        results.append(output)
    return results
def send_inputs_interact(self, inputs, hidden_response=False)

Primary entry point to interact with devices in shell mode; used to handle prompts

accepts inputs and looks for expected prompt; sends the appropriate response, then waits for the "finale" returns the results of the interaction

could be "chained" together to respond to more than a "single" staged prompt

Args

inputs
tuple containing strings representing: initial input expectation (what should ssh2net expect after input) response (response to expectation) finale (what should ssh2net expect when "done")
hidden_response
True/False response is hidden (i.e. password input)

Returns

result
list of output from the input command(s)

Raises

N/A # noqa
 
Expand source code
def send_inputs_interact(self, inputs, hidden_response=False) -> List[Tuple[str, bytes]]:
    """
    Primary entry point to interact with devices in shell mode; used to handle prompts

    accepts inputs and looks for expected prompt;
    sends the appropriate response, then waits for the "finale"
    returns the results of the interaction

    could be "chained" together to respond to more than a "single" staged prompt

    Args:
        inputs: tuple containing strings representing:
            initial input
            expectation (what should ssh2net expect after input)
            response (response to expectation)
            finale (what should ssh2net expect when "done")
        hidden_response: True/False response is hidden (i.e. password input)

    Returns:
        result: list of output from the input command(s)

    Raises:
        N/A  # noqa

    """
    if isinstance(inputs, tuple):
        inputs = [inputs]
    results = []
    for channel_input, expectation, response, finale in inputs:
        output = self._send_input_interact(
            channel_input, expectation, response, finale, hidden_response
        )
        results.append(output)
    return results
class SSH2NetSSHConfig (ssh_config_file='')

Initialize SSH2NetSSHConfig Object

Parse OpenSSH config file

Try to load the following data for all entries in config file: Host HostName Port User AddressFamily BindAddress ConnectTimeout IdentitiesOnly IdentityFile KbdInteractiveAuthentication PasswordAuthentication PreferredAuthentications

Args

ssh_config_file
string path to ssh configuration file to use if not provided will try to use users ssh config file in ~/.ssh/config first, then will try /etc/ssh/config_file

Returns

N/A # noqa
 

Raises

N/A # noqa
 
Expand source code
class SSH2NetSSHConfig:
    def __init__(self, ssh_config_file=""):
        """
        Initialize SSH2NetSSHConfig Object

        Parse OpenSSH config file

        Try to load the following data for all entries in config file:
            Host
            HostName
            Port
            User
            AddressFamily
            BindAddress
            ConnectTimeout
            IdentitiesOnly
            IdentityFile
            KbdInteractiveAuthentication
            PasswordAuthentication
            PreferredAuthentications

        Args:
            ssh_config_file: string path to ssh configuration file to use if not provided
                will try to use users ssh config file in ~/.ssh/config first, then will try
                /etc/ssh/config_file

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        self.ssh_config_file = self._select_config_file(ssh_config_file)
        if self.ssh_config_file:
            with open(self.ssh_config_file, "r") as f:
                self.ssh_config_file = f.read()
            self.hosts = self._parse()
            if not self.hosts:
                self.hosts = None
        else:
            self.hosts = None

    def __str__(self):
        """
        Magic str method for SSH2NetSSHConfig class

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        return "SSH2NetSSHConfig Object"

    def __repr__(self):
        """
        Magic repr method for SSH2NetSSHConfig class

        Args:
            N/A  # noqa

        Returns:
            repr: repr for class object

        Raises:
            N/A  # noqa

        """
        class_dict = self.__dict__.copy()
        del class_dict["ssh_config_file"]
        return f"SSH2NetSSHConfig {class_dict}"

    def __bool__(self):
        """
        Magic bool method; return True if ssh_config_file

        Args:
            N/A  # noqa

        Returns:
            bool: True/False if ssh_config_file

        Raises:
            N/A  # noqa

        """
        if self.ssh_config_file:
            return True
        return False

    @staticmethod
    def _select_config_file(ssh_config_file):
        """
        Select ssh configuration file

        Args:
            ssh_config_file: string representation of ssh config file to try to use

        Returns:
            ssh_config_file: Pathlib path object or None

        Raises:
            N/A  # noqa

        """
        if Path(ssh_config_file).is_file():
            return Path(ssh_config_file)
        if Path(os.path.expanduser("~/.ssh/config")).is_file():
            return Path(os.path.expanduser("~/.ssh/config"))
        if Path("/etc/ssh/ssh_config").is_file():
            return Path("/etc/ssh/ssh_config")
        return None

    @staticmethod
    def _strip_comments(line):
        """
        Strip out comments from ssh config file lines

        Args:
            line: to strip comments from

        Returns:
            line: rejoined ssh config file line after stripping comments

        Raises:
            N/A  # noqa

        """
        line = " ".join(shlex.split(line, comments=True))
        return line

    def _parse(self):
        """
        Parse SSH configuration file

        Args:
            N/A  # noqa

        Returns:
            discovered_hosts: dict of host objects discovered in ssh config file

        Raises:
            N/A  # noqa

        """

        # uncomment next line and handle global patterns (stuff before hosts) at some point
        # global_config_pattern = re.compile(r"^.*?\b(?=host)", flags=re.I | re.S)
        # use word boundaries with a positive lookahead to get everything between the word host
        # need to do this as whitespace/formatting is not really a thing in ssh_config file
        # match host\s to ensure we don't pick up hostname and split things there accidentally
        host_pattern = re.compile(r"\bhost.*?\b(?=host\s|\s+$)", flags=re.I | re.S)
        host_entries = re.findall(host_pattern, self.ssh_config_file)
        discovered_hosts = {}
        if not host_entries:
            return discovered_hosts

        # do we need to add whitespace between match and end of line to ensure we match correctly?
        hosts_pattern = re.compile(r"^\s*host[\s=]+(.*)$", flags=re.I | re.M)
        hostname_pattern = re.compile(r"^\s*hostname[\s=]+([\w.-]*)$", flags=re.I | re.M)
        port_pattern = re.compile(r"^\s*port[\s=]+([\d]*)$", flags=re.I | re.M)
        user_pattern = re.compile(r"^\s*user[\s=]+([\w]*)$", flags=re.I | re.M)
        # address_family_pattern = None
        # bind_address_pattern = None
        # connect_timeout_pattern = None
        identities_only_pattern = re.compile(
            r"^\s*identitiesonly[\s=]+(yes|no)$", flags=re.I | re.M
        )
        identity_file_pattern = re.compile(
            r"^\s*identityfile[\s=]+([\w.\/\@~-]*)$", flags=re.I | re.M
        )
        # keyboard_interactive_pattern = None
        # password_authentication_pattern = None
        # preferred_authentication_pattern = None

        for host_entry in host_entries:
            host = Host()
            host_line = re.search(hosts_pattern, host_entry)
            if host_line:
                host.hosts = self._strip_comments(host_line.groups()[0])
            hostname = re.search(hostname_pattern, host_entry)
            if hostname:
                host.hostname = self._strip_comments(hostname.groups()[0])
            port = re.search(port_pattern, host_entry)
            if port:
                host.port = self._strip_comments(port.groups()[0])
            user = re.search(user_pattern, host_entry)
            if user:
                host.user = self._strip_comments(user.groups()[0])
            # address_family = re.search(user_pattern, host_entry[0])
            # bind_address = re.search(user_pattern, host_entry[0])
            # connect_timeout = re.search(user_pattern, host_entry[0])
            identities_only = re.search(identities_only_pattern, host_entry)
            if identities_only:
                host.identities_only = self._strip_comments(identities_only.groups()[0])
            identity_file = re.search(identity_file_pattern, host_entry)
            if identity_file:
                host.identity_file = self._strip_comments(identity_file.groups()[0])
            # keyboard_interactive = re.search(user_pattern, host_entry[0])
            # password_authentication = re.search(user_pattern, host_entry[0])
            # preferred_authentication = re.search(user_pattern, host_entry[0])
            discovered_hosts[host.hosts] = host
        return discovered_hosts

    def _lookup_fuzzy_match(self, host):
        """
        Look up fuzzy matched hosts

        Get the best match ssh config Host entry for a given host; this allows for using
        the splat and question-mark operators in ssh config file

        Args:
            host: host to lookup in discovered_hosts dict

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        possible_matches = []
        for host_entry in self.hosts.keys():
            host_list = host_entry.split()
            for host_pattern in host_list:
                # replace periods with literal period
                # replace asterisk (match 0 or more things) with appropriate regex
                # replace question mark (match one thing) with appropriate regex
                host_pattern = (
                    host_pattern.replace(".", r"\.").replace("*", r"(.*)").replace("?", r"(.)")
                )
                # compile with case insensitive
                host_pattern = re.compile(host_pattern, flags=re.I)
                result = re.search(host_pattern, host)
                # if we get a result, append it and the original pattern to the possible matches
                if result:
                    possible_matches.append((result, host_entry))
        # initialize a None best match
        best_match = None
        for match in possible_matches:
            if best_match is None:
                best_match = match
            # count how many chars were replaced to get regex to work
            chars_replaced = 0
            for start_char, end_char in match[0].regs[1:]:
                chars_replaced += end_char - start_char
            # count how many chars were replaced to get regex to work on best match
            best_match_chars_replaced = 0
            for start_char, end_char in best_match[0].regs[1:]:
                best_match_chars_replaced += end_char - start_char
            # if match replaced less chars than "best_match" we have a new best match
            if chars_replaced < best_match_chars_replaced:
                best_match = match
        return self.hosts[best_match[1]]

    def lookup(self, host):
        """
        Lookup a given host

        Args:
            host: host to lookup in discovered_hosts dict

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        # return exact 1:1 match if exists
        if host in self.hosts.keys():
            return self.hosts[host]
        # return match if given host is an exact match for a host entry
        for host_entry in self.hosts.keys():
            host_list = host_entry.split()
            if host in host_list:
                return self.hosts[host_entry]
        # otherwise need to select the most correct host entry
        return self._lookup_fuzzy_match(host)

Methods

def lookup(self, host)

Lookup a given host

Args

host
host to lookup in discovered_hosts dict

Returns

N/A # noqa
 

Raises

N/A # noqa
 
Expand source code
def lookup(self, host):
    """
    Lookup a given host

    Args:
        host: host to lookup in discovered_hosts dict

    Returns:
        N/A  # noqa

    Raises:
        N/A  # noqa

    """
    # return exact 1:1 match if exists
    if host in self.hosts.keys():
        return self.hosts[host]
    # return match if given host is an exact match for a host entry
    for host_entry in self.hosts.keys():
        host_list = host_entry.split()
        if host in host_list:
            return self.hosts[host_entry]
    # otherwise need to select the most correct host entry
    return self._lookup_fuzzy_match(host)
class SSH2NetSession (*args, **kwargs)
Expand source code
class SSH2NetSession(SSH2NetChannel):
    def _session_alive(self):
        """
        Check if session is alive and authenticated

        Args:
            N/A  # noqa

        Returns:
            bool True/False session is alive and authenticated

        Raises:
            N/A  # noqa

        """
        try:
            # if authenticated we can assume session is good to go
            return self._session_check_authenticated()
        except AttributeError:
            # session never created yet; there may be other exceptions we need to catch here
            logging.debug(f"Session to host {self.host} has never been created")
            return False

    def _keepalive_thread(self) -> None:
        """
        Attempt to keep sessions alive.

        In the case of "networking" equipment this will try to acquire a session lock and send an
        innocuous character -- such as CTRL+E -- to keep the device "exec-timeout" from expiring.

        For "normal" devices that allow for a standard ssh keepalive, this thread will simply use
        those mechanisms to maintain the session. This will likely break (for "normal" devices) if
        using paramiko for the underlying driver, but has not been tested yet!

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        lock_counter = 0
        last_keepalive = datetime.now()
        if self.session_keepalive_type == "network":
            while True:
                if not self._session_alive():
                    return
                diff = datetime.now() - last_keepalive
                if diff.seconds >= self.session_keepalive_interval:
                    if not self.session_lock.locked():
                        lock_counter = 0
                        self.session_lock.acquire_lock()
                        self.channel.write(self.session_keepalive_pattern)
                        self.session_lock.release_lock()
                        last_keepalive = datetime.now()
                    else:
                        lock_counter += 1
                        if lock_counter >= 3:
                            print(
                                f"Keepalive thread missed {lock_counter} consecutive keepalives..."
                            )
                time.sleep(self.session_keepalive_interval / 10)
        elif self.session_keepalive_type == "standard":
            self.session.keepalive_config(
                want_reply=False, interval=self.session_keepalive_interval
            )
            while True:
                if not self._session_alive():
                    return
                self.session.keepalive_send()
                time.sleep(self.session_keepalive_interval / 10)

    def _session_keepalive(self) -> None:
        """
        Spawn keepalive thread for ssh session

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if not self.session_keepalive:
            return
        pool = ThreadPoolExecutor()
        pool.submit(self._keepalive_thread)

    def _acquire_session_lock(self) -> None:
        """
        Attempt to acquire session lock

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        while True:
            if not self.session_lock.locked():
                self.session_lock.acquire_lock()
                return

    def _session_open(self) -> None:
        """
        Open SSH session

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if self.setup_use_paramiko is False:
            ssh2_session_obj = SSH2NetSessionSSH2(self)
            self._session_open_connect = (
                ssh2_session_obj._session_open_connect  # pylint: disable=W0212
            )
            self._session_public_key_auth = (
                ssh2_session_obj._session_public_key_auth  # pylint: disable=W0212
            )
            self._session_password_auth = (
                ssh2_session_obj._session_password_auth  # pylint: disable=W0212
            )
            self._channel_open_driver = (
                ssh2_session_obj._channel_open_driver  # pylint: disable=W0212
            )
            self._channel_invoke_shell = (
                ssh2_session_obj._channel_invoke_shell  # pylint: disable=W0212
            )
        else:
            miko_sesion_obj = SSH2NetSessionParamiko(self)
            self._session_open_connect = (
                miko_sesion_obj._session_open_connect  # pylint: disable=W0212
            )
            self._session_public_key_auth = (
                miko_sesion_obj._session_public_key_auth  # pylint: disable=W0212
            )
            self._session_password_auth = (
                miko_sesion_obj._session_password_auth  # pylint: disable=W0212
            )
            self._channel_open_driver = (
                miko_sesion_obj._channel_open_driver  # pylint: disable=W0212
            )
            self._channel_invoke_shell = (
                miko_sesion_obj._channel_invoke_shell  # pylint: disable=W0212
            )

        if not self._socket_alive():
            self._socket_open()
        if not self._session_alive():
            self._session_open_connect()

        logging.debug(f"Session to host {self.host} opened")
        self.session_lock = Lock()
        if self.auth_public_key:
            self._session_public_key_auth()
            if self._session_alive():
                return
        if self.auth_password:
            self._session_password_auth()
            if self._session_alive():
                return

    def _session_check_authenticated(self) -> bool:
        """
        Check if session is authenticated

        Args:
            N/A  # noqa

        Returns:
            bool: True/False for session authenticated

        Raises:
            N/A  # noqa

        """
        if self.setup_use_paramiko is False:
            return self.session.userauth_authenticated()
        return self.session.is_authenticated()

    def _session_close(self) -> None:
        """
        Close SSH SSH2NetSession

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if self.session is not None:  # pylint: disable=E0203
            if self.setup_use_paramiko:
                self.session.close()  # pylint: disable=E0203
            else:
                self.session.disconnect()  # pylint: disable=E0203
            self.session = None
            logging.debug(f"Session to host {self.host} closed")

    """ channel setup """  # noqa

    def _channel_alive(self) -> bool:
        """
        Check if channel is alive

        Args:
            N/A  # noqa

        Returns:
            bool True/False channel is alive

        Raises:
            N/A  # noqa

        """
        try:
            if self.channel:
                return True
        except AttributeError:
            # channel not created, or closed
            logging.debug(f"Channel to host {self.host} has never been created")
            return False
        return False

    def _channel_open(self) -> None:
        """
        Open channel

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if not self._session_alive():
            self._session_open()
        if not self._channel_alive():
            self._channel_open_driver()

    def _channel_close(self) -> None:
        """
        Close channel

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if self.channel is not None:  # pylint: disable=E0203
            self.channel.close  # noqa
            self.channel = None
            logging.debug(f"Channel to host {self.host} closed")

Ancestors

Subclasses

Inherited members