Module ssh2net.core.driver
ssh2net.core.driver
Expand source code
"""ssh2net.core.driver"""
import collections
import re
from typing import Any, Dict, Optional, Union
from ssh2net.base import SSH2Net
from ssh2net.exceptions import UnknownPrivLevel
from ssh2net.helper import _textfsm_get_template, textfsm_parse
PrivilegeLevel = collections.namedtuple(
"PrivLevel",
"pattern "
"name "
"deescalate_priv "
"deescalate "
"escalate_priv "
"escalate "
"escalate_auth "
"escalate_prompt "
"requestable "
"level",
)
PRIVS = {}
class BaseNetworkDriver(SSH2Net):
def __init__(self, auth_secondary: Optional[Union[str]] = None, **kwargs: Dict[str, Any]):
"""
Initialize SSH2Net BaseNetworkDriver Object
Args:
auth_secondary: password to use for secondary authentication (enable)
Returns:
N/A # noqa
Raises:
N/A # noqa
"""
self.auth_secondary = auth_secondary
super().__init__(**kwargs)
self.privs = PRIVS
self.default_desired_priv = None
self.textfsm_platform = None
def _determine_current_priv(self, current_prompt: str):
"""
Determine current privilege level from prompt string
Args:
current_prompt: string of current prompt
Returns:
priv_level: NamedTuple of current privilege level
Raises:
UnknownPrivLevel: if privilege level cannot be determined # noqa
# darglint raises DAR401 for some reason hence the noqa...
"""
for priv_level in self.privs.values():
if re.search(priv_level.pattern, current_prompt):
return priv_level
raise UnknownPrivLevel
def _escalate(self) -> None:
"""
Escalate to the next privilege level up
Args:
N/A # noqa
Returns:
N/A # noqa
Raises:
N/A # noqa
"""
current_priv = self._determine_current_priv(self.get_prompt())
if current_priv.escalate:
if current_priv.escalate_auth:
self.send_inputs_interact(
(
current_priv.escalate,
current_priv.escalate_prompt,
self.auth_enable,
self.privs.get("escalate_priv"),
),
hidden_response=True,
)
else:
self.send_inputs(current_priv.escalate)
def _deescalate(self) -> None:
"""
Deescalate to the next privilege level down
Args:
N/A # noqa
Returns:
N/A # noqa
Raises:
N/A # noqa
"""
current_priv = self._determine_current_priv(self.get_prompt())
if current_priv.deescalate:
self.send_inputs(current_priv.deescalate)
def attain_priv(self, desired_priv) -> None:
"""
Attain desired priv level
Args:
desired_priv: string name of desired privilege level
(see ssh2net.core.<device_type>.driver for levels)
Returns:
N/A # noqa
Raises:
N/A # noqa
"""
while True:
current_priv = self._determine_current_priv(self.get_prompt())
if current_priv == self.privs[desired_priv]:
return
if current_priv.level > self.privs[desired_priv].level:
self._deescalate()
else:
self._escalate()
def send_command(self, commands):
"""
Send command(s)
Args:
commands: string or list of strings to send to device in privilege exec mode
Returns:
N/A # noqa
Raises:
N/A # noqa
"""
self.attain_priv(self.default_desired_priv)
result = self.send_inputs(commands)
return result
def send_config_set(self, configs):
"""
Send configuration(s)
Args:
configs: string or list of strings to send to device in config mode
Returns:
N/A # noqa
Raises:
N/A # noqa
"""
self.attain_priv("configuration")
result = self.send_inputs(configs)
self.attain_priv(self.default_desired_priv)
return result
def textfsm_parse_output(self, command: str, output: str) -> str:
"""
Parse output with TextFSM and ntc-templates
Args:
command: command used to get output
output: output from command
Returns:
output: parsed output
Raises:
N/A # noqa
"""
template = _textfsm_get_template(self.textfsm_platform, command)
if template:
output = textfsm_parse(template, output)
return output
Classes
class BaseNetworkDriver (auth_secondary=None, **kwargs)
-
Initialize SSH2Net BaseNetworkDriver Object
Args
auth_secondary
- password to use for secondary authentication (enable)
Returns
N
/A
#noqa
Raises
N
/A
#noqa
Expand source code
class BaseNetworkDriver(SSH2Net): def __init__(self, auth_secondary: Optional[Union[str]] = None, **kwargs: Dict[str, Any]): """ Initialize SSH2Net BaseNetworkDriver Object Args: auth_secondary: password to use for secondary authentication (enable) Returns: N/A # noqa Raises: N/A # noqa """ self.auth_secondary = auth_secondary super().__init__(**kwargs) self.privs = PRIVS self.default_desired_priv = None self.textfsm_platform = None def _determine_current_priv(self, current_prompt: str): """ Determine current privilege level from prompt string Args: current_prompt: string of current prompt Returns: priv_level: NamedTuple of current privilege level Raises: UnknownPrivLevel: if privilege level cannot be determined # noqa # darglint raises DAR401 for some reason hence the noqa... """ for priv_level in self.privs.values(): if re.search(priv_level.pattern, current_prompt): return priv_level raise UnknownPrivLevel def _escalate(self) -> None: """ Escalate to the next privilege level up Args: N/A # noqa Returns: N/A # noqa Raises: N/A # noqa """ current_priv = self._determine_current_priv(self.get_prompt()) if current_priv.escalate: if current_priv.escalate_auth: self.send_inputs_interact( ( current_priv.escalate, current_priv.escalate_prompt, self.auth_enable, self.privs.get("escalate_priv"), ), hidden_response=True, ) else: self.send_inputs(current_priv.escalate) def _deescalate(self) -> None: """ Deescalate to the next privilege level down Args: N/A # noqa Returns: N/A # noqa Raises: N/A # noqa """ current_priv = self._determine_current_priv(self.get_prompt()) if current_priv.deescalate: self.send_inputs(current_priv.deescalate) def attain_priv(self, desired_priv) -> None: """ Attain desired priv level Args: desired_priv: string name of desired privilege level (see ssh2net.core.<device_type>.driver for levels) Returns: N/A # noqa Raises: N/A # noqa """ while True: current_priv = self._determine_current_priv(self.get_prompt()) if current_priv == self.privs[desired_priv]: return if current_priv.level > self.privs[desired_priv].level: self._deescalate() else: self._escalate() def send_command(self, commands): """ Send command(s) Args: commands: string or list of strings to send to device in privilege exec mode Returns: N/A # noqa Raises: N/A # noqa """ self.attain_priv(self.default_desired_priv) result = self.send_inputs(commands) return result def send_config_set(self, configs): """ Send configuration(s) Args: configs: string or list of strings to send to device in config mode Returns: N/A # noqa Raises: N/A # noqa """ self.attain_priv("configuration") result = self.send_inputs(configs) self.attain_priv(self.default_desired_priv) return result def textfsm_parse_output(self, command: str, output: str) -> str: """ Parse output with TextFSM and ntc-templates Args: command: command used to get output output: output from command Returns: output: parsed output Raises: N/A # noqa """ template = _textfsm_get_template(self.textfsm_platform, command) if template: output = textfsm_parse(template, output) return output
Ancestors
Subclasses
Methods
def attain_priv(self, desired_priv)
-
Attain desired priv level
Args
desired_priv
- string name of desired privilege level
(see ssh2net.core.
.driver for levels)
Returns
N
/A
#noqa
Raises
N
/A
#noqa
Expand source code
def attain_priv(self, desired_priv) -> None: """ Attain desired priv level Args: desired_priv: string name of desired privilege level (see ssh2net.core.<device_type>.driver for levels) Returns: N/A # noqa Raises: N/A # noqa """ while True: current_priv = self._determine_current_priv(self.get_prompt()) if current_priv == self.privs[desired_priv]: return if current_priv.level > self.privs[desired_priv].level: self._deescalate() else: self._escalate()
def send_command(self, commands)
-
Send command(s)
Args
commands
- string or list of strings to send to device in privilege exec mode
Returns
N
/A
#noqa
Raises
N
/A
#noqa
Expand source code
def send_command(self, commands): """ Send command(s) Args: commands: string or list of strings to send to device in privilege exec mode Returns: N/A # noqa Raises: N/A # noqa """ self.attain_priv(self.default_desired_priv) result = self.send_inputs(commands) return result
def send_config_set(self, configs)
-
Send configuration(s)
Args
configs
- string or list of strings to send to device in config mode
Returns
N
/A
#noqa
Raises
N
/A
#noqa
Expand source code
def send_config_set(self, configs): """ Send configuration(s) Args: configs: string or list of strings to send to device in config mode Returns: N/A # noqa Raises: N/A # noqa """ self.attain_priv("configuration") result = self.send_inputs(configs) self.attain_priv(self.default_desired_priv) return result
def textfsm_parse_output(self, command, output)
-
Parse output with TextFSM and ntc-templates
Args
command
- command used to get output
output
- output from command
Returns
output
- parsed output
Raises
N
/A
#noqa
Expand source code
def textfsm_parse_output(self, command: str, output: str) -> str: """ Parse output with TextFSM and ntc-templates Args: command: command used to get output output: output from command Returns: output: parsed output Raises: N/A # noqa """ template = _textfsm_get_template(self.textfsm_platform, command) if template: output = textfsm_parse(template, output) return output
Inherited members
class PrivilegeLevel (*args, **kwargs)
-
PrivLevel(pattern, name, deescalate_priv, deescalate, escalate_priv, escalate, escalate_auth, escalate_prompt, requestable, level)
Ancestors
- builtins.tuple
Instance variables
var deescalate
-
Alias for field number 3
var deescalate_priv
-
Alias for field number 2
var escalate
-
Alias for field number 5
var escalate_auth
-
Alias for field number 6
var escalate_priv
-
Alias for field number 4
var escalate_prompt
-
Alias for field number 7
var level
-
Alias for field number 9
var name
-
Alias for field number 1
var pattern
-
Alias for field number 0
var requestable
-
Alias for field number 8