Skip to content

transport

scrapli.transport.plugins.paramiko.transport

ParamikoTransport

Bases: Transport

Source code in transport/plugins/paramiko/transport.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
class ParamikoTransport(Transport):
    def __init__(
        self, base_transport_args: BaseTransportArgs, plugin_transport_args: PluginTransportArgs
    ) -> None:
        """
        Paramiko transport plugin.

        This transport supports some additional `transport_options` to control behavior --

        `enable_rsa2` is bool defaulting to `False`. Please see the paramiko changelog entry for
            version 2.9.0 here: https://www.paramiko.org/changelog.html. Basically, even though
            scrapli *tries* to default to safe/sane/secure things where possible, it is also focused
            on *network* devices which maybe sometimes are less up-to-date/safe/secure than we'd
            all hope. In this case paramiko 2.9.0 defaults to supporting only RSA2based servers,
            which, causes issues for the devices in the test suite, and likely for many other
            network devices out there. So, by default, we'll *disable* this. If you wish to enable
            this you can simply pass `True` to this transport option!

        Args:
            base_transport_args: scrapli base transport plugin arguments
            plugin_transport_args: paramiko ssh specific transport plugin arguments

        Returns:
            N/A

        Raises:
            N/A

        """
        super().__init__(base_transport_args=base_transport_args)
        self.plugin_transport_args = plugin_transport_args

        self.socket: Optional[Socket] = None
        self.session: Optional[_ParamikoTransport] = None
        self.session_channel: Optional[Channel] = None

    def open(self) -> None:
        self._pre_open_closing_log(closing=False)

        if not self.socket:
            self.socket = Socket(
                host=self._base_transport_args.host,
                port=self._base_transport_args.port,
                timeout=self._base_transport_args.timeout_socket,
            )

        if not self.socket.isalive():
            self.socket.open()

        try:
            self.session = _ParamikoTransport(self.socket.sock)  # type: ignore
            self.session.start_client()
        except Exception as exc:
            self.logger.critical("failed to complete handshake with host")
            raise ScrapliConnectionNotOpened from exc

        if self.plugin_transport_args.auth_strict_key:
            self.logger.debug(f"attempting to validate {self._base_transport_args.host} public key")
            self._verify_key()

        self._authenticate()

        if not self.session.is_authenticated():
            msg = "all authentication methods failed"
            self.logger.critical(msg)
            raise ScrapliAuthenticationFailed(msg)

        self._open_channel()

        self._post_open_closing_log(closing=False)

    def _verify_key(self) -> None:
        """
        Verify target host public key, raise exception if invalid/unknown

        Args:
            N/A

        Returns:
            None

        Raises:
            ScrapliConnectionNotOpened: if session is unopened/None
            ScrapliAuthenticationFailed: if host is not in known hosts
            ScrapliAuthenticationFailed: if host is in known hosts but public key does not match

        """
        if not self.session:
            raise ScrapliConnectionNotOpened

        known_hosts = SSHKnownHosts(self.plugin_transport_args.ssh_known_hosts_file)
        known_host_public_key = known_hosts.lookup(self._base_transport_args.host)

        if not known_host_public_key:
            raise ScrapliAuthenticationFailed(
                f"{self._base_transport_args.host} not in known_hosts!"
            )

        remote_server_key = self.session.get_remote_server_key()
        remote_public_key = remote_server_key.get_base64()

        if known_host_public_key["public_key"] != remote_public_key:
            raise ScrapliAuthenticationFailed(
                f"{self._base_transport_args.host} in known_hosts but public key does not match!"
            )

    def _authenticate(self) -> None:
        """
        Parent method to try all means of authentication

        Args:
            N/A

        Returns:
            None

        Raises:
            ScrapliConnectionNotOpened: if session is unopened/None
            ScrapliAuthenticationFailed: if auth fails

        """
        if not self.session:
            raise ScrapliConnectionNotOpened

        if self.plugin_transport_args.auth_private_key:
            self._authenticate_public_key()
            if self.session.is_authenticated():
                return
            if (
                not self.plugin_transport_args.auth_password
                or not self.plugin_transport_args.auth_username
            ):
                msg = (
                    f"Failed to authenticate to host {self._base_transport_args.host} with private "
                    f"key `{self.plugin_transport_args.auth_private_key}`. Unable to continue "
                    "authentication, missing username, password, or both."
                )
                raise ScrapliAuthenticationFailed(msg)

        self._authenticate_password()

    def _authenticate_public_key(self) -> None:
        """
        Attempt to authenticate with public key authentication

        Args:
            N/A

        Returns:
            None

        Raises:
            ScrapliConnectionNotOpened: if session is unopened/None

        """
        if not self.session:
            raise ScrapliConnectionNotOpened

        if self._base_transport_args.transport_options.get("enable_rsa2", False) is False:
            # do this for "keys" and "pubkeys": https://github.com/paramiko/paramiko/issues/1984
            self.session.disabled_algorithms = {
                "keys": ["rsa-sha2-256", "rsa-sha2-512"],
                "pubkeys": ["rsa-sha2-256", "rsa-sha2-512"],
            }

        try:
            paramiko_key = RSAKey(filename=self.plugin_transport_args.auth_private_key)
            self.session.auth_publickey(
                username=self.plugin_transport_args.auth_username, key=paramiko_key
            )
        except AuthenticationException:
            pass
        except Exception:  # pylint: disable=W0703
            pass

    def _authenticate_password(self) -> None:
        """
        Attempt to authenticate with password authentication

        Args:
            N/A

        Returns:
            None

        Raises:
            ScrapliConnectionNotOpened: if session is unopened/None

        """
        if not self.session:
            raise ScrapliConnectionNotOpened

        with suppress(AuthenticationException):
            self.session.auth_password(
                username=self.plugin_transport_args.auth_username,
                password=self.plugin_transport_args.auth_password,
            )

    def _open_channel(self) -> None:
        """
        Open channel, acquire pty, request interactive shell

        Args:
            N/A

        Returns:
            None

        Raises:
            ScrapliConnectionNotOpened: if session is unopened/None

        """
        if not self.session:
            raise ScrapliConnectionNotOpened

        self.session_channel = self.session.open_session()
        self._set_timeout(self._base_transport_args.timeout_transport)
        self.session_channel.get_pty()
        self.session_channel.invoke_shell()

    def close(self) -> None:
        self._pre_open_closing_log(closing=True)

        if self.session_channel:
            self.session_channel.close()

            if self.socket:
                self.socket.close()

        self.session = None
        self.session_channel = None

        self._post_open_closing_log(closing=True)

    def isalive(self) -> bool:
        if not self.session:
            return False
        _isalive: bool = self.session.is_alive()
        return _isalive

    def read(self) -> bytes:
        if not self.session_channel:
            raise ScrapliConnectionNotOpened
        try:
            buf: bytes = self.session_channel.recv(65535)
        except Exception as exc:
            msg = (
                "encountered EOF reading from transport; typically means the device closed the "
                "connection"
            )
            self.logger.critical(msg)
            raise ScrapliConnectionError(msg) from exc
        return buf

    def write(self, channel_input: bytes) -> None:
        if not self.session_channel:
            raise ScrapliConnectionNotOpened
        self.session_channel.send(channel_input)

    def _set_timeout(self, value: float) -> None:
        """
        Set session object timeout value

        Args:
            value: timeout in seconds

        Returns:
            None

        Raises:
            ScrapliConnectionNotOpened: if session is unopened/None

        """
        if not self.session_channel:
            raise ScrapliConnectionNotOpened
        self.session_channel.settimeout(value)

__init__(base_transport_args: BaseTransportArgs, plugin_transport_args: PluginTransportArgs) -> None

Paramiko transport plugin.

This transport supports some additional transport_options to control behavior --

enable_rsa2 is bool defaulting to False. Please see the paramiko changelog entry for version 2.9.0 here: https://www.paramiko.org/changelog.html. Basically, even though scrapli tries to default to safe/sane/secure things where possible, it is also focused on network devices which maybe sometimes are less up-to-date/safe/secure than we'd all hope. In this case paramiko 2.9.0 defaults to supporting only RSA2based servers, which, causes issues for the devices in the test suite, and likely for many other network devices out there. So, by default, we'll disable this. If you wish to enable this you can simply pass True to this transport option!

Parameters:

Name Type Description Default
base_transport_args BaseTransportArgs

scrapli base transport plugin arguments

required
plugin_transport_args PluginTransportArgs

paramiko ssh specific transport plugin arguments

required

Returns:

Type Description
None

N/A

Source code in transport/plugins/paramiko/transport.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def __init__(
    self, base_transport_args: BaseTransportArgs, plugin_transport_args: PluginTransportArgs
) -> None:
    """
    Paramiko transport plugin.

    This transport supports some additional `transport_options` to control behavior --

    `enable_rsa2` is bool defaulting to `False`. Please see the paramiko changelog entry for
        version 2.9.0 here: https://www.paramiko.org/changelog.html. Basically, even though
        scrapli *tries* to default to safe/sane/secure things where possible, it is also focused
        on *network* devices which maybe sometimes are less up-to-date/safe/secure than we'd
        all hope. In this case paramiko 2.9.0 defaults to supporting only RSA2based servers,
        which, causes issues for the devices in the test suite, and likely for many other
        network devices out there. So, by default, we'll *disable* this. If you wish to enable
        this you can simply pass `True` to this transport option!

    Args:
        base_transport_args: scrapli base transport plugin arguments
        plugin_transport_args: paramiko ssh specific transport plugin arguments

    Returns:
        N/A

    Raises:
        N/A

    """
    super().__init__(base_transport_args=base_transport_args)
    self.plugin_transport_args = plugin_transport_args

    self.socket: Optional[Socket] = None
    self.session: Optional[_ParamikoTransport] = None
    self.session_channel: Optional[Channel] = None