Skip to content

decorators

scrapli.decorators

timeout_modifier(wrapped_func: Callable[..., Any]) -> Callable[..., Any]

Decorate an "operation" to modify the timeout_ops value for the duration of that operation

This decorator wraps send command/config ops and is used to allow users to set a timeout_ops value for the duration of a single method call -- this makes it so users don't need to manually set/reset the value

Parameters:

Name Type Description Default
wrapped_func Callable[..., Any]

function being decorated

required

Returns:

Name Type Description
decorate Callable[..., Any]

decorated func

Source code in scrapli/decorators.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def timeout_modifier(wrapped_func: Callable[..., Any]) -> Callable[..., Any]:
    """
    Decorate an "operation" to modify the timeout_ops value for the duration of that operation

    This decorator wraps send command/config ops and is used to allow users to set a
    `timeout_ops` value for the duration of a single method call -- this makes it so users don't
    need to manually set/reset the value.

    The wrapper reads the `timeout_ops` keyword argument of the call. If it is missing, `None`, or
    equal to the driver's current `timeout_ops`, the wrapped function is simply called. Otherwise
    the driver's `timeout_ops` attribute is set to the requested value for the duration of the
    call and is always restored afterwards -- including when the wrapped function raises.

    Args:
        wrapped_func: function being decorated; the first positional argument of every call must
            be the driver instance (an object exposing `logger` and `timeout_ops`)

    Returns:
        decorate: decorated func (a coroutine function if `wrapped_func` is one, otherwise a
            plain function)

    Raises:
        N/A

    """
    if asyncio.iscoroutinefunction(wrapped_func):

        async def decorate(*args: Any, **kwargs: Any) -> Any:
            driver_instance: "AsyncGenericDriver" = args[0]
            driver_logger = driver_instance.logger

            timeout_ops_kwarg = kwargs.get("timeout_ops")

            if timeout_ops_kwarg is None or timeout_ops_kwarg == driver_instance.timeout_ops:
                # nothing to override, just run the operation as is
                return await wrapped_func(*args, **kwargs)

            driver_logger.info(
                "modifying driver timeout for current operation, temporary timeout_ops "
                f"value: '{timeout_ops_kwarg}'"
            )
            base_timeout_ops = driver_instance.timeout_ops
            driver_instance.timeout_ops = timeout_ops_kwarg
            try:
                return await wrapped_func(*args, **kwargs)
            finally:
                # restore even if the operation raised, otherwise the temporary value would
                # silently become the driver's permanent timeout_ops
                driver_instance.timeout_ops = base_timeout_ops

    else:
        # ignoring type error:
        # "All conditional function variants must have identical signatures"
        # one is sync one is async so never going to be identical here!
        def decorate(*args: Any, **kwargs: Any) -> Any:  # type: ignore
            driver_instance: "GenericDriver" = args[0]
            driver_logger = driver_instance.logger

            timeout_ops_kwarg = kwargs.get("timeout_ops")

            if timeout_ops_kwarg is None or timeout_ops_kwarg == driver_instance.timeout_ops:
                # nothing to override, just run the operation as is
                return wrapped_func(*args, **kwargs)

            driver_logger.info(
                "modifying driver timeout for current operation, temporary timeout_ops "
                f"value: '{timeout_ops_kwarg}'"
            )
            base_timeout_ops = driver_instance.timeout_ops
            driver_instance.timeout_ops = timeout_ops_kwarg
            try:
                return wrapped_func(*args, **kwargs)
            finally:
                # restore even if the operation raised, otherwise the temporary value would
                # silently become the driver's permanent timeout_ops
                driver_instance.timeout_ops = base_timeout_ops

    # ensures that the wrapped function is updated w/ the original functions docs/etc. --
    # necessary for introspection for the auto gen docs to work!
    update_wrapper(wrapper=decorate, wrapped=wrapped_func)
    return decorate

timeout_wrapper(wrapped_func: Callable[..., Any]) -> Callable[..., Any]

Timeout wrapper for transports

Parameters:

Name Type Description Default
wrapped_func Callable[..., Any]

function being wrapped -- must be a method of Channel or Transport

required

Returns:

Name Type Description
Any Callable[..., Any]

result of wrapped function

Source code in scrapli/decorators.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def timeout_wrapper(wrapped_func: Callable[..., Any]) -> Callable[..., Any]:
    """
    Timeout wrapper for transports

    The transport, logger and timeout value are looked up from the first positional argument of
    each call. A falsy timeout means the wrapped function is invoked with no timeout handling.

    Args:
        wrapped_func: function being wrapped -- must be a method of Channel or Transport

    Returns:
        decorate: wrapped function with timeout handling applied

    Raises:
        N/A

    """
    func_name = wrapped_func.__name__

    if asyncio.iscoroutinefunction(wrapped_func):

        async def decorate(*args: Any, **kwargs: Any) -> Any:
            transport, logger, timeout = _get_transport_logger_timeout(cls=args[0])

            if timeout:
                try:
                    return await asyncio.wait_for(wrapped_func(*args, **kwargs), timeout=timeout)
                except asyncio.TimeoutError:
                    # delegate timeout handling to the shared helper
                    _handle_timeout(
                        transport=transport,
                        logger=logger,
                        message=_get_timeout_message(func_name=func_name),
                    )
            else:
                return await wrapped_func(*args, **kwargs)

    else:
        # ignoring type error:
        # "All conditional function variants must have identical signatures"
        # one is sync one is async so never going to be identical here!
        def decorate(*args: Any, **kwargs: Any) -> Any:  # type: ignore
            transport, logger, timeout = _get_transport_logger_timeout(cls=args[0])

            if not timeout:
                return wrapped_func(*args, **kwargs)

            # the SIGALRM based approach below is skipped for system/telnet transports, on
            # windows, and whenever we are not running in the main thread
            use_alternate_timeout = (
                transport.__class__.__name__ in ("SystemTransport", "TelnetTransport")
                or _IS_WINDOWS
                or threading.current_thread() is not threading.main_thread()
            )
            if use_alternate_timeout:
                return _multiprocessing_timeout(
                    transport=transport,
                    logger=logger,
                    timeout=timeout,
                    wrapped_func=wrapped_func,
                    args=args,
                    kwargs=kwargs,
                )

            on_alarm = partial(
                _signal_raise_exception,
                transport=transport,
                logger=logger,
                message=_get_timeout_message(func_name),
            )

            # install our handler and arm the timer, remembering the previous handler
            previous_handler = signal.signal(signal.SIGALRM, on_alarm)
            signal.setitimer(signal.ITIMER_REAL, timeout)
            try:
                return wrapped_func(*args, **kwargs)
            finally:
                # timeout is always truthy here; disarm the timer and put the old handler back
                signal.setitimer(signal.ITIMER_REAL, 0)
                signal.signal(signal.SIGALRM, previous_handler)

    # ensures that the wrapped function is updated w/ the original functions docs/etc. --
    # necessary for introspection for the auto gen docs to work!
    update_wrapper(wrapper=decorate, wrapped=wrapped_func)
    return decorate