Skip to content

transport

scrapli.transport.plugins.asyncssh.transport

AsyncsshTransport

Bases: AsyncTransport

Source code in transport/plugins/asyncssh/transport.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
class AsyncsshTransport(AsyncTransport):
    def __init__(
        self, base_transport_args: BaseTransportArgs, plugin_transport_args: PluginTransportArgs
    ) -> None:
        """
        Asyncssh transport plugin.

        Important note: some ssh servers may refuse connections if too many ssh host key algorithms
        are passed to it during the connection opening -- Asyncssh sends a bunch by default! If you
        encounter this issue, you can simply update your SSH config file to set a smaller (or one)
        number of ssh host key algorithms to work around this like so:

        ```
        Host *
            HostKeyAlgorithms ssh-rsa
        ```

        Thank you to @davaeron [https://github.com/davaeron] for reporting this in #173, see also
        asyncssh #323 here: https://github.com/ronf/asyncssh/issues/323.

        This transport supports some additional `transport_options` to control behavior --
        `asyncssh` is a dictionary that contains options that are passed directly to asyncssh during
        connection creation, you can find the SSH Client options of asyncssh here:
        https://asyncssh.readthedocs.io/en/latest/api.html#sshclientconnectionoptions.

        Below is an example of passing in options to modify kex and encryption algorithms

        ```
        device = {
            "host": "localhost",
            "transport_options": {
                "asyncssh": {
                    "kex_algs": ["diffie-hellman-group14-sha1", "diffie-hellman-group1-sha1"],
                    "encryption_algs": ["aes256-cbc", "aes192-cbc", "aes256-ctr", "aes192-ctr"],
                }
            },
            "platform": "cisco_iosxe"
        }

        conn = Scrapli(**device)
        ```

        Args:
            base_transport_args: scrapli base transport plugin arguments
            plugin_transport_args: asyncssh ssh specific transport plugin arguments

        Returns:
            N/A

        Raises:
            N/A

        """
        super().__init__(base_transport_args=base_transport_args)
        self.plugin_transport_args = plugin_transport_args

        self.session: Optional[SSHClientConnection] = None
        self.stdout: Optional[SSHReader[Any]] = None
        self.stdin: Optional[SSHWriter[Any]] = None

    def _verify_key(self) -> None:
        """
        Verify target host public key, raise exception if invalid/unknown

        Args:
            N/A

        Returns:
            None

        Raises:
            ScrapliAuthenticationFailed: if host is not in known hosts

        """
        known_hosts = SSHKnownHosts(self.plugin_transport_args.ssh_known_hosts_file)
        known_host_public_key = known_hosts.lookup(self._base_transport_args.host)

        if not known_host_public_key:
            raise ScrapliAuthenticationFailed(
                f"{self._base_transport_args.host} not in known_hosts!"
            )

    def _verify_key_value(self) -> None:
        """
        Verify target host public key, raise exception if invalid/unknown

        Args:
            N/A

        Returns:
            None

        Raises:
            ScrapliConnectionNotOpened: if session is unopened/None
            ScrapliAuthenticationFailed: if host is in known hosts but public key does not match or
                cannot glean remote server key from session.

        """
        if not self.session:
            raise ScrapliConnectionNotOpened

        known_hosts = SSHKnownHosts(self.plugin_transport_args.ssh_known_hosts_file)
        known_host_public_key = known_hosts.lookup(self._base_transport_args.host)

        remote_server_key = self.session.get_server_host_key()
        if remote_server_key is None:
            raise ScrapliAuthenticationFailed(
                f"failed gleaning remote server ssh key for host {self._base_transport_args.host}"
            )

        remote_public_key = remote_server_key.export_public_key().split()[1].decode()

        if known_host_public_key["public_key"] != remote_public_key:
            raise ScrapliAuthenticationFailed(
                f"{self._base_transport_args.host} in known_hosts but public key does not match!"
            )

    async def open(self) -> None:
        self._pre_open_closing_log(closing=False)

        if self.plugin_transport_args.auth_strict_key:
            self.logger.debug(
                f"Attempting to validate {self._base_transport_args.host} public key is in known "
                f"hosts"
            )
            self._verify_key()

        # we already fetched host/port/user from the user input and/or the ssh config file, so we
        # want to use those explicitly. likewise we pass config file we already found. set known
        # hosts and agent to None so we can not have an agent and deal w/ known hosts ourselves.
        # to use ssh-agent either pass an empty tuple (to pick up ssh-agent socket from
        # SSH_AUTH_SOCK), or pass an explicit path to ssh-agent socket should be provided as part
        # of transport_options -- in either case these get merged into the dict *after* we set the
        # default value of `None`, so users options override our defaults.

        common_args: Dict[str, Any] = {
            "host": self._base_transport_args.host,
            "port": self._base_transport_args.port,
            "username": self.plugin_transport_args.auth_username,
            "known_hosts": None,
            "agent_path": None,
            "config": self.plugin_transport_args.ssh_config_file,
        }

        # Allow passing `transport_options` to asyncssh
        common_args.update(self._base_transport_args.transport_options.get("asyncssh", {}))

        # Common authentication args
        auth_args: Dict[str, Any] = {
            "client_keys": self.plugin_transport_args.auth_private_key,
            "password": self.plugin_transport_args.auth_password,
            "preferred_auth": (
                "publickey",
                "keyboard-interactive",
                "password",
            ),
        }

        # The session args to use in connect() - to merge the dicts in
        # the order to have transport options preference over predefined auth args
        conn_args = {**auth_args, **common_args}

        try:
            self.session = await asyncio.wait_for(
                connect(**conn_args),
                timeout=self._base_transport_args.timeout_socket,
            )
        except PermissionDenied as exc:
            msg = "all authentication methods failed"
            self.logger.critical(msg)
            raise ScrapliAuthenticationFailed(msg) from exc
        except asyncio.TimeoutError as exc:
            msg = "timed out opening connection to device"
            self.logger.critical(msg)
            raise ScrapliAuthenticationFailed(msg) from exc

        if not self.session:
            raise ScrapliConnectionNotOpened

        if self.plugin_transport_args.auth_strict_key:
            self.logger.debug(
                f"Attempting to validate {self._base_transport_args.host} public key is in known "
                f"hosts and is valid"
            )
            self._verify_key_value()

        self.stdin, self.stdout, _ = await self.session.open_session(
            term_type="xterm", encoding=None
        )

        self._post_open_closing_log(closing=False)

    def close(self) -> None:
        self._pre_open_closing_log(closing=True)

        if self.session:
            with suppress(BrokenPipeError):
                # this may raise a BrokenPipeError because seems it is possible for the connection
                # transport is_closing() to be true already in some cases... since we are closing
                # the connection anyway we will just ignore this note that this seemed to only
                # happen in github actions on ubuntu-latest w/ py3.8... hence the suppress!
                self.session.close()

        # always reset session/stdin/stdout back to None if we are closing!
        self.session = None
        self.stdin = None
        self.stdout = None

        self._post_open_closing_log(closing=True)

    def isalive(self) -> bool:
        if not self.session:
            return False

        # this may need to be revisited in the future, but this seems to be a good check for
        # aliveness
        with suppress(AttributeError):
            if (
                self.session._auth_complete  # pylint:  disable=W0212
                and self.session._transport is not None  # pylint:  disable=W0212
                and self.session._transport.is_closing() is False  # pylint:  disable=W0212
            ):
                return True

        return False

    @timeout_wrapper
    async def read(self) -> bytes:
        if not self.stdout:
            raise ScrapliConnectionNotOpened

        try:
            buf: bytes = await self.stdout.read(65535)
        except ConnectionLost as exc:
            msg = (
                "encountered EOF reading from transport; typically means the device closed the "
                "connection"
            )
            self.logger.critical(msg)
            raise ScrapliConnectionError(msg) from exc

        return buf

    def write(self, channel_input: bytes) -> None:
        if not self.stdin:
            raise ScrapliConnectionNotOpened
        self.stdin.write(channel_input)

__init__(base_transport_args: BaseTransportArgs, plugin_transport_args: PluginTransportArgs) -> None

Asyncssh transport plugin.

Important note: some ssh servers may refuse connections if too many ssh host key algorithms are passed to it during the connection opening -- Asyncssh sends a bunch by default! If you encounter this issue, you can simply update your SSH config file to set a smaller (or one) number of ssh host key algorithms to work around this like so:

1
2
Host *
    HostKeyAlgorithms ssh-rsa

Thank you to @davaeron [https://github.com/davaeron] for reporting this in #173, see also asyncssh #323 here: https://github.com/ronf/asyncssh/issues/323.

This transport supports some additional transport_options to control behavior -- asyncssh is a dictionary that contains options that are passed directly to asyncssh during connection creation, you can find the SSH Client options of asyncssh here: https://asyncssh.readthedocs.io/en/latest/api.html#sshclientconnectionoptions.

Below is an example of passing in options to modify kex and encryption algorithms

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
device = {
    "host": "localhost",
    "transport_options": {
        "asyncssh": {
            "kex_algs": ["diffie-hellman-group14-sha1", "diffie-hellman-group1-sha1"],
            "encryption_algs": ["aes256-cbc", "aes192-cbc", "aes256-ctr", "aes192-ctr"],
        }
    },
    "platform": "cisco_iosxe"
}

conn = Scrapli(**device)

Parameters:

Name Type Description Default
base_transport_args BaseTransportArgs

scrapli base transport plugin arguments

required
plugin_transport_args PluginTransportArgs

asyncssh ssh specific transport plugin arguments

required

Returns:

Type Description
None

N/A

Source code in transport/plugins/asyncssh/transport.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self, base_transport_args: BaseTransportArgs, plugin_transport_args: PluginTransportArgs
) -> None:
    """
    Asyncssh transport plugin.

    Important note: some ssh servers may refuse connections if too many ssh host key algorithms
    are passed to it during the connection opening -- Asyncssh sends a bunch by default! If you
    encounter this issue, you can simply update your SSH config file to set a smaller (or one)
    number of ssh host key algorithms to work around this like so:

    ```
    Host *
        HostKeyAlgorithms ssh-rsa
    ```

    Thank you to @davaeron [https://github.com/davaeron] for reporting this in #173, see also
    asyncssh #323 here: https://github.com/ronf/asyncssh/issues/323.

    This transport supports some additional `transport_options` to control behavior --
    `asyncssh` is a dictionary that contains options that are passed directly to asyncssh during
    connection creation, you can find the SSH Client options of asyncssh here:
    https://asyncssh.readthedocs.io/en/latest/api.html#sshclientconnectionoptions.

    Below is an example of passing in options to modify kex and encryption algorithms

    ```
    device = {
        "host": "localhost",
        "transport_options": {
            "asyncssh": {
                "kex_algs": ["diffie-hellman-group14-sha1", "diffie-hellman-group1-sha1"],
                "encryption_algs": ["aes256-cbc", "aes192-cbc", "aes256-ctr", "aes192-ctr"],
            }
        },
        "platform": "cisco_iosxe"
    }

    conn = Scrapli(**device)
    ```

    Args:
        base_transport_args: scrapli base transport plugin arguments
        plugin_transport_args: asyncssh ssh specific transport plugin arguments

    Returns:
        N/A

    Raises:
        N/A

    """
    super().__init__(base_transport_args=base_transport_args)
    self.plugin_transport_args = plugin_transport_args

    self.session: Optional[SSHClientConnection] = None
    self.stdout: Optional[SSHReader[Any]] = None
    self.stdin: Optional[SSHWriter[Any]] = None