Module ssh2net
ssh2net network ssh client library
Expand source code
"""ssh2net network ssh client library"""
import logging
from logging import NullHandler
from ssh2net.base import SSH2Net
from ssh2net.channel import SSH2NetChannel
from ssh2net.session import SSH2NetSession
from ssh2net.netmiko_compatibility import connect_handler as ConnectHandler
from ssh2net.ssh_config import SSH2NetSSHConfig
from ssh2net.core.driver import BaseNetworkDriver
from ssh2net.core.cisco_iosxe.driver import IOSXEDriver
from ssh2net.core.cisco_nxos.driver import NXOSDriver
from ssh2net.core.cisco_iosxr.driver import IOSXRDriver
from ssh2net.core.arista_eos.driver import EOSDriver
from ssh2net.core.juniper_junos.driver import JunosDriver
__version__ = "2019.10.27"
__all__ = (
"SSH2Net",
"SSH2NetSession",
"SSH2NetChannel",
"SSH2NetSSHConfig",
"ConnectHandler",
"BaseNetworkDriver",
"IOSXEDriver",
"NXOSDriver",
"IOSXRDriver",
"EOSDriver",
"JunosDriver",
)
# Class to filter duplicate log entries for the channel logger
# Stolen from: https://stackoverflow.com/questions/44691558/ \
# suppress-multiple-messages-with-same-content-in-python-logging-module-aka-log-co
class DuplicateFilter(logging.Filter):
    """
    Logging filter that suppresses a record repeating the one emitted just before it

    Two records count as duplicates when their module, level number and (unformatted) message
    are all equal. Only the most recently accepted record is remembered, so a message that
    reappears after a different one is let through again.

    """

    def filter(self, record):
        """
        Decide whether a record should be emitted

        Args:
            record: logging.LogRecord being processed

        Returns:
            bool: False if record matches the previously accepted record, otherwise True

        Raises:
            N/A  # noqa

        """
        fingerprint = (record.module, record.levelno, record.msg)
        # "last_log" does not exist until the first record has been accepted
        previous = getattr(self, "last_log", None)
        if fingerprint == previous:
            return False
        self.last_log = fingerprint
        return True
# Session logger: library code must not emit output unless the application configures
# logging, hence the NullHandler
session_log = logging.getLogger(f"{__name__}_session")
session_log.addHandler(NullHandler())

# Channel logger: same NullHandler treatment, plus a filter that drops back-to-back
# duplicate entries
channel_log = logging.getLogger(f"{__name__}_channel")
channel_log.addHandler(NullHandler())
channel_log.addFilter(DuplicateFilter())
Sub-modules
ssh2net.base
-
ssh2net.base
ssh2net.channel
-
ssh2net.channel
ssh2net.community
-
ssh2net community platform drivers
ssh2net.core
-
ssh2net core platform drivers
ssh2net.decorators
-
ssh2net.decorators
ssh2net.exceptions
-
ssh2net.exceptions
ssh2net.helper
-
ssh2net.helper
ssh2net.netmiko_compatibility
-
ssh2net.netmiko_compatibility
ssh2net.session
-
ssh2net.session
ssh2net.session_miko
-
ssh2net.session_miko
ssh2net.session_ssh2
-
ssh2net.session_ssh2
ssh2net.ssh_config
-
ssh2net.ssh_config
Functions
def ConnectHandler(auto_open=True, **kwargs)
-
Convert netmiko style "ConnectHandler" device creation to SSH2Net style
Args
auto_open
- auto open connection or not (primarily for testing purposes)
**kwargs
- keyword arguments
Returns
driver
- SSH2Net connection object for specified device-type
Raises
TypeError
- if unsupported netmiko device type is provided
Expand source code
def connect_handler(auto_open=True, **kwargs):
    """
    Build an SSH2Net driver from netmiko style "ConnectHandler" arguments

    Args:
        auto_open: open the shell on the new driver before returning it (primarily disabled
            for testing purposes)
        **kwargs: netmiko style keyword arguments; "device_type" selects the driver and the
            remaining arguments are translated with transform_netmiko_kwargs

    Returns:
        driver: SSH2Net connection object for specified device-type

    Raises:
        TypeError: if unsupported netmiko device type is provided

    """
    device_type = kwargs["device_type"]
    if device_type not in NETMIKO_DEVICE_TYPE_MAPPER:
        raise TypeError(f"Unsupported netmiko device type for ssh2net: {device_type}")

    platform = NETMIKO_DEVICE_TYPE_MAPPER[device_type]
    driver_class = platform["driver"]
    platform_args = platform["arg_mapper"]

    # device_type is a netmiko concept; it must not reach the argument translation
    del kwargs["device_type"]

    # platform specific arguments take precedence over the translated user arguments
    driver_kwargs = dict(transform_netmiko_kwargs(kwargs))
    driver_kwargs.update(platform_args)

    driver = driver_class(**driver_kwargs)
    if auto_open:
        driver.open_shell()
    return driver
Classes
class BaseNetworkDriver (auth_secondary=None, **kwargs)
-
Initialize SSH2Net BaseNetworkDriver Object
Args
auth_secondary
- password to use for secondary authentication (enable)
Returns
N/A # noqa
Raises
N/A # noqa
Expand source code
class BaseNetworkDriver(SSH2Net):
    def __init__(self, auth_secondary: Optional[str] = None, **kwargs: Dict[str, Any]):
        """
        Initialize SSH2Net BaseNetworkDriver Object

        Args:
            auth_secondary: password to use for secondary authentication (enable)
            **kwargs: keyword args to pass to inherited class(es)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        self.auth_secondary = auth_secondary
        super().__init__(**kwargs)
        # platform drivers are expected to overwrite the three attributes below
        self.privs = PRIVS
        self.default_desired_priv = None
        self.textfsm_platform = None

    def _determine_current_priv(self, current_prompt: str):
        """
        Determine current privilege level from prompt string

        Args:
            current_prompt: string of current prompt

        Returns:
            priv_level: NamedTuple of current privilege level

        Raises:
            UnknownPrivLevel: if privilege level cannot be determined  # noqa
            # darglint raises DAR401 for some reason hence the noqa...

        """
        # first privilege level whose pattern matches the prompt wins
        for priv_level in self.privs.values():
            if re.search(priv_level.pattern, current_prompt):
                return priv_level
        raise UnknownPrivLevel

    def _escalate(self) -> None:
        """
        Escalate to the next privilege level up

        Does nothing when the current privilege level defines no escalate command.

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        current_priv = self._determine_current_priv(self.get_prompt())
        if not current_priv.escalate:
            return
        if current_priv.escalate_auth:
            self.send_inputs_interact(
                (
                    current_priv.escalate,
                    current_priv.escalate_prompt,
                    # the secondary (enable) password is stored as auth_secondary by the
                    # constructor; no auth_enable attribute is ever assigned
                    self.auth_secondary,
                    # NOTE(review): this looks up the literal key "escalate_priv" in privs;
                    # possibly meant to use current_priv.escalate_priv -- confirm against PRIVS
                    self.privs.get("escalate_priv"),
                ),
                hidden_response=True,
            )
        else:
            self.send_inputs(current_priv.escalate)

    def _deescalate(self) -> None:
        """
        Deescalate to the next privilege level down

        Does nothing when the current privilege level defines no deescalate command.

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        current_priv = self._determine_current_priv(self.get_prompt())
        if current_priv.deescalate:
            self.send_inputs(current_priv.deescalate)

    def attain_priv(self, desired_priv) -> None:
        """
        Attain desired priv level

        Re-reads the prompt after every step and moves one level at a time until the
        current privilege level equals the desired one.

        Args:
            desired_priv: string name of desired privilege level
                (see ssh2net.core.<device_type>.driver for levels)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        while True:
            current_priv = self._determine_current_priv(self.get_prompt())
            if current_priv == self.privs[desired_priv]:
                return
            if current_priv.level > self.privs[desired_priv].level:
                self._deescalate()
            else:
                self._escalate()

    def send_command(self, commands):
        """
        Send command(s)

        Args:
            commands: string or list of strings to send to device in privilege exec mode

        Returns:
            result: whatever send_inputs returns for the provided commands

        Raises:
            N/A  # noqa

        """
        self.attain_priv(self.default_desired_priv)
        result = self.send_inputs(commands)
        return result

    def send_config_set(self, configs):
        """
        Send configuration(s)

        Args:
            configs: string or list of strings to send to device in config mode

        Returns:
            result: whatever send_inputs returns for the provided configs

        Raises:
            N/A  # noqa

        """
        self.attain_priv("configuration")
        result = self.send_inputs(configs)
        # leave the device at its default privilege level once the configs have been sent
        self.attain_priv(self.default_desired_priv)
        return result

    def textfsm_parse_output(self, command: str, output: str) -> str:
        """
        Parse output with TextFSM and ntc-templates

        Args:
            command: command used to get output
            output: output from command

        Returns:
            output: parsed output, or the output unchanged if no template was found

        Raises:
            N/A  # noqa

        """
        template = _textfsm_get_template(self.textfsm_platform, command)
        if template:
            output = textfsm_parse(template, output)
        return output
Ancestors
Subclasses
Methods
def attain_priv(self, desired_priv)
-
Attain desired priv level
Args
desired_priv
- string name of desired privilege level
(see ssh2net.core.<device_type>.driver for levels)
Returns
N/A # noqa
Raises
N/A # noqa
Expand source code
def attain_priv(self, desired_priv) -> None:
    """
    Attain desired priv level

    The prompt is re-read after every step; the loop only ends once the privilege level
    determined from the prompt equals the desired one.

    Args:
        desired_priv: string name of desired privilege level
            (see ssh2net.core.<device_type>.driver for levels)

    Returns:
        N/A  # noqa

    Raises:
        N/A  # noqa

    """
    while True:
        active = self._determine_current_priv(self.get_prompt())
        if active == self.privs[desired_priv]:
            break
        # above the target -> step down, otherwise step up
        too_high = active.level > self.privs[desired_priv].level
        step = self._deescalate if too_high else self._escalate
        step()
def send_command(self, commands)
-
Send command(s)
Args
commands
- string or list of strings to send to device in privilege exec mode
Returns
N/A # noqa
Raises
N/A # noqa
Expand source code
def send_command(self, commands):
    """
    Send command(s)

    Moves to the driver's default privilege level first, then sends the input.

    Args:
        commands: string or list of strings to send to device in privilege exec mode

    Returns:
        result: whatever send_inputs returns for the provided commands

    Raises:
        N/A  # noqa

    """
    self.attain_priv(self.default_desired_priv)
    return self.send_inputs(commands)
def send_config_set(self, configs)
-
Send configuration(s)
Args
configs
- string or list of strings to send to device in config mode
Returns
N/A # noqa
Raises
N/A # noqa
Expand source code
def send_config_set(self, configs):
    """
    Send configuration(s)

    Enters the "configuration" privilege level, sends the input, then goes back to the
    driver's default privilege level.

    Args:
        configs: string or list of strings to send to device in config mode

    Returns:
        result: whatever send_inputs returns for the provided configs

    Raises:
        N/A  # noqa

    """
    self.attain_priv("configuration")
    config_output = self.send_inputs(configs)
    # restore the default privilege level before handing the output back
    self.attain_priv(self.default_desired_priv)
    return config_output
def textfsm_parse_output(self, command, output)
-
Parse output with TextFSM and ntc-templates
Args
command
- command used to get output
output
- output from command
Returns
output
- parsed output
Raises
N/A # noqa
Expand source code
def textfsm_parse_output(self, command: str, output: str) -> str:
    """
    Parse output with TextFSM and ntc-templates

    Args:
        command: command used to get output
        output: output from command

    Returns:
        output: parsed output, or the output unchanged if no template was found

    Raises:
        N/A  # noqa

    """
    template = _textfsm_get_template(self.textfsm_platform, command)
    if not template:
        # nothing to parse with; hand the output back untouched
        return output
    return textfsm_parse(template, output)
Inherited members
class EOSDriver (**kwargs)
-
Initialize SSH2Net EOSDriver Object
Args
**kwargs
- keyword args to pass to inherited class(es)
Returns
N/A # noqa
Raises
N/A # noqa
Expand source code
class EOSDriver(BaseNetworkDriver):
    def __init__(self, **kwargs: Dict[str, Any]):
        """
        Initialize SSH2Net EOSDriver Object

        Args:
            **kwargs: keyword args to pass to inherited class(es)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        super().__init__(**kwargs)
        # platform specifics replace the placeholders set by the base driver
        self.textfsm_platform = "arista_eos"
        self.default_desired_priv = "privilege_exec"
        self.privs = PRIVS
Ancestors
Inherited members
class IOSXEDriver (**kwargs)
-
Initialize SSH2Net IOSXEDriver Object
Args
**kwargs
- keyword args to pass to inherited class(es)
Returns
N/A # noqa
Raises
N/A # noqa
Expand source code
class IOSXEDriver(BaseNetworkDriver):
    def __init__(self, **kwargs: Dict[str, Any]):
        """
        Initialize SSH2Net IOSXEDriver Object

        Args:
            **kwargs: keyword args to pass to inherited class(es)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        super().__init__(**kwargs)
        # platform specifics replace the placeholders set by the base driver
        self.textfsm_platform = "cisco_ios"
        self.default_desired_priv = "privilege_exec"
        self.privs = PRIVS
Ancestors
Inherited members
class IOSXRDriver (**kwargs)
-
Initialize SSH2Net IOSXRDriver Object
Args
**kwargs
- keyword args to pass to inherited class(es)
Returns
N/A # noqa
Raises
N/A # noqa
Expand source code
class IOSXRDriver(BaseNetworkDriver):
    def __init__(self, **kwargs: Dict[str, Any]):
        """
        Initialize SSH2Net IOSXRDriver Object

        Args:
            **kwargs: keyword args to pass to inherited class(es)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        super().__init__(**kwargs)
        # platform specifics replace the placeholders set by the base driver
        self.textfsm_platform = "cisco_xr"
        self.default_desired_priv = "privilege_exec"
        self.privs = PRIVS
Ancestors
Inherited members
class JunosDriver (**kwargs)
-
Initialize SSH2Net JunosDriver Object
Args
**kwargs
- keyword args to pass to inherited class(es)
Returns
N/A # noqa
Raises
N/A # noqa
Expand source code
class JunosDriver(BaseNetworkDriver):
    def __init__(self, **kwargs: Dict[str, Any]):
        """
        Initialize SSH2Net JunosDriver Object

        Args:
            **kwargs: keyword args to pass to inherited class(es)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        super().__init__(**kwargs)
        self.privs = PRIVS
        self.default_desired_priv = "exec"
        # set like the other platform drivers so textfsm_parse_output has a platform to
        # look templates up with; "juniper_junos" is the ntc-templates platform name
        self.textfsm_platform = "juniper_junos"
Ancestors
Inherited members
class NXOSDriver (**kwargs)
-
Initialize SSH2Net NXOSDriver Object
Args
**kwargs
- keyword args to pass to inherited class(es)
Returns
N/A # noqa
Raises
N/A # noqa
Expand source code
class NXOSDriver(BaseNetworkDriver):
    def __init__(self, **kwargs: Dict[str, Any]):
        """
        Initialize SSH2Net NXOSDriver Object

        Args:
            **kwargs: keyword args to pass to inherited class(es)

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        super().__init__(**kwargs)
        self.privs = PRIVS
        self.default_desired_priv = "privilege_exec"
        # set like the other platform drivers so textfsm_parse_output has a platform to
        # look templates up with; "cisco_nxos" is the ntc-templates platform name
        self.textfsm_platform = "cisco_nxos"
Ancestors
Inherited members
class SSH2Net (setup_host='', setup_validate_host=False, setup_port=22, setup_timeout=5, setup_ssh_config_file=False, setup_use_paramiko=False, session_timeout=5000, session_keepalive=False, session_keepalive_interval=10, session_keepalive_type='network', session_keepalive_pattern='\x05', auth_user='', auth_password=None, auth_public_key=None, comms_strip_ansi=False, comms_prompt_regex='^[a-z0-9.\\-@()/:]{1,32}[#>$]$', comms_operation_timeout=10, comms_return_char='\n', comms_pre_login_handler='', comms_disable_paging='terminal length 0')
-
Initialize SSH2Net Object
Setup basic parameters required to connect to devices via ssh. Pay extra attention to the "comms_prompt_regex" as this is highly critical to this tool working well!
Args
setup_host
- ip address or hostname to connect to
setup_validate_host
- whether or not to validate ip address is valid or dns is resolvable
setup_port
- port to open ssh session to
setup_timeout
- timeout in seconds for opening underlying socket to host
setup_ssh_config_file
- ssh config file to use or True to try system default files
setup_use_paramiko
- use paramiko instead of ssh2-python
session_timeout
- time in ms for session read operations; 0 is "forever" and will block
session_keepalive
- whether or not to try to keep session alive
session_keepalive_interval
- interval to use for session keepalives
session_keepalive_type
- network|standard – "network" sends actual characters over the channel as "normal" ssh keepalive doesn't keep sessions open. "standard" sends "normal" ssh keepalives via ssh2 library. In both cases a thread is spawned in which the keepalives are sent. This introduces a locking mechanism which in theory will slow things down slightly, however provides the ability to keep the session alive indefinitely.
session_keepalive_pattern
- pattern to send to keep network channel alive. Default is u"\005" which is equivalent to "ctrl+e". This pattern moves cursor to end of the line which should be an innocuous pattern. This will only be entered if a lock can be acquired.
auth_user
- username to use to connect to host
auth_password
- password to use to connect to host
auth_public_key
- path to ssh public key to use to connect to host
comms_strip_ansi
- whether or not to strip ansi characters from channel
comms_prompt_regex
- regex pattern to use for prompt matching. this is the single most important attribute here! if this does not match a prompt, ssh2net will not work! IMPORTANT: regex search uses multi-line + case insensitive flags. multi-line allows for highly reliably matching for prompts after stripping trailing white space, case insensitive is just a convenience factor so i can be lazy.
comms_operation_timeout
- timeout in seconds for waiting for channel operations. this is NOT the "read" timeout. this is the timeout for the entire operation sent to send_inputs/send_inputs_interact
comms_return_char
- character to use to send returns to host
comms_pre_login_handler
- callable or string that resolves to an importable function to handle pre-login (pre disable paging) operations
comms_disable_paging
- callable, string that resolves to an importable function, or string to send to device to disable paging
Returns
N/A # noqa
Raises
ValueError
- in the following situations: - setup_port is not an integer - setup_timeout is not an integer - session_timeout is not an integer - session_keepalive_interval is not an integer - session_keepalive_type is not "network" or "standard" - comms_operation_timeout is not an integer
TypeError
- in the following situations: - setup_use_paramiko is not a bool - session_keepalive is not a bool - comms_strip_ansi is not a bool - comms_return_char is not a string
Expand source code
class SSH2Net(SSH2NetSession):
    def __init__(
        self,
        setup_host: str = "",
        setup_validate_host: Optional[bool] = False,
        setup_port: Optional[int] = 22,
        setup_timeout: Optional[int] = 5,
        setup_ssh_config_file: Optional[Union[str, bool]] = False,
        setup_use_paramiko: Optional[bool] = False,
        session_timeout: Optional[int] = 5000,
        session_keepalive: Optional[bool] = False,
        session_keepalive_interval: Optional[int] = 10,
        session_keepalive_type: Optional[str] = "network",
        session_keepalive_pattern: Optional[str] = "\005",
        auth_user: str = "",
        auth_password: Optional[str] = None,
        auth_public_key: Optional[str] = None,
        comms_strip_ansi: Optional[bool] = False,
        comms_prompt_regex: Optional[str] = r"^[a-z0-9.\-@()/:]{1,32}[#>$]$",
        comms_operation_timeout: Optional[int] = 10,
        comms_return_char: Optional[str] = "\n",
        comms_pre_login_handler: Optional[Union[str, Callable]] = "",
        comms_disable_paging: Optional[Union[str, Callable]] = "terminal length 0",
    ):
        r"""
        Initialize SSH2Net Object

        Setup basic parameters required to connect to devices via ssh. Pay extra attention
        to the "comms_prompt_regex" as this is highly critical to this tool working well!

        Args:
            setup_host: ip address or hostname to connect to
            setup_validate_host: whether or not to validate ip address is valid or dns is
                resolvable
            setup_port: port to open ssh session to
            setup_timeout: timeout in seconds for opening underlying socket to host
            setup_ssh_config_file: ssh config file to use or True to try system default files
            setup_use_paramiko: use paramiko instead of ssh2-python
            session_timeout: time in ms for session read operations; 0 is "forever" and will
                block
            session_keepalive: whether or not to try to keep session alive
            session_keepalive_interval: interval to use for session keepalives
            session_keepalive_type: network|standard -- "network" sends actual characters over
                the channel as "normal" ssh keepalive doesn't keep sessions open. "standard"
                sends "normal" ssh keepalives via ssh2 library. In both cases a thread is
                spawned in which the keepalives are sent. This introduces a locking mechanism
                which in theory will slow things down slightly, however provides the ability
                to keep the session alive indefinitely.
            session_keepalive_pattern: pattern to send to keep network channel alive. Default
                is u"\005" which is equivalent to "ctrl+e". This pattern moves cursor to end
                of the line which should be an innocuous pattern. This will only be entered
                *if* a lock can be acquired.
            auth_user: username to use to connect to host
            auth_password: password to use to connect to host
            auth_public_key: path to ssh public key to use to connect to host
            comms_strip_ansi: whether or not to strip ansi characters from channel
            comms_prompt_regex: regex pattern to use for prompt matching. this is the single
                most important attribute here! if this does not match a prompt, ssh2net will
                not work! IMPORTANT: regex search uses multi-line + case insensitive flags.
                multi-line allows for highly reliably matching for prompts after stripping
                trailing white space, case insensitive is just a convenience factor so i can
                be lazy.
            comms_operation_timeout: timeout in seconds for waiting for channel operations.
                this is NOT the "read" timeout. this is the timeout for the entire operation
                sent to send_inputs/send_inputs_interact
            comms_return_char: character to use to send returns to host
            comms_pre_login_handler: callable or string that resolves to an importable
                function to handle pre-login (pre disable paging) operations
            comms_disable_paging: callable, string that resolves to an importable function,
                or string to send to device to disable paging

        Returns:
            N/A  # noqa

        Raises:
            ValueError: in the following situations:  # noqa
                - setup_port, setup_timeout, session_timeout, session_keepalive_interval or
                  comms_operation_timeout is a string that int() cannot convert
                - session_keepalive_type is not "network" or "standard"
                - comms_pre_login_handler or comms_disable_paging cannot be resolved
            TypeError: in the following situations:  # noqa
                - setup_use_paramiko is not a bool
                - session_keepalive is not a bool
                - comms_strip_ansi is not a bool
                - comms_return_char is not a string

        """
        # set a flag to indicate if a shell has been invoked
        self._shell: bool = False

        # setup setup args
        self._setup_setup_args(
            setup_host, setup_validate_host, setup_port, setup_timeout, setup_use_paramiko
        )

        # setup session args
        self._setup_session_args(
            session_timeout,
            session_keepalive,
            session_keepalive_interval,
            session_keepalive_type,
            session_keepalive_pattern,
        )

        # auth setup
        self._setup_auth_args(auth_user, auth_public_key, auth_password)

        # comms setup
        self._setup_comms_args(
            comms_strip_ansi,
            comms_prompt_regex,
            comms_operation_timeout,
            comms_return_char,
            comms_pre_login_handler,
            comms_disable_paging,
        )

        # ssh config is processed last so that its values override the arguments above
        if setup_ssh_config_file:
            if setup_ssh_config_file is True:
                # True carries no path; an empty string is handed to SSH2NetSSHConfig instead
                setup_ssh_config_file = ""
            self._setup_ssh_config_args(setup_ssh_config_file)

        session_log.info(f"{str(self)}; {repr(self)}")

    def __enter__(self):
        """
        Enter method for context manager

        Args:
            N/A  # noqa

        Returns:
            self: instance of self

        Raises:
            N/A  # noqa

        """
        self.open_shell()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        """
        Exit method to cleanup for context manager

        Args:
            exception_type: exception type being raised
            exception_value: message from exception being raised
            traceback: traceback from exception being raised

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        self.close()

    def __str__(self):
        """
        Magic str method for SSH2Net class

        Args:
            N/A  # noqa

        Returns:
            str: short description naming the host of this connection object

        Raises:
            N/A  # noqa

        """
        return f"SSH2Net Connection Object for host {self.host}"

    def __repr__(self):
        """
        Magic repr method for SSH2Net class

        The repr is written to the session log by the constructor, so every credential held
        on the instance is masked.

        Args:
            N/A  # noqa

        Returns:
            repr: repr for class object

        Raises:
            N/A  # noqa

        """
        class_dict = self.__dict__.copy()
        class_dict["auth_password"] = "********"
        # network drivers store an additional (enable) password on the instance before this
        # constructor logs the repr; it must not be logged in clear text either
        if "auth_secondary" in class_dict:
            class_dict["auth_secondary"] = "********"
        return f"SSH2Net {class_dict}"

    def __bool__(self):
        """
        Magic bool method based on result of session_alive

        Args:
            N/A  # noqa

        Returns:
            bool: True/False if session is alive or not

        Raises:
            N/A  # noqa

        """
        return self._session_alive()

    @staticmethod
    def _set_comms_pre_login_handler(
        comms_pre_login_handler: Union[Callable, str]
    ) -> Union[Callable, str]:
        """
        Return comms_pre_login_handler argument

        Args:
            comms_pre_login_handler: callable function, or string representing a path to
                a callable

        Returns:
            comms_pre_login_handler: callable or default empty string value

        Raises:
            ValueError: if provided string does not result in a callable

        """
        if comms_pre_login_handler:
            if callable(comms_pre_login_handler):
                return comms_pre_login_handler
            ext_func = validate_external_function(comms_pre_login_handler)
            if ext_func:
                return ext_func
            session_log.critical(f"Invalid comms_pre_login_handler: {comms_pre_login_handler}")
            raise ValueError(
                f"{comms_pre_login_handler} is an invalid comms_pre_login_handler function "
                "or path to a function."
            )
        # falsy value (the default empty string): no handler configured
        return comms_pre_login_handler

    @staticmethod
    def _set_comms_disable_paging(
        comms_disable_paging: Union[Callable, str]
    ) -> Union[Callable, str]:
        """
        Return comms_disable_paging argument

        Args:
            comms_disable_paging: callable function, string representing a path to
                a callable, or a string to send to device to disable paging

        Returns:
            comms_disable_paging: callable or string to use to disable paging

        Raises:
            ValueError: if provided value is neither a callable, a path to one, nor a string

        """
        if comms_disable_paging != "terminal length 0":
            if callable(comms_disable_paging):
                return comms_disable_paging
            ext_func = validate_external_function(comms_disable_paging)
            if ext_func:
                return ext_func
            # not a path to a function; any other string is sent to the device as-is
            if isinstance(comms_disable_paging, str):
                return comms_disable_paging
            session_log.critical(f"Invalid comms_disable_paging: {comms_disable_paging}")
            raise ValueError(
                f"{comms_disable_paging} is an invalid comms_disable_paging function, "
                "path to a function, or is not a string."
            )
        return comms_disable_paging

    @staticmethod
    def _invalid_arg_type(target_type, arg_name, arg) -> None:
        """
        Handle invalid argument types for SSH2Net constructor

        Args:
            target_type: expected type of argument
            arg_name: argument name
            arg: value of provided argument

        Returns:
            N/A  # noqa

        Raises:
            TypeError: always; names the argument, the expected and the received type

        """
        session_log.critical(f"Invalid '{arg_name}': {arg}")
        raise TypeError(f"'{arg_name}' must be {target_type}, got: {type(arg)}")

    def _setup_setup_args(
        self, setup_host, setup_validate_host, setup_port, setup_timeout, setup_use_paramiko
    ) -> None:
        """
        Process and set "setup" args

        Note: setup_ssh_config_file is processed after auth setup

        Args:
            setup_host: ip address or hostname to connect to
            setup_validate_host: whether or not to validate ip address is valid or dns is
                resolvable
            setup_port: port to open ssh session to
            setup_timeout: timeout in seconds for opening underlying socket to host
            setup_use_paramiko: use paramiko instead of ssh2-python

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        self.host = setup_host.strip()
        if setup_validate_host:
            self._validate_host()
        self.port = int(setup_port)
        self.setup_timeout = int(setup_timeout)
        if isinstance(setup_use_paramiko, bool):
            self.setup_use_paramiko = setup_use_paramiko
        else:
            self._invalid_arg_type(bool, "setup_use_paramiko", setup_use_paramiko)

    def _setup_session_args(
        self,
        session_timeout,
        session_keepalive,
        session_keepalive_interval,
        session_keepalive_type,
        session_keepalive_pattern,
    ) -> None:
        r"""
        Process and set "session" args

        Args:
            session_timeout: time in ms for session read operations; 0 is "forever" and will
                block
            session_keepalive: whether or not to try to keep session alive
            session_keepalive_interval: interval to use for session keepalives
            session_keepalive_type: network|standard -- "network" sends actual characters over
                the channel as "normal" ssh keepalive doesn't keep sessions open. "standard"
                sends "normal" ssh keepalives via ssh2 library. In both cases a thread is
                spawned in which the keepalives are sent. This introduces a locking mechanism
                which in theory will slow things down slightly, however provides the ability
                to keep the session alive indefinitely.
            session_keepalive_pattern: pattern to send to keep network channel alive. Default
                is u"\005" which is equivalent to "ctrl+e". This pattern moves cursor to end
                of the line which should be an innocuous pattern. This will only be entered
                *if* a lock can be acquired.

        Returns:
            N/A  # noqa

        Raises:
            ValueError: if session_keepalive_type is not "network" or "standard"

        """
        self.session_timeout = int(session_timeout)
        if isinstance(session_keepalive, bool):
            self.session_keepalive = session_keepalive
        else:
            self._invalid_arg_type(bool, "session_keepalive", session_keepalive)
        self.session_keepalive_interval = int(session_keepalive_interval)
        if session_keepalive_type not in ["network", "standard"]:
            raise ValueError(
                f"{session_keepalive_type} is an invalid session_keepalive_type; must be "
                "'network' or 'standard'."
            )
        self.session_keepalive_type = session_keepalive_type
        self.session_keepalive_pattern = session_keepalive_pattern

    def _setup_auth_args(self, auth_user, auth_public_key, auth_password) -> None:
        """
        Process and set "auth" args

        Args:
            auth_user: username to use to connect to host
            auth_public_key: path to ssh public key to use to connect to host
            auth_password: password to use to connect to host

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        self.auth_user = auth_user.strip()
        if auth_public_key:
            # stored as an expanded bytes path
            self.auth_public_key = os.path.expanduser(auth_public_key.strip().encode())
        else:
            self.auth_public_key = auth_public_key
        if auth_password:
            self.auth_password = auth_password.strip()
        else:
            self.auth_password = auth_password

    def _setup_comms_args(
        self,
        comms_strip_ansi,
        comms_prompt_regex,
        comms_operation_timeout,
        comms_return_char,
        comms_pre_login_handler,
        comms_disable_paging,
    ):
        """
        Process and set "comms" args

        Args:
            comms_strip_ansi: whether or not to strip ansi characters from channel
            comms_prompt_regex: regex pattern to use for prompt matching. this is the single
                most important attribute here! if this does not match a prompt, ssh2net will
                not work! IMPORTANT: regex search uses multi-line + case insensitive flags.
                multi-line allows for highly reliably matching for prompts after stripping
                trailing white space, case insensitive is just a convenience factor so i can
                be lazy.
            comms_operation_timeout: timeout in seconds for waiting for channel operations.
                this is NOT the "read" timeout. this is the timeout for the entire operation
                sent to send_inputs/send_inputs_interact
            comms_return_char: character to use to send returns to host
            comms_pre_login_handler: callable or string that resolves to an importable
                function to handle pre-login (pre disable paging) operations
            comms_disable_paging: callable, string that resolves to an importable function,
                or string to send to device to disable paging

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if isinstance(comms_strip_ansi, bool):
            self.comms_strip_ansi = comms_strip_ansi
        else:
            self._invalid_arg_type(bool, "comms_strip_ansi", comms_strip_ansi)

        # try to compile prompt so an unusable pattern fails before opening any connections
        re.compile(comms_prompt_regex, flags=re.M | re.I)
        self.comms_prompt_regex = comms_prompt_regex
        self.comms_operation_timeout = int(comms_operation_timeout)

        # validate that the return character set is a string
        # do this to ensure provided value is a string; this prevents an int being cast to
        # string making it look like things are ok
        if isinstance(comms_return_char, str):
            self.comms_return_char = comms_return_char
        else:
            self._invalid_arg_type(str, "comms_return_char", comms_return_char)

        self.comms_pre_login_handler = self._set_comms_pre_login_handler(comms_pre_login_handler)
        self.comms_disable_paging = self._set_comms_disable_paging(comms_disable_paging)

    def _setup_ssh_config_args(self, setup_ssh_config_file) -> None:
        """
        Set any args from ssh config file to override existing settings

        Args:
            setup_ssh_config_file: string of path to ssh config file, or an empty string when
                the constructor was given True

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        ssh_config = SSH2NetSSHConfig(setup_ssh_config_file)
        host_config = ssh_config.lookup(self.host)
        if host_config.port:
            # the socket is opened against self.port, so that is the attribute that has to be
            # overridden for the ssh config port to take effect
            self.port = int(host_config.port)
            # kept so anything reading the previously set attribute keeps working
            self.setup_port = host_config.port
        if host_config.user:
            self.auth_user = host_config.user
        if host_config.identity_file:
            self.auth_public_key = os.path.expanduser(host_config.identity_file.strip().encode())

    # pre socket setup

    def _validate_host(self) -> None:
        """
        Validate host is valid IP or resolvable DNS name

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            ValidationError: if host is invalid IP and is non resolvable

        """
        try:
            ipaddress.ip_address(self.host)
            return
        except ValueError:
            session_log.info(f"Failed to validate host {self.host} as an ip address")
        # not an ip address; fall back to a dns lookup
        try:
            socket.gethostbyname(self.host)
            return
        except socket.gaierror:
            session_log.info(f"Failed to validate host {self.host} as a resolvable dns name")
        raise ValidationError(f"Host {self.host} is not an IP or resolvable DNS name.")

    # socket setup

    def _socket_alive(self) -> bool:
        """
        Check if underlying socket is alive

        Args:
            N/A  # noqa

        Returns:
            bool True/False if socket is alive

        Raises:
            N/A  # noqa

        """
        try:
            # sending nothing is enough to find out whether the socket is still usable
            self.sock.send(b"")
            return True
        except OSError:
            # socket is not alive
            session_log.debug(f"Socket to host {self.host} is not alive")
            return False
        except AttributeError:
            # socket never created yet
            session_log.debug(f"Socket to host {self.host} has never been created")
            return False

    def _socket_open(self) -> None:
        """
        Open underlying socket

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            SetupTimeout: if socket connection times out

        """
        if not self._socket_alive():
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.settimeout(self.setup_timeout)
            try:
                self.sock.connect((self.host, self.port))
            except socket.timeout:
                # do not leave a half-opened socket behind when the connection attempt fails
                self.sock.close()
                session_log.critical(
                    f"Timed out trying to open socket to {self.host} on port {self.port}"
                )
                raise SetupTimeout(
                    f"Timed out trying to open socket to {self.host} on port {self.port}"
                )
            session_log.debug(f"Socket to host {self.host} opened")

    def _socket_close(self) -> None:
        """
        Close underlying socket

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if self._socket_alive():
            self.sock.close()
            session_log.debug(f"Socket to host {self.host} closed")

    def close(self) -> None:
        """
        Fully close socket, session, and channel

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        # torn down from the innermost layer outwards
        self._channel_close()
        self._session_close()
        self._socket_close()
        session_log.info(f"{str(self)}; Closed")
Ancestors
Subclasses
Methods
def close(self)
-
Fully close socket, session, and channel
Args
N/A # noqa
Returns
N/A # noqa
Raises
N/A # noqa
Expand source code
def close(self) -> None:
    """
    Tear down the whole connection: channel first, then session, then socket

    Args:
        N/A  # noqa

    Returns:
        N/A  # noqa

    Raises:
        N/A  # noqa

    """
    # each closer is looked up and invoked in turn, innermost layer first
    for teardown_step in ("_channel_close", "_session_close", "_socket_close"):
        getattr(self, teardown_step)()
    session_log.info(f"{str(self)}; Closed")
Inherited members
class SSH2NetChannel (*args, **kwargs)
-
Expand source code
class SSH2NetChannel:
    """
    Channel level read/write helpers for ssh2net

    The methods here read and write `self.channel` and rely on attributes that are not set in
    this class (`self.session`, `self.session_lock`, the `comms_*` settings, ...); they are
    expected to be provided by the object this class is mixed into.
    """

    @staticmethod
    def _rstrip_all_lines(output: bytes) -> str:
        """
        Decode channel output and right strip every line

        Args:
            output: bytes object to handle

        Returns:
            output: decoded string with each line right stripped, joined with newlines

        Raises:
            N/A  # noqa

        """
        decoded_lines = output.decode("unicode_escape").strip().splitlines()
        return "\n".join(line.rstrip() for line in decoded_lines)

    @staticmethod
    def _restructure_output(output: str, strip_prompt: bool = False) -> str:
        """
        Clean up preceding empty lines, and strip prompt if desired

        Args:
            output: string of output to parse
            strip_prompt: bool True/False whether to strip prompt or not

        Returns:
            output: string of joined output lines

        Raises:
            N/A  # noqa

        """
        lines = output.splitlines()
        # purge empty rows before actual output
        first_content = 0
        while first_content < len(lines) and lines[first_content] == "":
            first_content += 1
        lines = lines[first_content:]
        # should improve -- simply peels the last line out of the list...
        if strip_prompt:
            lines = lines[:-1]
        return "\n".join(lines)

    @staticmethod
    def _strip_ansi(output: bytes) -> bytes:
        """
        Strip ansi characters from bytes string

        Args:
            output: bytes from channel that need ansi stripped

        Returns:
            output: bytes string with ansi stripped

        Raises:
            N/A  # noqa

        """
        ansi_escape_pattern = re.compile(rb"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
        return re.sub(ansi_escape_pattern, b"", output)

    @channel_timeout(Timeout)
    def _read_until_input(self, channel_input: str) -> None:
        """
        Read until all input has been entered, then send return.

        Everything read here (the echo of the command itself) is discarded.

        Args:
            channel_input: string to write to channel

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        expected_echo = channel_input.encode()
        output = b""
        while expected_echo not in output:
            chunk = self.channel.read()[1]
            output += self._strip_ansi(chunk) if self.comms_strip_ansi else chunk
            channel_log.debug(f"Read: {repr(output)}")
        # once the input has been fully written to channel; flush it and send return char
        self.channel.flush()
        self.channel.write(self.comms_return_char)
        channel_log.debug(f"Write (sending return character): {repr(self.comms_return_char)}")

    @channel_timeout(Timeout)
    def _read_until_prompt(
        self, output: Optional[bytes] = None, prompt: Optional[str] = None
    ) -> str:
        """
        Read the channel until the desired prompt is seen

        Args:
            output: bytes of previously seen output if any
            prompt: string of prompt to look for; refactor to prefer regex

        Returns:
            output: decoded, right stripped string of everything read once the prompt is seen

        Raises:
            N/A  # noqa

        """
        if not output:
            output = b""

        # prefer to use regex match where possible; assume pattern is regex if starting with
        # ^ or ending with $ -- this works as we always use multi line search
        if not prompt:
            prompt_pattern = re.compile(self.comms_prompt_regex, flags=re.M | re.I)
            prompt_regex = True
        elif prompt.startswith("^") or prompt.endswith("$"):
            prompt_pattern = re.compile(prompt, flags=re.M | re.I)
            prompt_regex = True
        else:
            prompt_pattern = prompt
            prompt_regex = False

        # disabling session blocking means the while loop will actually iterate
        # without this iteration we can never properly check for prompts
        self.session.set_blocking(False)
        while True:
            chunk = self.channel.read()[1]
            output += self._strip_ansi(chunk) if self.comms_strip_ansi else chunk
            channel_log.debug(f"Read: {repr(output)}")
            # we do not need to deal w/ line replacement for the actual output, only for
            # parsing if a prompt-like thing is at the end of the output
            output_copy = re.sub("\r", "\n", output.decode("unicode_escape").strip())
            if prompt_regex:
                channel_match = re.search(prompt_pattern, output_copy)
            else:
                channel_match = prompt in output_copy
            if channel_match:
                output = self._rstrip_all_lines(output)
                self.session.set_blocking(True)
                return output

    @operation_timeout("comms_operation_timeout")
    def _send_input(self, channel_input: str, strip_prompt: bool):
        """
        Send input to device and return results

        Args:
            channel_input: string input to write to channel
            strip_prompt: bool True/False for whether or not to strip prompt

        Returns:
            output: string of cleaned channel data

        Raises:
            N/A  # noqa

        """
        self._acquire_session_lock()
        try:
            session_log.debug(
                f"Attempting to send input: {channel_input}; strip_prompt: {strip_prompt}"
            )
            self.channel.flush()
            self.channel.write(channel_input)
            channel_log.debug(f"Write: {repr(channel_input)}")
            self._read_until_input(channel_input)
            output = self._read_until_prompt()
        finally:
            # always hand the lock back; otherwise a failed/timed out read would leave the
            # session lock held and block every later sender
            self.session_lock.release_lock()
        return self._restructure_output(output, strip_prompt=strip_prompt)

    @operation_timeout("comms_operation_timeout")
    def _send_input_interact(
        self,
        channel_input: str,
        expectation: str,
        response: str,
        finale: str,
        hidden_response: bool = False,
    ) -> str:
        """
        Respond to a single "staged" prompt and return results

        Args:
            channel_input: string input to write to channel
            expectation: string of what to expect from channel
            response: string what to respond to the "expectation"
            finale: string of prompt to look for to know when "done"
            hidden_response: True/False response is hidden (i.e. password input)

        Returns:
            output: string of cleaned channel data

        Raises:
            N/A  # noqa

        """
        self._acquire_session_lock()
        try:
            session_log.debug(
                f"Attempting to send input interact: {channel_input}; "
                f"expecting: {expectation}; responding: {response}; "
                f"with a finale: {finale}; hidden_response: {hidden_response}"
            )
            self.channel.flush()
            self.channel.write(channel_input)
            channel_log.debug(f"Write: {repr(channel_input)}")
            self._read_until_input(channel_input)
            output = self._read_until_prompt(prompt=expectation)
            # if response is simply a return; add that so it shows in output
            # likewise if response is "hidden" (i.e. password input), add return
            # otherwise, skip
            if not response or hidden_response is True:
                output += self.comms_return_char
            self.channel.write(response)
            channel_log.debug(f"Write: {repr(response)}")
            self.channel.write(self.comms_return_char)
            channel_log.debug(
                f"Write (sending return character): {repr(self.comms_return_char)}"
            )
            output += self._read_until_prompt(prompt=finale)
        finally:
            # release even when a read fails so the session lock is never left held
            self.session_lock.release_lock()
        return self._restructure_output(output)

    def open_and_execute(self, command: str):
        """
        Open ssh channel and execute a command; closes channel when done.

        "one time use" method -- best for running one command then moving on; otherwise
        use "open_shell" instead, though this will likely be substantially faster for "single"
        operations.

        Args:
            command: string input to write to channel

        Returns:
            result: output from command sent over the channel

        Raises:
            N/A  # noqa

        """
        session_log.info("Attempting to open channel for command execution")
        if self._shell:
            self._channel_close()
        self._channel_open()
        output = b""
        channel_buff = 1
        session_log.debug(f"Channel open, executing command: {command}")
        self.channel.execute(command)
        # keep reading until the channel reports nothing left (or the socket read fails)
        while channel_buff > 0:
            try:
                channel_buff, data = self.channel.read()
                output += data
            except SocketRecvError:
                break
        output = self._rstrip_all_lines(output)
        result = self._restructure_output(output)
        self.close()
        session_log.info("Command executed, channel closed")
        return result

    def open_shell(self) -> None:
        """
        Open and prepare interactive SSH shell

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        session_log.info("Attempting to open interactive shell")
        # open the channel itself
        self._channel_open()
        # invoke a shell on the channel
        self._channel_invoke_shell()
        # pre-login handling if needed for things like wlc
        if self.comms_pre_login_handler:
            self.comms_pre_login_handler(self)
        # send disable paging if needed; may be a callable or input(s) to send
        if self.comms_disable_paging:
            if callable(self.comms_disable_paging):
                self.comms_disable_paging(self)
            else:
                self.send_inputs(self.comms_disable_paging)
        self._session_keepalive()
        session_log.info("Interactive shell opened")

    @channel_timeout(Timeout)
    def get_prompt(self) -> str:
        """
        Read from shell and get the current shell prompt

        Args:
            N/A  # noqa

        Returns:
            current_prompt: string of the prompt matched by `comms_prompt_regex`

        Raises:
            N/A  # noqa

        """
        pattern = re.compile(self.comms_prompt_regex, flags=re.M | re.I)
        # temporarily use a 1000 timeout for the prompt read; restored once a prompt is found
        self.session.set_timeout(1000)
        self.channel.flush()
        self.channel.write(self.comms_return_char)
        channel_log.debug(f"Write (sending return character): {repr(self.comms_return_char)}")
        while True:
            output = self.channel.read()[1].rstrip(b"\\")
            output = output.decode("unicode_escape").strip()
            channel_match = re.search(pattern, output)
            if channel_match:
                self.session.set_timeout(self.session_timeout)
                return channel_match.group(0)

    def send_inputs(self, inputs, strip_prompt: Optional[bool] = True) -> List[str]:
        """
        Primary entry point to send data to devices in shell mode; accept inputs and return results

        Args:
            inputs: list of strings or string of inputs to send to channel
            strip_prompt: strip prompt or not, defaults to True (yes, strip the prompt)

        Returns:
            result: list of output strings, one per input, in order

        Raises:
            N/A  # noqa

        """
        if isinstance(inputs, str):
            inputs = [inputs]
        return [self._send_input(channel_input, strip_prompt) for channel_input in inputs]

    def send_inputs_interact(self, inputs, hidden_response=False) -> List[str]:
        """
        Primary entry point to interact with devices in shell mode; used to handle prompts

        accepts inputs and looks for expected prompt;
        sends the appropriate response, then waits for the "finale"
        returns the results of the interaction

        could be "chained" together to respond to more than a "single" staged prompt

        Args:
            inputs: tuple (or list of tuples) containing strings representing:
                initial input
                expectation (what should ssh2net expect after input)
                response (response to expectation)
                finale (what should ssh2net expect when "done")
            hidden_response: True/False response is hidden (i.e. password input)

        Returns:
            result: list of output strings, one per interaction, in order

        Raises:
            N/A  # noqa

        """
        if isinstance(inputs, tuple):
            inputs = [inputs]
        return [
            self._send_input_interact(
                channel_input, expectation, response, finale, hidden_response
            )
            for channel_input, expectation, response, finale in inputs
        ]
Subclasses
Methods
def get_prompt(self, *args, **kwargs)
-
Expand source code
def retry_wrapper(self, *args, **kwargs):
    """
    Call the wrapped function, retrying with a growing delay when it raises

    `attempts`, `starting_delay`, `backoff`, `exception_to_check` and `wrapped_func` come from
    the enclosing decorator scope. The last permitted attempt is made outside of any handler so
    its exception (if any) propagates to the caller.
    """
    attempts_left, delay = attempts, starting_delay
    while True:
        if not attempts_left > 1:
            # final attempt -- nothing is caught here
            return wrapped_func(self, *args, **kwargs)
        try:
            return wrapped_func(self, *args, **kwargs)
        except exception_to_check:
            channel_log.info(f"Retrying read operation in {delay} seconds...")
            time.sleep(delay)
            attempts_left -= 1
            delay *= backoff
def open_and_execute(self, command)
-
Open ssh channel and execute a command; closes channel when done.
"one time use" method – best for running one command then moving on; otherwise use "open_shell" instead, though this will likely be substantially faster for "single" operations.
Args
command
- string input to write to channel
Returns
result
- output from command sent over the channel
Raises
N/A # noqa
Expand source code
def open_and_execute(self, command: str):
    """
    Run a single command over a fresh channel and tear everything down afterwards

    Intended for "one shot" use; for anything interactive use "open_shell" instead.

    Args:
        command: string input to write to channel

    Returns:
        result: output from command sent over the channel

    Raises:
        N/A  # noqa

    """
    session_log.info(f"Attempting to open channel for command execution")
    # an already open shell channel is closed before opening the channel used for execution
    if self._shell:
        self._channel_close()
    self._channel_open()

    raw_output = b""
    session_log.debug(f"Channel open, executing command: {command}")
    self.channel.execute(command)
    while True:
        try:
            size, chunk = self.channel.read()
        except SocketRecvError:
            # a failed socket read ends collection
            break
        raw_output += chunk
        if not size > 0:
            break

    result = self._restructure_output(self._rstrip_all_lines(raw_output))
    self.close()
    session_log.info(f"Command executed, channel closed")
    return result
def open_shell(self)
-
Open and prepare interactive SSH shell
Args
N/A # noqa
Returns
N/A # noqa
Raises
N/A # noqa
Expand source code
def open_shell(self) -> None:
    """
    Open the channel, start an interactive shell on it and prepare it for use

    Args:
        N/A  # noqa

    Returns:
        N/A  # noqa

    Raises:
        N/A  # noqa

    """
    session_log.info(f"Attempting to open interactive shell")
    self._channel_open()
    self._channel_invoke_shell()

    # optional hook run right after the shell is invoked (things like wlc)
    pre_login_handler = self.comms_pre_login_handler
    if pre_login_handler:
        pre_login_handler(self)

    # paging may be disabled by a callable, or by sending the configured input(s)
    disable_paging = self.comms_disable_paging
    if disable_paging and callable(disable_paging):
        disable_paging(self)
    elif disable_paging:
        self.send_inputs(disable_paging)

    self._session_keepalive()
    session_log.info("Interactive shell opened")
def send_inputs(self, inputs, strip_prompt=True)
-
Primary entry point to send data to devices in shell mode; accept inputs and return results
Args
inputs
- list of strings or string of inputs to send to channel
strip_prompt
- strip prompt or not, defaults to True (yes, strip the prompt)
Returns
result
- list of output from the input command(s)
Raises
N/A # noqa
Expand source code
def send_inputs(self, inputs, strip_prompt: Optional[bool] = True) -> List[str]:
    """
    Primary entry point to send data to devices in shell mode; accept inputs and return results

    Args:
        inputs: list of strings or string of inputs to send to channel
        strip_prompt: strip prompt or not, defaults to True (yes, strip the prompt)

    Returns:
        result: list with the output of `_send_input` for each input, in order

    Raises:
        N/A  # noqa

    """
    # a single string is treated as a one-item list of inputs
    if isinstance(inputs, str):
        inputs = [inputs]
    return [self._send_input(channel_input, strip_prompt) for channel_input in inputs]
def send_inputs_interact(self, inputs, hidden_response=False)
-
Primary entry point to interact with devices in shell mode; used to handle prompts
accepts inputs and looks for expected prompt; sends the appropriate response, then waits for the "finale" returns the results of the interaction
could be "chained" together to respond to more than a "single" staged prompt
Args
inputs
- tuple containing strings representing: initial input expectation (what should ssh2net expect after input) response (response to expectation) finale (what should ssh2net expect when "done")
hidden_response
- True/False response is hidden (i.e. password input)
Returns
result
- list of output from the input command(s)
Raises
N/A # noqa
Expand source code
def send_inputs_interact(self, inputs, hidden_response=False) -> List[str]:
    """
    Primary entry point to interact with devices in shell mode; used to handle prompts

    accepts inputs and looks for expected prompt;
    sends the appropriate response, then waits for the "finale"
    returns the results of the interaction

    could be "chained" together to respond to more than a "single" staged prompt

    Args:
        inputs: tuple (or list of tuples) containing strings representing:
            initial input
            expectation (what should ssh2net expect after input)
            response (response to expectation)
            finale (what should ssh2net expect when "done")
        hidden_response: True/False response is hidden (i.e. password input)

    Returns:
        result: list with the output of `_send_input_interact` for each interaction, in order

    Raises:
        N/A  # noqa

    """
    # a single tuple is treated as a one-item list of interactions
    if isinstance(inputs, tuple):
        inputs = [inputs]
    return [
        self._send_input_interact(channel_input, expectation, response, finale, hidden_response)
        for channel_input, expectation, response, finale in inputs
    ]
class SSH2NetSSHConfig (ssh_config_file='')
-
Initialize SSH2NetSSHConfig Object
Parse OpenSSH config file
Try to load the following data for all entries in config file: Host HostName Port User AddressFamily BindAddress ConnectTimeout IdentitiesOnly IdentityFile KbdInteractiveAuthentication PasswordAuthentication PreferredAuthentications
Args
ssh_config_file
- string path to ssh configuration file to use if not provided will try to use users ssh config file in ~/.ssh/config first, then will try /etc/ssh/config_file
Returns
N/A # noqa
Raises
N/A # noqa
Expand source code
class SSH2NetSSHConfig:
    def __init__(self, ssh_config_file=""):
        """
        Initialize SSH2NetSSHConfig Object

        Parse OpenSSH config file

        Try to load the following data for all entries in config file:
            Host
            HostName
            Port
            User
            AddressFamily
            BindAddress
            ConnectTimeout
            IdentitiesOnly
            IdentityFile
            KbdInteractiveAuthentication
            PasswordAuthentication
            PreferredAuthentications

        Args:
            ssh_config_file: string path to ssh configuration file to use; if it is not provided
                (or is not a file) will try to use users ssh config file in ~/.ssh/config first,
                then will try /etc/ssh/ssh_config

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        self.ssh_config_file = self._select_config_file(ssh_config_file)
        if self.ssh_config_file:
            # from here on `ssh_config_file` holds the file *contents*, not the path
            with open(self.ssh_config_file, "r") as f:
                self.ssh_config_file = f.read()
            # an empty parse result is normalized to None
            self.hosts = self._parse() or None
        else:
            self.hosts = None

    def __str__(self):
        """
        Magic str method for SSH2NetSSHConfig class

        Args:
            N/A  # noqa

        Returns:
            str: fixed description of the object

        Raises:
            N/A  # noqa

        """
        return "SSH2NetSSHConfig Object"

    def __repr__(self):
        """
        Magic repr method for SSH2NetSSHConfig class

        Args:
            N/A  # noqa

        Returns:
            repr: repr for class object

        Raises:
            N/A  # noqa

        """
        # leave the (potentially large) raw config text out of the repr
        class_dict = self.__dict__.copy()
        del class_dict["ssh_config_file"]
        return f"SSH2NetSSHConfig {class_dict}"

    def __bool__(self):
        """
        Magic bool method; return True if ssh_config_file

        Args:
            N/A  # noqa

        Returns:
            bool: True/False if ssh_config_file

        Raises:
            N/A  # noqa

        """
        return bool(self.ssh_config_file)

    @staticmethod
    def _select_config_file(ssh_config_file):
        """
        Select ssh configuration file

        Args:
            ssh_config_file: string representation of ssh config file to try to use

        Returns:
            ssh_config_file: Pathlib path object of the first candidate that is a file, or None

        Raises:
            N/A  # noqa

        """
        candidates = (
            ssh_config_file,
            os.path.expanduser("~/.ssh/config"),
            "/etc/ssh/ssh_config",
        )
        for candidate in candidates:
            if Path(candidate).is_file():
                return Path(candidate)
        return None

    @staticmethod
    def _strip_comments(line):
        """
        Strip out comments from ssh config file lines

        Args:
            line: to strip comments from

        Returns:
            line: rejoined ssh config file line after stripping comments

        Raises:
            N/A  # noqa

        """
        return " ".join(shlex.split(line, comments=True))

    def _parse(self):
        """
        Parse SSH configuration file

        Args:
            N/A  # noqa

        Returns:
            discovered_hosts: dict of host objects discovered in ssh config file, keyed by the
                (comment stripped) value of each "Host" line

        Raises:
            N/A  # noqa

        """
        # global patterns (stuff before hosts) are not handled yet

        # use word boundaries with a positive lookahead to get everything between the word host
        # need to do this as whitespace/formatting is not really a thing in ssh_config file
        # match host\s to ensure we don't pick up hostname and split things there accidentally
        host_pattern = re.compile(r"\bhost.*?\b(?=host\s|\s+$)", flags=re.I | re.S)
        host_entries = re.findall(host_pattern, self.ssh_config_file)

        discovered_hosts = {}
        if not host_entries:
            return discovered_hosts

        # (Host attribute, pattern capturing that attribute's value); AddressFamily, BindAddress,
        # ConnectTimeout, KbdInteractiveAuthentication, PasswordAuthentication and
        # PreferredAuthentications are not parsed yet
        field_patterns = (
            ("hosts", re.compile(r"^\s*host[\s=]+(.*)$", flags=re.I | re.M)),
            ("hostname", re.compile(r"^\s*hostname[\s=]+([\w.-]*)$", flags=re.I | re.M)),
            ("port", re.compile(r"^\s*port[\s=]+([\d]*)$", flags=re.I | re.M)),
            ("user", re.compile(r"^\s*user[\s=]+([\w]*)$", flags=re.I | re.M)),
            (
                "identities_only",
                re.compile(r"^\s*identitiesonly[\s=]+(yes|no)$", flags=re.I | re.M),
            ),
            (
                "identity_file",
                re.compile(r"^\s*identityfile[\s=]+([\w.\/\@~-]*)$", flags=re.I | re.M),
            ),
        )

        for host_entry in host_entries:
            host = Host()
            for attribute, pattern in field_patterns:
                found = re.search(pattern, host_entry)
                if found:
                    setattr(host, attribute, self._strip_comments(found.groups()[0]))
            discovered_hosts[host.hosts] = host
        return discovered_hosts

    def _lookup_fuzzy_match(self, host):
        """
        Look up fuzzy matched hosts

        Get the best match ssh config Host entry for a given host; this allows for using
        the splat and question-mark operators in ssh config file

        Args:
            host: host to lookup in discovered_hosts dict

        Returns:
            host entry of the best matching pattern, or None if no pattern matches

        Raises:
            N/A  # noqa

        """
        possible_matches = []
        for host_entry in self.hosts:
            for host_pattern in host_entry.split():
                # replace periods with literal period
                # replace asterisk (match 0 or more things) with appropriate regex
                # replace question mark (match one thing) with appropriate regex
                host_regex = (
                    host_pattern.replace(".", r"\.").replace("*", r"(.*)").replace("?", r"(.)")
                )
                # NOTE(review): the search is unanchored, so a pattern can match a substring of
                # host -- confirm whether whole-name matching is intended
                result = re.search(host_regex, host, flags=re.I)
                # if we get a result, keep it and the original entry as a possible match
                if result:
                    possible_matches.append((result, host_entry))

        if not possible_matches:
            # nothing matched; let the caller decide what to fall back to
            return None

        def _chars_replaced(candidate):
            # how many characters the wildcard groups had to absorb to make the pattern match
            return sum(end_char - start_char for start_char, end_char in candidate[0].regs[1:])

        # fewest absorbed characters wins; min() keeps the earliest candidate on ties
        best_match = min(possible_matches, key=_chars_replaced)
        return self.hosts[best_match[1]]

    def lookup(self, host):
        """
        Lookup a given host

        Args:
            host: host to lookup in discovered_hosts dict

        Returns:
            host: the matching host entry; an empty Host object when no config was parsed or
                nothing matches

        Raises:
            N/A  # noqa

        """
        # no config file / no host entries parsed -- nothing to match against
        if not self.hosts:
            return Host()
        # return exact 1:1 match if exists
        if host in self.hosts:
            return self.hosts[host]
        # return match if given host is an exact match for a host entry
        for host_entry, host_config in self.hosts.items():
            if host in host_entry.split():
                return host_config
        # otherwise need to select the most correct host entry
        fuzzy_match = self._lookup_fuzzy_match(host)
        if fuzzy_match is None:
            return Host()
        return fuzzy_match
Methods
def lookup(self, host)
-
Lookup a given host
Args
host
- host to lookup in discovered_hosts dict
Returns
N/A # noqa
Raises
N/A # noqa
Expand source code
def lookup(self, host):
    """
    Lookup a given host

    Args:
        host: host to lookup in discovered_hosts dict

    Returns:
        host: the matching host entry; an empty Host object when no config was parsed or the
            fuzzy lookup yields nothing

    Raises:
        N/A  # noqa

    """
    # no config file / no host entries parsed -- nothing to match against
    if not self.hosts:
        return Host()
    # return exact 1:1 match if exists
    if host in self.hosts:
        return self.hosts[host]
    # return match if given host is an exact match for a host entry
    for host_entry, host_config in self.hosts.items():
        if host in host_entry.split():
            return host_config
    # otherwise need to select the most correct host entry
    fuzzy_match = self._lookup_fuzzy_match(host)
    if fuzzy_match is None:
        return Host()
    return fuzzy_match
class SSH2NetSession (*args, **kwargs)
-
Expand source code
class SSH2NetSession(SSH2NetChannel):
    def _session_alive(self):
        """
        Check if session is alive and authenticated

        Args:
            N/A  # noqa

        Returns:
            bool True/False session is alive and authenticated

        Raises:
            N/A  # noqa

        """
        try:
            # if authenticated we can assume session is good to go
            return self._session_check_authenticated()
        except AttributeError:
            # session never created yet; there may be other exceptions we need to catch here
            logging.debug(f"Session to host {self.host} has never been created")
            return False

    def _keepalive_thread(self) -> None:
        """
        Attempt to keep sessions alive.

        In the case of "networking" equipment this will try to acquire a session lock and send an
        innocuous character -- such as CTRL+E -- to keep the device "exec-timeout" from expiring.

        For "normal" devices that allow for a standard ssh keepalive, this thread will simply use
        those mechanisms to maintain the session. This will likely break (for "normal" devices) if
        using paramiko for the underlying driver, but has not been tested yet!

        The loop exits as soon as the session is no longer alive.

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        lock_counter = 0
        last_keepalive = datetime.now()
        if self.session_keepalive_type == "network":
            while True:
                if not self._session_alive():
                    return
                diff = datetime.now() - last_keepalive
                if diff.seconds >= self.session_keepalive_interval:
                    if not self.session_lock.locked():
                        lock_counter = 0
                        self.session_lock.acquire_lock()
                        try:
                            self.channel.write(self.session_keepalive_pattern)
                        finally:
                            # never leave the lock held if the write fails; senders wait on it
                            self.session_lock.release_lock()
                        last_keepalive = datetime.now()
                    else:
                        # lock is busy (someone is sending); skip this round and count the miss
                        lock_counter += 1
                        if lock_counter >= 3:
                            logging.warning(
                                f"Keepalive thread missed {lock_counter} consecutive keepalives..."
                            )
                time.sleep(self.session_keepalive_interval / 10)
        elif self.session_keepalive_type == "standard":
            self.session.keepalive_config(
                want_reply=False, interval=self.session_keepalive_interval
            )
            while True:
                if not self._session_alive():
                    return
                self.session.keepalive_send()
                time.sleep(self.session_keepalive_interval / 10)

    def _session_keepalive(self) -> None:
        """
        Spawn keepalive thread for ssh session

        Does nothing unless `session_keepalive` is truthy.

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if not self.session_keepalive:
            return
        pool = ThreadPoolExecutor()
        pool.submit(self._keepalive_thread)

    def _acquire_session_lock(self) -> None:
        """
        Attempt to acquire session lock

        Polls until the lock is observed free, then acquires it.

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        while True:
            if not self.session_lock.locked():
                self.session_lock.acquire_lock()
                return

    def _session_open(self) -> None:
        """
        Open SSH session

        Binds the driver specific session/channel methods (ssh2 unless `setup_use_paramiko` is
        anything other than False), opens the socket and session if needed, then tries public
        key auth followed by password auth, stopping at the first one that authenticates.

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if self.setup_use_paramiko is False:
            driver_session_obj = SSH2NetSessionSSH2(self)
        else:
            driver_session_obj = SSH2NetSessionParamiko(self)
        # expose the selected driver's implementations as this object's own methods
        for driver_method in (
            "_session_open_connect",
            "_session_public_key_auth",
            "_session_password_auth",
            "_channel_open_driver",
            "_channel_invoke_shell",
        ):
            setattr(self, driver_method, getattr(driver_session_obj, driver_method))

        if not self._socket_alive():
            self._socket_open()
        if not self._session_alive():
            self._session_open_connect()
            logging.debug(f"Session to host {self.host} opened")
            self.session_lock = Lock()
        if self.auth_public_key:
            self._session_public_key_auth()
            if self._session_alive():
                return
        if self.auth_password:
            self._session_password_auth()
            if self._session_alive():
                return

    def _session_check_authenticated(self) -> bool:
        """
        Check if session is authenticated

        Args:
            N/A  # noqa

        Returns:
            bool: True/False for session authenticated

        Raises:
            N/A  # noqa

        """
        if self.setup_use_paramiko is False:
            return self.session.userauth_authenticated()
        return self.session.is_authenticated()

    def _session_close(self) -> None:
        """
        Close SSH SSH2NetSession

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if self.session is not None:  # pylint: disable=E0203
            if self.setup_use_paramiko:
                self.session.close()  # pylint: disable=E0203
            else:
                self.session.disconnect()  # pylint: disable=E0203
            self.session = None
            logging.debug(f"Session to host {self.host} closed")

    # channel setup

    def _channel_alive(self) -> bool:
        """
        Check if channel is alive

        Args:
            N/A  # noqa

        Returns:
            bool True/False channel is alive

        Raises:
            N/A  # noqa

        """
        try:
            if self.channel:
                return True
        except AttributeError:
            # channel not created, or closed
            logging.debug(f"Channel to host {self.host} has never been created")
            return False
        return False

    def _channel_open(self) -> None:
        """
        Open channel

        Opens the session first if it is not alive.

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if not self._session_alive():
            self._session_open()
        if not self._channel_alive():
            self._channel_open_driver()

    def _channel_close(self) -> None:
        """
        Close channel

        Args:
            N/A  # noqa

        Returns:
            N/A  # noqa

        Raises:
            N/A  # noqa

        """
        if self.channel is not None:  # pylint: disable=E0203
            # NOTE(review): `close` is only referenced here, never called, so the channel is
            # dropped rather than explicitly closed -- confirm whether `close()` was intended
            self.channel.close  # noqa
            self.channel = None
            logging.debug(f"Channel to host {self.host} closed")
Ancestors
Subclasses
Inherited members