Testing Asynchronous Systems
In the previous section, we tested an application that directly depends on another application's HTTP API; we mocked that API with an HTTP mock server. This section explores another scenario: testing an application that depends on other services through an asynchronous message bus.
Example: Event-Driven Customer Management Application
Let's test an example Customer Management Application.
A user can create a new account in the Customer application and place new orders in the Order application.
The Customer and Order apps communicate through an asynchronous message bus, in this example, AWS SNS/SQS.
When the customer creates a new account, the Customer app publishes a CustomerCreated event to the message bus.
When the customer creates a new order, the Order app publishes an OrderCreated event to the message bus.
The Customer application listens for OrderCreated events and saves a list of orders that belong to the customer.
The apps communicate indirectly through the message bus
A message bus is a decoupling point: the Customer and Order apps communicate only indirectly through it. To test the Customer app in isolation, we don't need to run a real or mocked version of the Order app; we only need to publish the Order app's events to the message bus to simulate its interactions with the Customer app.
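To make the decoupling concrete, here is a toy, in-memory model of SNS/SQS fan-out. The InMemoryBus class is hypothetical and is not part of tomodachi or Tomodachi Testcontainers. A topic delivers a copy of each message to every subscribed queue, so a test can stand in for the Order app simply by publishing to the order--created topic.

```python
from collections import defaultdict


class InMemoryBus:
    """Hypothetical stand-in for SNS/SQS: a topic fans out a copy of each message to every subscribed queue."""

    def __init__(self) -> None:
        self._subscriptions: defaultdict[str, list[str]] = defaultdict(list)
        self.queues: dict[str, list[dict]] = {}

    def subscribe(self, topic: str, queue: str) -> None:
        self.queues.setdefault(queue, [])
        self._subscriptions[topic].append(queue)

    def publish(self, topic: str, message: dict) -> None:
        for queue in self._subscriptions[topic]:
            self.queues[queue].append(dict(message))


bus = InMemoryBus()
bus.subscribe("customer--created", "customer--created")
bus.subscribe("order--created", "customer--order-created")

# The test plays the Order app's part: it only publishes an OrderCreated event.
# No Order app (real or mocked) is needed for the Customer app's queue to receive it.
bus.publish("order--created", {"order_id": "o-1", "customer_id": "c-1"})

received = bus.queues["customer--order-created"]
print(received)
```

Real SNS/SQS delivery is asynchronous, which this synchronous toy deliberately ignores; the rest of this section deals with that.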
Creating the customer management application
Let's first create the example Customer application.
Note
Since this example involves setting up many dependencies like AWS DynamoDB and AWS SNS/SQS, the setup code is omitted for brevity. The complete example code is available on GitHub.
The app has three entry points:
- HTTP POST /customer - for new customer creation; publishes CustomerCreatedEvent to AWS SNS.
- HTTP GET /customer/<id> - for querying existing customers.
- AWS SQS consumer listening for OrderCreatedEvents - saves the received order identifier on an existing customer's object.
import os
import uuid

import tomodachi
from aiohttp import web
from tomodachi.envelope.json_base import JsonBase
from tomodachi.transport.aws_sns_sqs import AWSSNSSQSInternalServiceError

# Customer, CustomerCreatedEvent, OrderCreatedEvent and the repository are defined
# elsewhere in the example app; their setup code is omitted for brevity.


class Service(tomodachi.Service):
    ...

    @tomodachi.http("POST", r"/customer/?")
    async def create_customer(self, request: web.Request) -> web.Response:
        data = await request.json()
        customer = Customer(
            customer_id=str(uuid.uuid4()),
            name=data["name"],
            orders=[],
        )
        event = CustomerCreatedEvent.from_customer(customer)
        await self._repository.create(customer)
        # Notify surrounding systems that a new customer has registered
        await tomodachi.aws_sns_sqs_publish(
            self,
            data=event.to_dict(),
            topic="customer--created",
            message_envelope=JsonBase,
        )
        return web.json_response(customer.to_dict())

    @tomodachi.http("GET", r"/customer/(?P<customer_id>[^/]+?)/?")
    async def get_customer(self, request: web.Request, customer_id: str) -> web.Response:
        customer = await self._repository.get(customer_id)
        if not customer:
            return web.json_response({"error": "CUSTOMER_NOT_FOUND"}, status=404)
        return web.json_response(customer.to_dict())

    @tomodachi.aws_sns_sqs(
        "order--created",
        queue_name="customer--order-created",
        dead_letter_queue_name="customer--order-created--dlq",
        max_receive_count=int(os.getenv("AWS_SQS_MAX_RECEIVE_COUNT", 3)),
        message_envelope=JsonBase,
        visibility_timeout=int(os.getenv("AWS_SQS_VISIBILITY_TIMEOUT", 30)),
    )
    async def handle_order_created(self, data: dict) -> None:
        try:
            event = OrderCreatedEvent.from_dict(data)
            await self._repository.add_order(event)
        except Exception as e:
            # Signal tomodachi to keep the message for redelivery (and, eventually, the DLQ)
            raise AWSSNSSQSInternalServiceError from e
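The domain classes used by the service are part of the omitted setup code. Below is a minimal sketch of what they might look like, assuming plain dataclasses whose dictionary shapes match the JSON payloads asserted in the tests later in this section. These definitions are hypothetical; the example repository's actual classes may differ, and the repository (self._repository) is left out.

```python
from dataclasses import asdict, dataclass, field


@dataclass
class Customer:
    customer_id: str
    name: str
    orders: list[dict[str, str]] = field(default_factory=list)

    def to_dict(self) -> dict:
        return asdict(self)


@dataclass(frozen=True)
class CustomerCreatedEvent:
    customer_id: str
    name: str

    @classmethod
    def from_customer(cls, customer: Customer) -> "CustomerCreatedEvent":
        return cls(customer_id=customer.customer_id, name=customer.name)

    def to_dict(self) -> dict[str, str]:
        return asdict(self)


@dataclass(frozen=True)
class OrderCreatedEvent:
    order_id: str
    customer_id: str

    @classmethod
    def from_dict(cls, data: dict) -> "OrderCreatedEvent":
        # A malformed message raises KeyError, which handle_order_created turns into a retry
        return cls(order_id=data["order_id"], customer_id=data["customer_id"])


customer = Customer(customer_id="c-1", name="John Doe")
print(customer.to_dict(), CustomerCreatedEvent.from_customer(customer).to_dict())
```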
Configuring Testcontainers
The Testcontainers configuration in this example is more involved because we must configure two infrastructure dependencies: AWS DynamoDB and AWS SNS/SQS.
- The _create_topics_and_queues fixture creates the pub/sub topics and queues that the Customer and Order apps need to communicate through the AWS SNS/SQS message bus.
- The _purge_queues_on_teardown fixture deletes all messages from the queues after every test to ensure test isolation, so that messages don't leak between tests.
- The tomodachi_container fixture uses LocalStackContainer to emulate AWS DynamoDB and AWS SNS/SQS, in the same way as in the Testing Applications with Backing Services guide. The TomodachiContainer is configured with environment variables that point the app to the LocalStack container. The tomodachi_container fixture depends on the _create_topics_and_queues fixture to ensure that the topics and queues are created before the Customer application starts.
from typing import AsyncGenerator, Generator

import httpx
import pytest
import pytest_asyncio
from tomodachi_testcontainers import DockerContainer, LocalStackContainer, TomodachiContainer
from tomodachi_testcontainers.clients import SNSSQSTestClient


@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def _create_topics_and_queues(localstack_snssqs_tc: SNSSQSTestClient) -> None:
    await localstack_snssqs_tc.subscribe_to(topic="customer--created", queue="customer--created")
    await localstack_snssqs_tc.subscribe_to(topic="order--created", queue="customer--order-created")


@pytest_asyncio.fixture(loop_scope="session", autouse=True)
async def _purge_queues_on_teardown(localstack_snssqs_tc: SNSSQSTestClient) -> AsyncGenerator[None, None]:
    yield
    await localstack_snssqs_tc.purge_queue("customer--created")
    await localstack_snssqs_tc.purge_queue("customer--order-created")


@pytest.fixture(scope="session")
def tomodachi_container(
    testcontainer_image: str,
    localstack_container: LocalStackContainer,
    _create_topics_and_queues: None,
) -> Generator[DockerContainer, None, None]:
    with (
        TomodachiContainer(testcontainer_image)
        .with_env("AWS_REGION", "us-east-1")
        .with_env("AWS_ACCESS_KEY_ID", "testing")
        .with_env("AWS_SECRET_ACCESS_KEY", "testing")
        .with_env("AWS_SQS_MAX_RECEIVE_COUNT", "1")
        .with_env("AWS_SQS_VISIBILITY_TIMEOUT", "3")
        .with_env("AWS_SNS_ENDPOINT_URL", localstack_container.get_internal_url())
        .with_env("AWS_SQS_ENDPOINT_URL", localstack_container.get_internal_url())
        .with_env("AWS_DYNAMODB_ENDPOINT_URL", localstack_container.get_internal_url())
        .with_env("DYNAMODB_TABLE_NAME", "autotest-customers")
        .with_command("tomodachi run getting_started/customers/app.py --production")
    ) as container:
        yield container


@pytest_asyncio.fixture(scope="module", loop_scope="session")
async def http_client(tomodachi_container: TomodachiContainer) -> AsyncGenerator[httpx.AsyncClient, None]:
    async with httpx.AsyncClient(base_url=tomodachi_container.get_external_url()) as client:
        yield client
Testing for published messages
Let's test the first scenario: a customer creates a new account by calling the POST /customer endpoint, and a new CustomerCreatedEvent is published to notify surrounding systems that a new customer has registered.
First, the test calls the POST /customer endpoint and creates a new customer. To test that the CustomerCreatedEvent has been published, the test attempts to retrieve a message from the customer--created queue using the SNSSQSTestClient, which Tomodachi Testcontainers provides for easier testing of AWS SNS/SQS pub/sub with the Tomodachi framework. The SNSSQSTestClient is accessed with the localstack_snssqs_tc fixture.
Danger
This test has a problem: since the message bus is asynchronous, the CustomerCreatedEvent is not guaranteed to be visible to consumers immediately. It takes some time from the moment a message is published until it becomes visible to consumers. Therefore, this test is flaky: no messages might be visible yet when it tries to receive them, so the test can fail randomly.
import httpx
import pytest
from tomodachi.envelope.json_base import JsonBase
from tomodachi_testcontainers.clients import SNSSQSTestClient


@pytest.mark.xfail(reason="CustomerCreatedEvent is emitted asynchronously")
@pytest.mark.asyncio(loop_scope="session")
async def test_customer_created_event_emitted(
    http_client: httpx.AsyncClient,
    localstack_snssqs_tc: SNSSQSTestClient,
) -> None:
    # Act
    response = await http_client.post("/customer", json={"name": "John Doe"})
    body = response.json()
    customer_id = body["customer_id"]
    assert response.status_code == 200

    # Assert
    events = await localstack_snssqs_tc.receive("customer--created", JsonBase, dict[str, str])
    assert len(events) == 1
    assert events[0].payload == {
        "customer_id": customer_id,
        "name": "John Doe",
    }
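The race can be reproduced without any AWS services. DelayedQueue below is a hypothetical toy queue in which a message becomes receivable only some time after it is sent, roughly what happens between publishing to SNS and the message becoming visible in SQS. Receiving immediately returns nothing, which is exactly the situation in which the xfail test above fails.

```python
import time


class DelayedQueue:
    """Hypothetical toy queue: a message becomes receivable only `delay` seconds after it is sent."""

    def __init__(self, delay: float) -> None:
        self._delay = delay
        self._pending: list[tuple[float, dict]] = []

    def send(self, message: dict) -> None:
        self._pending.append((time.monotonic() + self._delay, message))

    def receive(self) -> list[dict]:
        now = time.monotonic()
        visible = [message for ready_at, message in self._pending if ready_at <= now]
        self._pending = [(ready_at, message) for ready_at, message in self._pending if ready_at > now]
        return visible


queue = DelayedQueue(delay=0.2)
queue.send({"customer_id": "c-1", "name": "John Doe"})
immediately = queue.receive()  # nothing visible yet: `assert len(events) == 1` would fail here
time.sleep(0.3)
later = queue.receive()  # by now the message has become visible
print(immediately, later)
```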
To fix the test, we need to wait until the message becomes visible. The easiest solution is to sleep for a fixed time between the customer creation and the attempt to receive the message.
Danger
This approach works; however, tests become slow at a larger scale due to unnecessary waiting. For example, if each of 60 test cases sleeps for one second, the test suite spends an extra 60 seconds waiting.
import asyncio

import httpx
import pytest
from tomodachi.envelope.json_base import JsonBase
from tomodachi_testcontainers.clients import SNSSQSTestClient


@pytest.mark.asyncio(loop_scope="session")
async def test_customer_created_event_emitted(
    http_client: httpx.AsyncClient,
    localstack_snssqs_tc: SNSSQSTestClient,
) -> None:
    # Act
    response = await http_client.post("/customer", json={"name": "John Doe"})
    body = response.json()
    customer_id = body["customer_id"]
    assert response.status_code == 200

    # Assert
    await asyncio.sleep(1)  # Wait for the message to become visible to consumers
    events = await localstack_snssqs_tc.receive("customer--created", JsonBase, dict[str, str])
    assert len(events) == 1
    assert events[0].payload == {
        "customer_id": customer_id,
        "name": "John Doe",
    }
Using Asynchronous Probing for testing asynchronous systems
Asynchronous probing (sampling) is a better approach than sleeping for a fixed time. The idea is to check a condition repeatedly at short intervals until it holds or a timeout expires.
Tomodachi Testcontainers provides the probe_until function for asynchronous probing. probe_until receives a probe function and invokes it every probe_interval (defaults to 0.1 seconds) until the probe finishes without raising an exception. When the stop_after timeout is reached (defaults to 3.0 seconds), probing stops, and the last probe's exception is raised.
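To illustrate these semantics, here is a minimal sketch of a probe_until-style loop. probe_until_sketch is a hypothetical reimplementation written for this explanation, not the library's source; it only mirrors the behavior described above: retry on any exception, return the probe's result on success, and re-raise the last exception once stop_after has elapsed.

```python
import asyncio
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def probe_until_sketch(
    probe: Callable[[], Awaitable[T]],
    probe_interval: float = 0.1,
    stop_after: float = 3.0,
) -> T:
    """Hypothetical probe_until-style helper: retry `probe` until it stops raising."""
    deadline = time.monotonic() + stop_after
    while True:
        try:
            return await probe()  # success: hand the probe's result back to the test
        except Exception:
            if time.monotonic() >= deadline:
                raise  # timed out: re-raise the last probe's exception
        await asyncio.sleep(probe_interval)


# Usage: a probe that starts passing on its third attempt.
state = {"attempts": 0}


async def eventually_consistent_probe() -> str:
    state["attempts"] += 1
    assert state["attempts"] >= 3, "event not visible yet"
    return "event payload"


result = asyncio.run(probe_until_sketch(eventually_consistent_probe, probe_interval=0.01, stop_after=1.0))


# A probe that never passes: after stop_after, its last exception propagates to the caller.
async def never_visible_probe() -> None:
    raise ValueError("not enough values to unpack")


try:
    asyncio.run(probe_until_sketch(never_visible_probe, probe_interval=0.01, stop_after=0.05))
except ValueError as error:
    timeout_error = str(error)
```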
In our test, we define the probe function _customer_created_event_emitted. The probe reads messages from the SQS queue and expects to find exactly one message. If no messages are returned, the probe fails with a ValueError and is retried after probe_interval.
When the probe receives one message from the SQS queue, the message payload is returned from the probe to the main test body.
Note
A probe must contain an assertion, e.g., a check that exactly one message is received from the queue.
In this example, the assertion is the list unpacking operation [event] = await localstack_snssqs_tc.receive(...).
It could also be events = await localstack_snssqs_tc.receive(...); assert len(events) == 1.
import httpx
import pytest
from tomodachi.envelope.json_base import JsonBase
from tomodachi_testcontainers.async_probes import probe_until
from tomodachi_testcontainers.clients import SNSSQSTestClient


@pytest.mark.asyncio(loop_scope="session")
async def test_customer_created_event_emitted(
    http_client: httpx.AsyncClient,
    localstack_snssqs_tc: SNSSQSTestClient,
) -> None:
    # Act
    response = await http_client.post("/customer", json={"name": "John Doe"})
    body = response.json()
    customer_id = body["customer_id"]

    # Assert
    async def _customer_created_event_emitted() -> dict[str, str]:
        # Unpacking raises ValueError unless exactly one message is received
        [event] = await localstack_snssqs_tc.receive("customer--created", JsonBase, dict[str, str])
        return event.payload

    event = await probe_until(_customer_created_event_emitted, probe_interval=0.1, stop_after=3.0)
    assert event == {
        "customer_id": customer_id,
        "name": "John Doe",
    }
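Both assertion styles from the note reject an empty result and a duplicate message alike. The helpers below are hypothetical and simply compare the two forms on plain lists.

```python
def receive_one_by_unpacking(events: list[dict]) -> dict:
    [event] = events  # ValueError unless there is exactly one element
    return event


def receive_one_by_assert(events: list[dict]) -> dict:
    assert len(events) == 1  # AssertionError unless there is exactly one element
    return events[0]


def outcome(check, events: list[dict]):
    """Return the check's result, or the name of the exception it raised."""
    try:
        return check(events)
    except (ValueError, AssertionError) as error:
        return type(error).__name__


message = {"customer_id": "c-1", "name": "John Doe"}
results = {
    len(events): (outcome(receive_one_by_unpacking, events), outcome(receive_one_by_assert, events))
    for events in ([], [message], [message, message])
}
print(results)
```

One practical difference: Python removes assert statements when run with the -O flag, while the unpacking form still raises ValueError.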
Asynchronous probing reduces wait time
The benefit of asynchronous probing is minimal wait time: as soon as the message is received, the test stops waiting, which keeps the test suite fast.
The probe_until function is inspired by Awaitility and busypie; read more about testing asynchronous systems in their documentation.
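The difference in wait time can be measured with a small simulation. The visible_after helper is hypothetical and stands in for a message that becomes visible after about 0.2 seconds; one waiter sleeps for a fixed one-second budget, the other checks every 0.05 seconds and stops as soon as the condition holds.

```python
import asyncio
import time
from typing import Callable


def visible_after(delay: float) -> Callable[[], bool]:
    """Hypothetical check that starts passing `delay` seconds from now (a message becoming visible)."""
    ready_at = time.monotonic() + delay
    return lambda: time.monotonic() >= ready_at


async def wait_with_fixed_sleep(delay: float, budget: float) -> float:
    start = time.monotonic()
    is_visible = visible_after(delay)
    await asyncio.sleep(budget)  # always waits the whole budget, however early the message arrives
    assert is_visible()
    return time.monotonic() - start


async def wait_with_probing(delay: float, budget: float, interval: float = 0.05) -> float:
    start = time.monotonic()
    is_visible = visible_after(delay)
    while not is_visible():
        if time.monotonic() - start >= budget:
            raise TimeoutError("message did not become visible in time")
        await asyncio.sleep(interval)  # check again shortly instead of waiting the full budget
    return time.monotonic() - start


fixed = asyncio.run(wait_with_fixed_sleep(delay=0.2, budget=1.0))
probed = asyncio.run(wait_with_probing(delay=0.2, budget=1.0))
print(f"fixed sleep: {fixed:.2f} s, probing: {probed:.2f} s")
```

Both approaches use the same one-second budget as their worst case, but probing only pays for the time the condition actually takes to become true, plus at most one probe interval.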
Probing for no visible effect
Sometimes, we'd like to test that a system doesn't publish a specific message. For example, let's test that a message consumer implements the idempotent consumer pattern - it performs an operation only once and discards duplicate messages.
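A minimal sketch of the pattern, assuming each message carries a stable identifier that can be used for deduplication (here, the order_id). IdempotentOrderConsumer is hypothetical; in a real service, the set of processed identifiers would need to be stored durably and updated together with the side effect, e.g., in the same database transaction.

```python
class IdempotentOrderConsumer:
    """Hypothetical consumer that associates each order with its customer at most once."""

    def __init__(self) -> None:
        self.processed_order_ids: set[str] = set()
        self.orders_by_customer: dict[str, list[str]] = {}

    def handle(self, message: dict[str, str]) -> bool:
        order_id = message["order_id"]
        if order_id in self.processed_order_ids:
            return False  # duplicate delivery: discard without repeating the side effect
        self.orders_by_customer.setdefault(message["customer_id"], []).append(order_id)
        self.processed_order_ids.add(order_id)
        return True


consumer = IdempotentOrderConsumer()
message = {"order_id": "o-1", "customer_id": "c-1"}
results = [consumer.handle(message), consumer.handle(dict(message))]  # the same event delivered twice
print(results, consumer.orders_by_customer)
```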
There are several ways to test this:
- Publish two duplicate messages and probe that exactly one outcome message has been published.
- Publish a single message and assert that an outcome message has been published. Then, publish the duplicate message and use probe_during_interval to check that no further outcome message is published during, e.g., 3 seconds.
The downside of the second approach is that it adds extra wait time to the tests, since the probing always runs for the whole interval; however, it's sometimes a helpful technique.
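To show what probing for the absence of an effect means, here is a hypothetical sketch of a probe_during_interval-style helper, not the library's implementation: the probe must keep passing for the whole interval, and the first failure is raised immediately. Unlike probe_until, it can only succeed after the full stop_after has elapsed, which is where the extra wait time comes from.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def probe_during_interval_sketch(
    probe: Callable[[], Awaitable[None]],
    probe_interval: float = 0.1,
    stop_after: float = 3.0,
) -> None:
    """Hypothetical helper: `probe` must keep passing for the whole `stop_after` interval."""
    deadline = time.monotonic() + stop_after
    while time.monotonic() < deadline:
        await probe()  # the first failure propagates immediately: the unwanted effect was observed
        await asyncio.sleep(probe_interval)


# Outcome messages published so far; the first (legitimate) one is already there.
published = [{"customer_id": "c-1"}]


async def no_duplicate_outcome() -> None:
    assert len(published) == 1, f"unexpected extra messages: {published[1:]}"


# Case 1: nothing else is published, so the probe passes for the full 0.3 seconds.
asyncio.run(probe_during_interval_sketch(no_duplicate_outcome, probe_interval=0.05, stop_after=0.3))
quiet_case_passed = True


# Case 2: a (buggy) consumer publishes a second outcome 0.1 seconds later, so probing fails.
async def leaky_case() -> None:
    async def publish_duplicate_outcome_later() -> None:
        await asyncio.sleep(0.1)
        published.append({"customer_id": "c-1"})

    task = asyncio.create_task(publish_duplicate_outcome_later())
    try:
        await probe_during_interval_sketch(no_duplicate_outcome, probe_interval=0.05, stop_after=0.3)
    finally:
        await task


try:
    asyncio.run(leaky_case())
    leak_detected = False
except AssertionError:
    leak_detected = True
```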
Testing asynchronous message consumers
Let's test that our Customer app consumes OrderCreatedEvents and associates new orders with existing customers.
- In the arrange step, we create a new customer.
- Then, in the act step, we publish two SNS messages to the order--created topic with predefined order identifiers.
- In the assert step, we probe the GET /customer/<id> API until both messages are consumed, i.e., the two orders are associated with the customer.
Since the message consumer (Customer app) is asynchronous, it takes time for messages to be picked up and processed. Therefore, using asynchronous probing, we continuously check the system's state until all published messages are consumed.
import httpx
import pytest
from tomodachi.envelope.json_base import JsonBase
from tomodachi_testcontainers.async_probes import probe_until
from tomodachi_testcontainers.clients import SNSSQSTestClient


@pytest.mark.asyncio(loop_scope="session")
async def test_register_created_order(
    http_client: httpx.AsyncClient,
    localstack_snssqs_tc: SNSSQSTestClient,
) -> None:
    # Arrange
    response = await http_client.post("/customer", json={"name": "John Doe"})
    customer_id = response.json()["customer_id"]
    assert response.status_code == 200

    # Act
    order_ids = ["6c403295-2755-4178-a4f1-e3b698927971", "c8bb390a-71f4-4e8f-8879-c92261b0e18e"]
    for order_id in order_ids:
        await localstack_snssqs_tc.publish(
            topic="order--created",
            data={"order_id": order_id, "customer_id": customer_id},
            envelope=JsonBase,
        )

    # Assert
    async def _new_orders_associated_with_customer() -> None:
        response = await http_client.get(f"/customer/{customer_id}")
        assert response.status_code == 200
        body = response.json()
        # SQS standard queues don't guarantee ordering, so compare orders sorted by identifier
        body["orders"].sort(key=lambda order: order["order_id"])
        assert body == {
            "customer_id": customer_id,
            "name": "John Doe",
            "orders": [{"order_id": order_id} for order_id in sorted(order_ids)],
        }

    await probe_until(_new_orders_associated_with_customer)
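The same arrange/act/assert flow can be simulated in memory to show why the assertion probes the state instead of checking it once. In this hypothetical sketch, an asyncio task stands in for the Customer app's SQS consumer and processes each message after a short delay; the assertion polls the customer's orders until both are present. It compares order identifiers regardless of order because SQS standard queues provide only best-effort ordering.

```python
import asyncio


async def scenario() -> list[str]:
    queue = asyncio.Queue()  # stands in for the customer--order-created SQS queue
    customers = {"c-1": {"customer_id": "c-1", "name": "John Doe", "orders": []}}

    async def consume_order_created() -> None:
        # Hypothetical stand-in for the Customer app's handle_order_created consumer
        while True:
            message = await queue.get()
            await asyncio.sleep(0.05)  # simulated delivery and processing latency
            customers[message["customer_id"]]["orders"].append({"order_id": message["order_id"]})

    consumer = asyncio.create_task(consume_order_created())

    # Act: publish two OrderCreated events
    order_ids = ["o-1", "o-2"]
    for order_id in order_ids:
        queue.put_nowait({"order_id": order_id, "customer_id": "c-1"})

    # Assert: probe the customer's state until both orders are associated (or time out)
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 3.0
    while True:
        associated = [order["order_id"] for order in customers["c-1"]["orders"]]
        if sorted(associated) == sorted(order_ids):
            break
        if loop.time() >= deadline:
            raise TimeoutError(f"orders not associated in time: {associated}")
        await asyncio.sleep(0.01)

    consumer.cancel()
    return associated


associated_orders = asyncio.run(scenario())
print(associated_orders)
```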
Testing error-handling with dead-letter queues
When a consumer cannot process a message, it's a good practice to move it to a dead-letter queue (DLQ) for further troubleshooting. Otherwise, a faulty message can be lost or retried forever.
The handle_order_created message consumer is configured to move faulty messages to the customer--order-created--dlq DLQ.
In the Tomodachi framework, it's necessary to raise AWSSNSSQSInternalServiceError to retry a message; otherwise, it's deleted from the queue.
In AWS SQS, visibility_timeout specifies how long a received message stays hidden from other consumers before it becomes visible again. By default, it's 30 seconds, so our test would have to wait at least 30 seconds to receive a message from the DLQ, which is too long for a single test.
To speed up the test, in the autotest environment, we set the AWS_SQS_VISIBILITY_TIMEOUT environment variable to a smaller value, e.g., 3 seconds, and decrease the maximum receive count to 1 with the AWS_SQS_MAX_RECEIVE_COUNT environment variable, so the message is moved to the DLQ after the first failed attempt.
In a production environment, the visibility timeout should be no less than the default 30 seconds.
class Service(tomodachi.Service):
    ...

    @tomodachi.aws_sns_sqs(
        "order--created",
        queue_name="customer--order-created",
        dead_letter_queue_name="customer--order-created--dlq",
        max_receive_count=int(os.getenv("AWS_SQS_MAX_RECEIVE_COUNT", 3)),
        message_envelope=JsonBase,
        visibility_timeout=int(os.getenv("AWS_SQS_VISIBILITY_TIMEOUT", 30)),
    )
    async def handle_order_created(self, data: dict) -> None:
        try:
            event = OrderCreatedEvent.from_dict(data)
            await self._repository.add_order(event)
        except Exception as e:
            raise AWSSNSSQSInternalServiceError from e
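The effect of these two settings on test duration can be estimated with a simplified model of the SQS redrive policy. simulate_redrive is hypothetical: it assumes the handler always fails and that the consumer receives the message the instant it becomes visible again, ignoring polling and processing delays. SQS moves a message to the DLQ once its receive count exceeds maxReceiveCount, so the message reaches the DLQ after roughly max_receive_count * visibility_timeout seconds.

```python
from typing import NamedTuple


class RedriveOutcome(NamedTuple):
    receive_count: int
    seconds_until_dlq: float


def simulate_redrive(visibility_timeout: float, max_receive_count: int) -> RedriveOutcome:
    """Hypothetical, simplified model of an SQS redrive policy for a message whose handler always fails."""
    elapsed = 0.0
    receive_count = 0
    while receive_count < max_receive_count:
        receive_count += 1  # the consumer receives the message and the handler raises...
        elapsed += visibility_timeout  # ...so the message stays invisible for visibility_timeout
    # The next receive would push the count past max_receive_count: SQS moves the message to the DLQ
    return RedriveOutcome(receive_count, elapsed)


autotest = simulate_redrive(visibility_timeout=3, max_receive_count=1)
defaults = simulate_redrive(visibility_timeout=30, max_receive_count=3)
print(autotest, defaults)
```

With the autotest settings (3 seconds, one receive), the estimate is about 3 seconds, comfortably inside a 10-second probe timeout; with the code's defaults (30 seconds, three receives), it would be about 90 seconds.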
Let's test that the OrderCreatedEvent message is moved to the DLQ if the given customer_id is not found in the customer database.
Because of message retries and the visibility timeout, we set the async probe's stop_after to a higher value, e.g., 10 seconds.
import uuid

import pytest
from tomodachi.envelope.json_base import JsonBase
from tomodachi_testcontainers.async_probes import probe_until
from tomodachi_testcontainers.clients import SNSSQSTestClient


@pytest.mark.usefixtures("tomodachi_container")
@pytest.mark.asyncio(loop_scope="session")
async def test_customer_not_found_for_newly_created_order(localstack_snssqs_tc: SNSSQSTestClient) -> None:
    # Arrange
    customer_id = str(uuid.uuid4())
    order_id = str(uuid.uuid4())

    # Act
    await localstack_snssqs_tc.publish(
        topic="order--created",
        data={"order_id": order_id, "customer_id": customer_id},
        envelope=JsonBase,
    )

    # Assert
    async def _order_created_event_moved_to_dlq() -> dict[str, str]:
        [event] = await localstack_snssqs_tc.receive("customer--order-created--dlq", JsonBase, dict[str, str])
        return event.payload

    event = await probe_until(_order_created_event_moved_to_dlq, stop_after=10.0)
    assert event == {"order_id": order_id, "customer_id": customer_id}
Other application tests
For completeness, let's add the customer application's HTTP endpoint tests.
import uuid
from unittest import mock

import httpx
import pytest


@pytest.mark.asyncio(loop_scope="session")
async def test_customer_not_found(http_client: httpx.AsyncClient) -> None:
    customer_id = uuid.uuid4()

    response = await http_client.get(f"/customer/{customer_id}")

    assert response.status_code == 404
    assert response.json() == {"error": "CUSTOMER_NOT_FOUND"}


@pytest.mark.asyncio(loop_scope="session")
async def test_created_and_get_customer(http_client: httpx.AsyncClient) -> None:
    response = await http_client.post("/customer", json={"name": "John Doe"})
    body = response.json()
    customer_id = body["customer_id"]

    assert response.status_code == 200
    assert body == {
        "customer_id": mock.ANY,
        "name": "John Doe",
        "orders": [],
    }

    response = await http_client.get(f"/customer/{customer_id}")

    assert response.status_code == 200
    assert response.json() == {
        "customer_id": customer_id,
        "name": "John Doe",
        "orders": [],
    }
Summary
Testing asynchronous systems might initially seem complicated because it requires a different programming and mental model: operations are not performed immediately; they need to be awaited. However, testing applications that communicate with other systems through an asynchronous message bus is fairly simple because the message bus decouples the applications from each other. To test an app in isolation, we only need to publish messages in the correct format to the message bus and wait until the app consumes them and changes its state. There's no need to configure additional mocks or run external apps. The asynchronous probing technique makes awaiting asynchronous operations easy and fast: it continuously probes the application for a state change.
As with testing apps that communicate with other systems through synchronous APIs (Testing Applications with Collaborator Services), we can't be sure that production uses the same message format that we use in tests. Additional contract tests might be necessary to verify the test message format. Pact, a contract testing tool, supports testing contracts in event-driven systems.
References
- https://aws.amazon.com/sns/
- https://en.wikipedia.org/wiki/Publish-subscribe_pattern
- http://www.awaitility.org/
- https://github.com/rockem/busypie
- https://github.com/kalaspuff/tomodachi
- https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html
- https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
- https://docs.pact.io/implementation_guides/javascript/docs/messages