Test clients for testing interactions with external systems, like AWS SNS/SQS.
SNSSQSTestClient
SNSSQSTestClient(sns_client, sqs_client)
Provides common methods for testing AWS SNS/SQS interactions with Tomodachi framework.
Source code in src/tomodachi_testcontainers/clients/snssqs.py
| def __init__(self, sns_client: SNSClient, sqs_client: SQSClient) -> None:
self._sns_client = sns_client
self._sqs_client = sqs_client
|
create_topic
async
Source code in src/tomodachi_testcontainers/clients/snssqs.py
| async def create_topic(self, topic: str) -> TopicARNType:
with suppress(TopicDoesNotExistError):
return await self.get_topic_arn(topic)
topic_attributes: dict[str, str] = {}
if topic.endswith(".fifo"):
topic_attributes.update({
"FifoTopic": "true",
"ContentBasedDeduplication": "false",
})
create_topic_response = await self._sns_client.create_topic(Name=topic, Attributes=topic_attributes)
return create_topic_response["TopicArn"]
|
create_queue
async
Source code in src/tomodachi_testcontainers/clients/snssqs.py
| async def create_queue(self, queue: str) -> QueueARNType:
with suppress(QueueDoesNotExistError):
return await self.get_queue_arn(queue)
queue_attributes: dict[QueueAttributeNameType, str] = {}
if queue.endswith(".fifo"):
queue_attributes.update({
"FifoQueue": "true",
"ContentBasedDeduplication": "false",
})
await self._sqs_client.create_queue(QueueName=queue, Attributes=queue_attributes)
queue_attributes = await self.get_queue_attributes(queue, attributes=["QueueArn"])
return queue_attributes["QueueArn"]
|
subscribe_to
async
subscribe_to(topic, queue, subscribe_attributes=None)
Subscribe a SQS queue to a SNS topic; create the topic and queue if they don't exist.
Source code in src/tomodachi_testcontainers/clients/snssqs.py
| async def subscribe_to(
self,
topic: str,
queue: str,
subscribe_attributes: dict[str, str] | None = None,
) -> None:
"""Subscribe a SQS queue to a SNS topic; create the topic and queue if they don't exist."""
topic_arn = await self.create_topic(topic)
queue_arn = await self.create_queue(queue)
await self._sns_client.subscribe(
TopicArn=topic_arn,
Protocol="sqs",
Endpoint=queue_arn,
Attributes=subscribe_attributes or {},
)
|
receive
async
receive(queue, envelope, message_type, max_messages=10)
Receive messages from SQS queue.
Source code in src/tomodachi_testcontainers/clients/snssqs.py
| async def receive(
self,
queue: str,
envelope: TomodachiSNSSQSEnvelope,
message_type: type[MessageType],
max_messages: int = 10,
) -> list[SQSMessage[MessageType]]:
"""Receive messages from SQS queue."""
queue_url = await self.get_queue_url(queue)
received_messages_response = await self._sqs_client.receive_message(
QueueUrl=queue_url, MaxNumberOfMessages=max_messages, MessageAttributeNames=["All"]
)
sqs_messages: list[SQSMessage[MessageType]] = []
for received_message in received_messages_response.get("Messages", []):
payload = await self._parse_received_message_payload(envelope, message_type, received_message)
message_attributes = self._parse_received_message_attributes(received_message)
sqs_messages.append(SQSMessage(payload, message_attributes))
await self._sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=received_message["ReceiptHandle"])
return sqs_messages
|
publish
async
publish(
topic,
data,
envelope,
message_attributes=None,
message_deduplication_id=None,
message_group_id=None,
)
Publish message to SNS topic.
Source code in src/tomodachi_testcontainers/clients/snssqs.py
| async def publish(
self,
topic: str,
data: Any,
envelope: TomodachiSNSSQSEnvelope,
message_attributes: dict[str, SNSMessageAttributeValueTypeDef] | None = None,
message_deduplication_id: str | None = None,
message_group_id: str | None = None,
) -> None:
"""Publish message to SNS topic."""
topic_arn = await self.get_topic_arn(topic)
message = await envelope.build_message(service={}, topic=topic, data=data)
sns_publish_kwargs: dict[str, Any] = {}
if message_attributes:
sns_publish_kwargs["MessageAttributes"] = message_attributes
if message_deduplication_id:
sns_publish_kwargs["MessageDeduplicationId"] = message_deduplication_id
if message_group_id:
sns_publish_kwargs["MessageGroupId"] = message_group_id
await self._sns_client.publish(TopicArn=topic_arn, Message=message, **sns_publish_kwargs)
|
send
async
send(
queue,
data,
envelope,
message_attributes=None,
message_deduplication_id=None,
message_group_id=None,
)
Send message to SQS queue.
Source code in src/tomodachi_testcontainers/clients/snssqs.py
| async def send(
self,
queue: str,
data: Any,
envelope: TomodachiSNSSQSEnvelope,
message_attributes: dict[str, SQSMessageAttributeValueTypeDef] | None = None,
message_deduplication_id: str | None = None,
message_group_id: str | None = None,
) -> None:
"""Send message to SQS queue."""
queue_url = await self.get_queue_url(queue)
message = await envelope.build_message(service={}, topic="", data=data)
sqs_send_kwargs: dict[str, Any] = {}
if message_attributes:
sqs_send_kwargs["MessageAttributes"] = message_attributes
if message_deduplication_id:
sqs_send_kwargs["MessageDeduplicationId"] = message_deduplication_id
if message_group_id:
sqs_send_kwargs["MessageGroupId"] = message_group_id
await self._sqs_client.send_message(
QueueUrl=queue_url, MessageBody=json.dumps({"Message": message}), **sqs_send_kwargs
)
|
get_topic_arn
async
Source code in src/tomodachi_testcontainers/clients/snssqs.py
| async def get_topic_arn(self, topic: str) -> str:
list_topics_response = await self._sns_client.list_topics()
topic_arn = next((v["TopicArn"] for v in list_topics_response["Topics"] if v["TopicArn"].endswith(topic)), None)
if not topic_arn:
raise TopicDoesNotExistError(topic)
return topic_arn
|
get_topic_attributes
async
get_topic_attributes(topic)
Source code in src/tomodachi_testcontainers/clients/snssqs.py
| async def get_topic_attributes(self, topic: str) -> dict[str, str]:
topic_arn = await self.get_topic_arn(topic)
get_topic_attributes_response = await self._sns_client.get_topic_attributes(TopicArn=topic_arn)
return get_topic_attributes_response["Attributes"]
|
get_queue_arn
async
Source code in src/tomodachi_testcontainers/clients/snssqs.py
| async def get_queue_arn(self, queue: str) -> str:
attributes = await self.get_queue_attributes(queue, attributes=["QueueArn"])
return attributes["QueueArn"]
|
get_queue_url
async
Source code in src/tomodachi_testcontainers/clients/snssqs.py
| async def get_queue_url(self, queue: str) -> str:
try:
get_queue_response = await self._sqs_client.get_queue_url(QueueName=queue)
return get_queue_response["QueueUrl"]
except ClientError as e:
raise QueueDoesNotExistError(queue) from e
|
get_queue_attributes
async
get_queue_attributes(queue, attributes)
Source code in src/tomodachi_testcontainers/clients/snssqs.py
| async def get_queue_attributes(
self, queue: str, attributes: list[QueueAttributeFilterType]
) -> dict[QueueAttributeNameType, str]:
queue_url = await self.get_queue_url(queue)
get_queue_attributes_response = await self._sqs_client.get_queue_attributes(
QueueUrl=queue_url, AttributeNames=attributes
)
return get_queue_attributes_response["Attributes"]
|
purge_queue
async
Delete all messages from SQS queue.
Source code in src/tomodachi_testcontainers/clients/snssqs.py
| async def purge_queue(self, queue: str) -> None:
"""Delete all messages from SQS queue."""
queue_url = await self.get_queue_url(queue)
await self._sqs_client.purge_queue(QueueUrl=queue_url)
|