Async Probes

tomodachi_testcontainers.async_probes

Asynchronous probes (sampling) for testing asynchronous systems.

Inspired by Awaitility and busypie.

T module-attribute

T = TypeVar('T')

probe_until async

probe_until(probe: Callable[[], Awaitable[T]], probe_interval: float = 0.1, stop_after: float = 3.0) -> T
probe_until(probe: Callable[[], T], probe_interval: float = 0.1, stop_after: float = 3.0) -> T
probe_until(probe, probe_interval=0.1, stop_after=3.0)

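Runs the given function repeatedly until it finishes without raising an exception, then returns the function's result. Attempts are separated by probe_interval seconds; if the function is still failing once stop_after seconds have elapsed, the last exception is re-raised.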

The given function can be either a regular synchronous function or an asynchronous function.
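
The handling of the two kinds of probe can be sketched as follows. This is a minimal illustration that assumes the same check used in the source listing on this page: call the probe, then await the result only if it is a coroutine. The names `call_probe`, `sync_probe`, and `async_probe` are hypothetical and exist only for this example.

```python
import asyncio


async def call_probe(probe):
    """Call `probe` and await its result only when it is a coroutine."""
    result = probe()
    if asyncio.iscoroutine(result):
        result = await result
    return result


def sync_probe():
    # A regular function: its return value is used directly.
    return "sync"


async def async_probe():
    # An async function: calling it yields a coroutine that must be awaited.
    await asyncio.sleep(0)
    return "async"


async def demo_call_probe():
    return await call_probe(sync_probe), await call_probe(async_probe)
```

Because the check is `asyncio.iscoroutine`, a synchronous probe that returns some other awaitable (for example an `asyncio.Future`) would likely be handed back without being awaited; wrapping such a probe in an `async def` function sidesteps that.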

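PARAMETER DESCRIPTION
probe

Function to call on each attempt. It takes no arguments and may be synchronous or asynchronous.

TYPE: Union[Callable[[], Awaitable[T]], Callable[[], T]]

probe_interval

Seconds to wait between attempts.

TYPE: float DEFAULT: 0.1

stop_after

Seconds after which probing stops and the last exception is re-raised.

TYPE: float DEFAULT: 3.0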
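
The polling behaviour described above can be sketched with the standard library alone. `probe_until_sketch` is a hypothetical, simplified stand-in written for this example, not the library's implementation (which is built on tenacity); in a real test you would import `probe_until` from `tomodachi_testcontainers.async_probes` instead. The `demo_probe_until` scenario and the event name are likewise assumptions.

```python
import asyncio
import time


async def probe_until_sketch(probe, probe_interval=0.1, stop_after=3.0):
    """Simplified stand-in: retry `probe` until it stops raising or time runs out."""
    deadline = time.monotonic() + stop_after
    while True:
        try:
            result = probe()
            if asyncio.iscoroutine(result):
                result = await result
            return result  # the first successful call ends the loop
        except Exception:
            if time.monotonic() >= deadline:
                raise  # out of time: surface the last failure
            await asyncio.sleep(probe_interval)


async def demo_probe_until():
    events = []

    async def publish_later():
        # Simulates an asynchronous system that produces its result with a delay.
        await asyncio.sleep(0.2)
        events.append("order_created")

    task = asyncio.create_task(publish_later())

    def event_published():
        # Raises AssertionError until the event shows up, so the probe is retried.
        assert events, "event not published yet"
        return events[0]

    event = await probe_until_sketch(event_published, probe_interval=0.05, stop_after=2.0)
    await task
    return event
```

Running `asyncio.run(demo_probe_until())` returns once the assertion inside the probe passes. Putting the assertion inside the probe is what lets a test wait for an eventually consistent outcome without a fixed sleep.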

Source code in src/tomodachi_testcontainers/async_probes.py
async def probe_until(
    probe: Union[Callable[[], Awaitable[T]], Callable[[], T]],
    probe_interval: float = 0.1,
    stop_after: float = 3.0,
) -> T:
    """Run given function until it finishes without exceptions.

    Given function can be a regular synchronous function or an asynchronous function.
    """
    result: Any = None
    async for attempt in AsyncRetrying(
        wait=wait_fixed(probe_interval),
        stop=stop_after_delay(stop_after),
        reraise=True,
    ):
        with attempt:
            result = probe()
            if asyncio.iscoroutine(result):
                result = await result
    return cast(T, result)

probe_during_interval async

probe_during_interval(probe: Callable[[], Awaitable[T]], probe_interval: float = 0.1, stop_after: float = 3.0) -> T
probe_during_interval(probe: Callable[[], T], probe_interval: float = 0.1, stop_after: float = 3.0) -> T
probe_during_interval(probe, probe_interval=0.1, stop_after=3.0)

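Runs the given function repeatedly until the timeout is reached, requiring every call to finish without raising an exception. Calls are separated by probe_interval seconds and continue for stop_after seconds; the first exception raised stops probing and propagates to the caller. If no call fails, the result of the last call is returned.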

The given function can be either a regular synchronous function or an asynchronous function.

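PARAMETER DESCRIPTION
probe

Function to call on each attempt. It takes no arguments and may be synchronous or asynchronous.

TYPE: Callable[[], Union[Awaitable[T], T]]

probe_interval

Seconds to wait between attempts.

TYPE: float DEFAULT: 0.1

stop_after

Seconds for which the function must keep succeeding before probing ends.

TYPE: float DEFAULT: 3.0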
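
The "must hold for the whole interval" behaviour can be sketched with the standard library alone. `probe_during_interval_sketch` is a hypothetical, simplified stand-in for illustration, not the library's implementation (which is built on tenacity); in a real test you would import `probe_during_interval` from `tomodachi_testcontainers.async_probes`. The two demo scenarios and the message identifier are assumptions as well.

```python
import asyncio
import time


async def probe_during_interval_sketch(probe, probe_interval=0.1, stop_after=3.0):
    """Simplified stand-in: keep calling `probe` until time runs out; any exception escapes."""
    deadline = time.monotonic() + stop_after
    while True:
        result = probe()
        if asyncio.iscoroutine(result):
            result = await result
        if time.monotonic() >= deadline:
            return result  # every call succeeded for the whole interval
        await asyncio.sleep(probe_interval)


def make_no_duplicates_probe(processed):
    def no_duplicates():
        # The condition that must stay true for the entire interval.
        assert len(processed) == len(set(processed)), "duplicate message processed"
        return len(processed)

    return no_duplicates


async def demo_stable():
    processed = ["order-1"]
    probe = make_no_duplicates_probe(processed)
    return await probe_during_interval_sketch(probe, probe_interval=0.05, stop_after=0.3)


async def demo_violation():
    processed = ["order-1"]

    async def process_duplicate_later():
        await asyncio.sleep(0.1)
        processed.append("order-1")  # the unwanted side effect arrives late

    task = asyncio.create_task(process_duplicate_later())
    probe = make_no_duplicates_probe(processed)
    try:
        return await probe_during_interval_sketch(probe, probe_interval=0.05, stop_after=1.0)
    finally:
        await task
```

`demo_stable` completes after roughly `stop_after` seconds and returns the last probe result, while `demo_violation` raises `AssertionError` as soon as the duplicate appears. This style of probe suits assertions that something does not happen, where a single passing check at the start would prove nothing.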

Source code in src/tomodachi_testcontainers/async_probes.py
async def probe_during_interval(
    probe: Callable[[], Union[Awaitable[T], T]],
    probe_interval: float = 0.1,
    stop_after: float = 3.0,
) -> T:
    """Run given function until timeout is reached and the function always finishes without exceptions.

    Given function can be a regular synchronous function or an asynchronous function.
    """
    result: Any = None
    with suppress(RetryError):
        async for attempt in AsyncRetrying(
            wait=wait_fixed(probe_interval),
            stop=stop_after_delay(stop_after),
            retry=retry_unless_exception_type(BaseException),
            reraise=True,
        ):
            with attempt:
                result = probe()
                if asyncio.iscoroutine(result):
                    result = await result
    return cast(T, result)