Skip to content

Testing Asynchronous Systems

In the previous section, we tested an application that directly depends on another application's HTTP API; we mocked its public API with an HTTP mock server. This section will explore another scenario - testing an application that depends on other services through asynchronous message bus.

Example: Event-Driven Customer Management Application

Let's test an example Customer Management Application. A user can create a new account in the Customer application and place new orders in the Order application. The Customer and Order apps communicate through an asynchronous message bus, in this example, AWS SNS/SQS. When the customer creates a new account, the Customer app publishes a CustomerCreated event to the message bus. When the customer creates a new order, the Order app publishes an OrderCreated event to the message bus. The Customer application listens for OrderCreated events and saves a list of orders that belong to the customer.

The apps communicate indirectly through the message bus

A message bus is a decoupling point. The Customer and Order apps communicate indirectly through the message bus. To test the Customer app in isolation, we don't need to run a real or mocked version of the Order app - We only need to publish the Order app's events to the message bus to simulate interactions with the Customer app.

Container Diagram - Order Management System

Creating the customer management application

Let's first create the example Customer application.

Note

Since this example involves setting up many dependencies like AWS DynamoDB and AWS SNS/SQS, the setup code is omitted for brevity. Find the complete example's code in GitHub.

The app has three endpoints:

  • HTTP POST /customer - for new customer creation, publishes CustomerCreatedEvent to AWS SNS.
  • HTTP GET /customer/<id> - for querying existing customers.
  • AWS SQS consumer listening for OrderCreatedEvents - saves the received Order identifier on an existing customer's object.
src/app.py
class Service(tomodachi.Service):
    ...

    @tomodachi.http("POST", r"/customer/?")
    async def create_customer(self, request: web.Request) -> web.Response:
        data = await request.json()
        customer = Customer(
            customer_id=str(uuid.uuid4()),
            name=data["name"],
            orders=[],
        )
        event = CustomerCreatedEvent.from_customer(customer)
        await self._repository.create(customer)
        await tomodachi.aws_sns_sqs_publish(
            self,
            data=event.to_dict(),
            topic="customer--created",
            message_envelope=JsonBase,
        )
        return web.json_response(customer.to_dict())

    @tomodachi.http("GET", r"/customer/(?P<customer_id>[^/]+?)/?")
    async def get_customer(self, request: web.Request, customer_id: str) -> web.Response:
        customer = await self._repository.get(customer_id)
        if not customer:
            return web.json_response({"error": "CUSTOMER_NOT_FOUND"}, status=404)
        return web.json_response(customer.to_dict())

    @tomodachi.aws_sns_sqs(
        "order--created",
        queue_name="customer--order-created",
        dead_letter_queue_name="customer--order-created--dlq",
        max_receive_count=int(os.getenv("AWS_SQS_MAX_RECEIVE_COUNT", 3)),
        message_envelope=JsonBase,
        visibility_timeout=int(os.getenv("AWS_SQS_VISIBILITY_TIMEOUT", 30)),
    )
    async def handle_order_created(self, data: dict) -> None:
        try:
            event = OrderCreatedEvent.from_dict(data)
            await self._repository.add_order(event)
        except Exception as e:
            raise AWSSNSSQSInternalServiceError from e

Configuring Testcontainers

The Testcontainer's configuration in this example is involved because we must configure two infrastructure dependencies - AWS DynamoDB and AWS SNS/SQS.

  • The fixture _create_topics_and_queues creates necessary pub/sub topics and queues for the Customer and Order app to communicate through the AWS SNS/SQS message bus.

  • The fixture _purge_queues_on_teardown deletes all messages from the queues after every test to ensure test isolation - that messages don't leak between tests.

  • The tomodachi_container fixture uses LocalStackContainer to emulate AWS DynamoDB and AWS SNS/SQS - in the same way as in the Testing Applications with Backing Services guide. The TomodachiContainer is configured with the LocalStack container's environment variables. The _create_topics_and_queues fixture is used in the tomodachi_container fixture to ensure that topics and queues are created before the Customer application starts.

tests/conftest.py
from typing import AsyncGenerator, Generator

import httpx
import pytest
import pytest_asyncio

from tomodachi_testcontainers import DockerContainer, LocalStackContainer, TomodachiContainer
from tomodachi_testcontainers.clients import SNSSQSTestClient


@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def _create_topics_and_queues(localstack_snssqs_tc: SNSSQSTestClient) -> None:
    await localstack_snssqs_tc.subscribe_to(topic="customer--created", queue="customer--created")
    await localstack_snssqs_tc.subscribe_to(topic="order--created", queue="customer--order-created")


@pytest_asyncio.fixture(loop_scope="session", autouse=True)
async def _purge_queues_on_teardown(localstack_snssqs_tc: SNSSQSTestClient) -> AsyncGenerator[None, None]:
    yield
    await localstack_snssqs_tc.purge_queue("customer--created")
    await localstack_snssqs_tc.purge_queue("customer--order-created")


@pytest.fixture(scope="session")
def tomodachi_container(
    testcontainer_image: str,
    localstack_container: LocalStackContainer,
    _create_topics_and_queues: None,
) -> Generator[DockerContainer, None, None]:
    with (
        TomodachiContainer(testcontainer_image)
        .with_env("AWS_REGION", "us-east-1")
        .with_env("AWS_ACCESS_KEY_ID", "testing")
        .with_env("AWS_SECRET_ACCESS_KEY", "testing")
        .with_env("AWS_SQS_MAX_RECEIVE_COUNT", "1")
        .with_env("AWS_SQS_VISIBILITY_TIMEOUT", "3")
        .with_env("AWS_SNS_ENDPOINT_URL", localstack_container.get_internal_url())
        .with_env("AWS_SQS_ENDPOINT_URL", localstack_container.get_internal_url())
        .with_env("AWS_DYNAMODB_ENDPOINT_URL", localstack_container.get_internal_url())
        .with_env("DYNAMODB_TABLE_NAME", "autotest-customers")
        .with_command("tomodachi run getting_started/customers/app.py --production")
    ) as container:
        yield container


@pytest_asyncio.fixture(scope="module", loop_scope="session")
async def http_client(tomodachi_container: TomodachiContainer) -> AsyncGenerator[httpx.AsyncClient, None]:
    async with httpx.AsyncClient(base_url=tomodachi_container.get_external_url()) as client:
        yield client

Testing for published messages

Let's test the first scenario - a customer creates a new account by calling the POST /customer endpoint, and a new CustomerCreatedEvent is published to notify surrounding systems that a new customer has registered.

First, the test calls the POST /customer endpoint and creates a new customer. To test that the CustomerCreatedEvent has been published, the test attempts to retrieve a message from the customer--created queue using the SNSSQSTestClient provided by Tomodachi Testcontainers for easier testing of AWS SNS/SQS pub/sub with the Tomodachi framework. The SNSSQSTestClient is accessed with the localstack_snssqs_tc fixture.

Danger

This test has a problem - since the message bus is asynchronous, it's not guaranteed that the CustomerCreatedEvent will be immediately visible to consumers. It takes some time from the moment the message is published until it's visible to consumers. Therefore, this test is flaky - no messages might be visible yet, so the test can randomly fail.

tests/test_app.py
import httpx
import pytest
from tomodachi.envelope.json_base import JsonBase

from tomodachi_testcontainers.clients import SNSSQSTestClient


@pytest.mark.xfail(reason="CustomerCreatedEvent is emitted asynchronously")
@pytest.mark.asyncio(loop_scope="session")
async def test_customer_created_event_emitted(
    http_client: httpx.AsyncClient,
    localstack_snssqs_tc: SNSSQSTestClient,
) -> None:
    # Act
    response = await http_client.post("/customer", json={"name": "John Doe"})
    body = response.json()
    customer_id = body["customer_id"]
    assert response.status_code == 200

    # Assert
    events = await localstack_snssqs_tc.receive("customer--created", JsonBase, dict[str, str])
    assert len(events) == 1
    assert events[0].payload == {
        "customer_id": customer_id,
        "name": "John Doe",
    }

To fix the test, we need to wait until the message becomes visible. The easiest solution is to sleep for a fixed time between the customer creation and the attempt to receive the message.

Danger

This approach works; however, tests will become slow on a larger scale due to unnecessary waiting. For example, in a test suite of 60 test cases, there'll be an additional 60 seconds of waiting.

tests/test_app.py
import asyncio

import httpx
import pytest
from tomodachi.envelope.json_base import JsonBase

from tomodachi_testcontainers.clients import SNSSQSTestClient


@pytest.mark.asyncio(loop_scope="session")
async def test_customer_created_event_emitted(
    http_client: httpx.AsyncClient,
    localstack_snssqs_tc: SNSSQSTestClient,
) -> None:
    # Act
    response = await http_client.post("/customer", json={"name": "John Doe"})
    body = response.json()
    customer_id = body["customer_id"]
    assert response.status_code == 200

    # Assert
    await asyncio.sleep(1)  # Wait for the message to become visible to consumers

    events = await localstack_snssqs_tc.receive("customer--created", JsonBase, dict[str, str])
    assert len(events) == 1
    assert events[0].payload == {
        "customer_id": customer_id,
        "name": "John Doe",
    }

Using Asynchronous Probing for testing asynchronous systems

Using asynchronous probing (sampling) is better than sleeping for a fixed time. The idea of asynchronous probing is continuously testing for a condition in short time intervals.

Tomodachi Testcontainers provides a probe_until function for asynchronous probing. The probe_until receives a probe function. The probe is continuously invoked in probe_interval (defaults to 0.1 second) until it finishes without exceptions. When the stop_after timeout is reached (defaults to 3.0 seconds), the probing is stopped, and the last probe's exception is raised.

In our test, we define the probe function - _customer_created_event_emitted. The probe reads messages from the SQS queue and expects to find one message. If no messages are returned, the probe will fail with ValueError and will be retried in probe_interval. When the probe receives one message from the SQS queue, it's returned from the probe to the main test body.

Note

A probe must contain an assertion, e.g., a check that exactly one message is received from the queue. In this example, the assertion is the list unpacking operation [event] = localstack_snssqs_tc.receive(...). It could also be events = localstack_snssqs_tc.receive(...); assert len(events) == 1.

tests/test_app.py
import httpx
import pytest
from tomodachi.envelope.json_base import JsonBase

from tomodachi_testcontainers.async_probes import probe_until
from tomodachi_testcontainers.clients import SNSSQSTestClient


@pytest.mark.asyncio(loop_scope="session")
async def test_customer_created_event_emitted(
    http_client: httpx.AsyncClient,
    localstack_snssqs_tc: SNSSQSTestClient,
) -> None:
    # Act
    response = await http_client.post("/customer", json={"name": "John Doe"})
    body = response.json()
    customer_id = body["customer_id"]

    # Assert
    async def _customer_created_event_emitted() -> dict[str, str]:
        [event] = await localstack_snssqs_tc.receive("customer--created", JsonBase, dict[str, str])
        return event.payload

    event = await probe_until(_customer_created_event_emitted, probe_interval=0.1, stop_after=3.0)
    assert event == {
        "customer_id": customer_id,
        "name": "John Doe",
    }

Asynchronous probing reduces wait time

The benefit of asynchronous probing is minimal wait time - as soon as the message is received, the test stops waiting. This way, we ensure fast test runtime.

The probe_until is inspired by Awaitility and busypie - read more about testing asynchronous systems in their documentation.

Probing for no visible effect

Sometimes, we'd like to test that a system doesn't publish a specific message. For example, let's test that a message consumer implements the idempotent consumer pattern - it performs an operation only once and discards duplicate messages.

There are several ways to test this:

  • Publish two duplicate messages and probe that exactly one outcome message has been published.
  • Publish a single message and assert that an outcome message has been published. Then, publish the duplicate message and probe_during_interval that the outcome message is not published for, e.g., 3 seconds.

The downside of the second approach is that it introduces extra wait time for the tests; however, sometimes, it's a helpful technique.

Testing asynchronous message consumers

Let's test that our Customer app consumes OrderCreatedEvents and associates new orders with existing customers.

  • In the arrange step, we create a new customer.

  • Then, in the act step, we publish two SNS messages to the order--created topic with predefined order identifiers.

  • In the assert step, we probe the GET /customer/<id> API until two messages are consumed - the two orders are associated with the customer.

Since the message consumer (Customer app) is asynchronous, it takes time for messages to be picked up and processed. Therefore, using asynchronous probing, we continuously check the system's state until all published messages are consumed.

tests/test_app.py
import httpx
import pytest
from tomodachi.envelope.json_base import JsonBase

from tomodachi_testcontainers.async_probes import probe_until
from tomodachi_testcontainers.clients import SNSSQSTestClient


@pytest.mark.asyncio(loop_scope="session")
async def test_register_created_order(
    http_client: httpx.AsyncClient,
    localstack_snssqs_tc: SNSSQSTestClient,
) -> None:
    # Arrange
    response = await http_client.post("/customer", json={"name": "John Doe"})
    customer_id = response.json()["customer_id"]
    assert response.status_code == 200

    # Act
    order_ids = ["6c403295-2755-4178-a4f1-e3b698927971", "c8bb390a-71f4-4e8f-8879-c92261b0e18e"]
    for order_id in order_ids:
        await localstack_snssqs_tc.publish(
            topic="order--created",
            data={"order_id": order_id, "customer_id": customer_id},
            envelope=JsonBase,
        )

    # Assert
    async def _new_orders_associated_with_customer() -> None:
        response = await http_client.get(f"/customer/{customer_id}")

        assert response.status_code == 200
        assert response.json() == {
            "customer_id": customer_id,
            "name": "John Doe",
            "orders": [
                {"order_id": order_ids[0]},
                {"order_id": order_ids[1]},
            ],
        }

    await probe_until(_new_orders_associated_with_customer)

Testing error-handling with dead-letter queues

When a consumer cannot process a message, it's a good practice to move it to a dead-letter queue (DLQ) for further troubleshooting. Otherwise, a faulty message can be lost or retried forever.

The handle_order_created message consumer is configured to move faulty messages to the customer--order-created--dlq DLQ. In Tomodachi framework, it's necessary to raise the AWSSNSSQSInternalServiceError to retry a message; otherwise, it's deleted from a queue.

In AWS SQS, the visibility_timeout specifies when a consumed message becomes visible again to other consumers. By default, it's 30 seconds, so our test would have to wait 30 seconds to receive a message from the DLQ - it's too long for a single test. To speed up the test, in the autotest environment, we set the environment variable AWS_SQS_VISIBILITY_TIMEOUT to something smaller, e.g., 3 seconds, and decrease number of retries to 1 with the AWS_SQS_MAX_RECEIVE_COUNT environment variable. The setting should be no less than the default 30 seconds in a production environment.

src/app.py
class Service(tomodachi.Service):
    ...

    @tomodachi.aws_sns_sqs(
        "order--created",
        queue_name="customer--order-created",
        dead_letter_queue_name="customer--order-created--dlq",
        max_receive_count=int(os.getenv("AWS_SQS_MAX_RECEIVE_COUNT", 3)),
        message_envelope=JsonBase,
        visibility_timeout=int(os.getenv("AWS_SQS_VISIBILITY_TIMEOUT", 30)),
    )
    async def handle_order_created(self, data: dict) -> None:
        try:
            event = OrderCreatedEvent.from_dict(data)
            await self._repository.add_order(event)
        except Exception as e:
            raise AWSSNSSQSInternalServiceError from e

Let's test that the OrderCreatedEvent message is moved to the DLQ if the given customer_id is not found in the customer's database. Due to message retries and visibility_timeout, we set the async probe's stop_after value to a higher value, e.g., 10 seconds.

tests/test_app.py
import uuid

import pytest
from tomodachi.envelope.json_base import JsonBase

from tomodachi_testcontainers.async_probes import probe_until
from tomodachi_testcontainers.clients import SNSSQSTestClient


@pytest.mark.usefixtures("tomodachi_container")
@pytest.mark.asyncio(loop_scope="session")
async def test_customer_not_found_for_newly_created_order(localstack_snssqs_tc: SNSSQSTestClient) -> None:
    # Arrange
    customer_id = str(uuid.uuid4())
    order_id = str(uuid.uuid4())

    # Act
    await localstack_snssqs_tc.publish(
        topic="order--created",
        data={"order_id": order_id, "customer_id": customer_id},
        envelope=JsonBase,
    )

    # Assert
    async def _order_created_event_moved_to_dlq() -> dict[str, str]:
        [event] = await localstack_snssqs_tc.receive("customer--order-created--dlq", JsonBase, dict[str, str])
        return event.payload

    event = await probe_until(_order_created_event_moved_to_dlq, stop_after=10.0)
    assert event == {"order_id": order_id, "customer_id": customer_id}

Other application tests

For completeness, let's add the customer application's HTTP endpoint tests.

tests/test_app.py
import uuid
from unittest import mock

import httpx
import pytest


@pytest.mark.asyncio(loop_scope="session")
async def test_customer_not_found(http_client: httpx.AsyncClient) -> None:
    customer_id = uuid.uuid4()
    response = await http_client.get(f"/customer/{customer_id}")

    assert response.status_code == 404
    assert response.json() == {"error": "CUSTOMER_NOT_FOUND"}


@pytest.mark.asyncio(loop_scope="session")
async def test_created_and_get_customer(http_client: httpx.AsyncClient) -> None:
    response = await http_client.post("/customer", json={"name": "John Doe"})
    body = response.json()
    customer_id = body["customer_id"]
    assert response.status_code == 200
    assert body == {
        "customer_id": mock.ANY,
        "name": "John Doe",
        "orders": [],
    }

    response = await http_client.get(f"/customer/{customer_id}")
    assert response.status_code == 200
    assert response.json() == {
        "customer_id": customer_id,
        "name": "John Doe",
        "orders": [],
    }

Summary

Testing asynchronous systems might initially seem complicated due to a different programming and mental mode - operations are not performed immediately; they need to be awaited. However, testing applications that communicate with other systems through an asynchronous message bus is pretty simple because the message bus decouples the applications from each other. To test an app in isolation, we should publish messages in the correct format to the message bus and wait until the app consumes them and changes its state. There's no need to configure additional mocks or external apps. The asynchronous probing technique makes awaiting asynchronous operations easy and fast - it continuously probes the application for a state change.

As with testing apps that communicate with other systems with synchronous APIs (Testing Applications with Collaborator Services), we can't be sure that the same message format is used in production as we're using for testing. It might be necessary to add additional contract tests to ensure the correctness of the test message format. Pact, a contract testing tool, supports testing contracts in event-driven systems.

References