Skip to content

Clients module

tomodachi_testcontainers.clients

Test clients for testing interactions with external systems, like AWS SNS/SQS.

SNSSQSTestClient

SNSSQSTestClient(sns_client, sqs_client)

Provides common methods for testing AWS SNS/SQS interactions with Tomodachi framework.

PARAMETER DESCRIPTION
sns_client

TYPE: SNSClient

sqs_client

TYPE: SQSClient

Source code in src/tomodachi_testcontainers/clients/snssqs.py
def __init__(self, sns_client: SNSClient, sqs_client: SQSClient) -> None:
    self._sns_client = sns_client
    self._sqs_client = sqs_client

create_topic async

create_topic(topic)
PARAMETER DESCRIPTION
topic

TYPE: str

Source code in src/tomodachi_testcontainers/clients/snssqs.py
async def create_topic(self, topic: str) -> TopicARNType:
    with suppress(TopicDoesNotExistError):
        return await self.get_topic_arn(topic)
    topic_attributes: Dict[str, str] = {}
    if topic.endswith(".fifo"):
        topic_attributes.update(
            {
                "FifoTopic": "true",
                "ContentBasedDeduplication": "false",
            }
        )
    create_topic_response = await self._sns_client.create_topic(Name=topic, Attributes=topic_attributes)
    return create_topic_response["TopicArn"]

create_queue async

create_queue(queue)
PARAMETER DESCRIPTION
queue

TYPE: str

Source code in src/tomodachi_testcontainers/clients/snssqs.py
async def create_queue(self, queue: str) -> QueueARNType:
    with suppress(QueueDoesNotExistError):
        return await self.get_queue_arn(queue)
    queue_attributes: Dict[QueueAttributeNameType, str] = {}
    if queue.endswith(".fifo"):
        queue_attributes.update(
            {
                "FifoQueue": "true",
                "ContentBasedDeduplication": "false",
            }
        )
    await self._sqs_client.create_queue(QueueName=queue, Attributes=queue_attributes)
    queue_attributes = await self.get_queue_attributes(queue, attributes=["QueueArn"])
    return queue_attributes["QueueArn"]

subscribe_to async

subscribe_to(topic, queue, subscribe_attributes=None)

Subscribe a SQS queue to a SNS topic; create the topic and queue if they don't exist.

PARAMETER DESCRIPTION
topic

TYPE: str

queue

TYPE: str

subscribe_attributes

TYPE: Optional[Dict[str, str]] DEFAULT: None

Source code in src/tomodachi_testcontainers/clients/snssqs.py
async def subscribe_to(
    self,
    topic: str,
    queue: str,
    subscribe_attributes: Optional[Dict[str, str]] = None,
) -> None:
    """Subscribe a SQS queue to a SNS topic; create the topic and queue if they don't exist."""
    topic_arn = await self.create_topic(topic)
    queue_arn = await self.create_queue(queue)
    await self._sns_client.subscribe(
        TopicArn=topic_arn,
        Protocol="sqs",
        Endpoint=queue_arn,
        Attributes=subscribe_attributes or {},
    )

receive async

receive(queue, envelope, message_type, max_messages=10)

Receive messages from SQS queue.

PARAMETER DESCRIPTION
queue

TYPE: str

envelope

TYPE: TomodachiSNSSQSEnvelope

message_type

TYPE: Type[MessageType]

max_messages

TYPE: int DEFAULT: 10

Source code in src/tomodachi_testcontainers/clients/snssqs.py
async def receive(
    self,
    queue: str,
    envelope: TomodachiSNSSQSEnvelope,
    message_type: Type[MessageType],
    max_messages: int = 10,
) -> List[SQSMessage[MessageType]]:
    """Receive messages from SQS queue."""
    queue_url = await self.get_queue_url(queue)
    received_messages_response = await self._sqs_client.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=max_messages, MessageAttributeNames=["All"]
    )
    sqs_messages: List[SQSMessage[MessageType]] = []
    for received_message in received_messages_response.get("Messages", []):
        payload = await self._parse_received_message_payload(envelope, message_type, received_message)
        message_attributes = self._parse_received_message_attributes(received_message)
        sqs_messages.append(SQSMessage(payload, message_attributes))
        await self._sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=received_message["ReceiptHandle"])
    return sqs_messages

publish async

publish(topic, data, envelope, message_attributes=None, message_deduplication_id=None, message_group_id=None)

Publish message to SNS topic.

PARAMETER DESCRIPTION
topic

TYPE: str

data

TYPE: Any

envelope

TYPE: TomodachiSNSSQSEnvelope

message_attributes

TYPE: Optional[Dict[str, MessageAttributeValueTypeDef]] DEFAULT: None

message_deduplication_id

TYPE: Optional[str] DEFAULT: None

message_group_id

TYPE: Optional[str] DEFAULT: None

Source code in src/tomodachi_testcontainers/clients/snssqs.py
async def publish(
    self,
    topic: str,
    data: Any,
    envelope: TomodachiSNSSQSEnvelope,
    message_attributes: Optional[Dict[str, SNSMessageAttributeValueTypeDef]] = None,
    message_deduplication_id: Optional[str] = None,
    message_group_id: Optional[str] = None,
) -> None:
    """Publish message to SNS topic."""
    topic_arn = await self.get_topic_arn(topic)
    message = await envelope.build_message(service={}, topic=topic, data=data)
    sns_publish_kwargs: Dict[str, Any] = {}
    if message_attributes:
        sns_publish_kwargs["MessageAttributes"] = message_attributes
    if message_deduplication_id:
        sns_publish_kwargs["MessageDeduplicationId"] = message_deduplication_id
    if message_group_id:
        sns_publish_kwargs["MessageGroupId"] = message_group_id
    await self._sns_client.publish(TopicArn=topic_arn, Message=message, **sns_publish_kwargs)

send async

send(queue, data, envelope, message_attributes=None, message_deduplication_id=None, message_group_id=None)

Send message to SQS queue.

PARAMETER DESCRIPTION
queue

TYPE: str

data

TYPE: Any

envelope

TYPE: TomodachiSNSSQSEnvelope

message_attributes

TYPE: Optional[Dict[str, MessageAttributeValueTypeDef]] DEFAULT: None

message_deduplication_id

TYPE: Optional[str] DEFAULT: None

message_group_id

TYPE: Optional[str] DEFAULT: None

Source code in src/tomodachi_testcontainers/clients/snssqs.py
async def send(
    self,
    queue: str,
    data: Any,
    envelope: TomodachiSNSSQSEnvelope,
    message_attributes: Optional[Dict[str, SQSMessageAttributeValueTypeDef]] = None,
    message_deduplication_id: Optional[str] = None,
    message_group_id: Optional[str] = None,
) -> None:
    """Send message to SQS queue."""
    queue_url = await self.get_queue_url(queue)
    message = await envelope.build_message(service={}, topic="", data=data)
    sqs_send_kwargs: Dict[str, Any] = {}
    if message_attributes:
        sqs_send_kwargs["MessageAttributes"] = message_attributes
    if message_deduplication_id:
        sqs_send_kwargs["MessageDeduplicationId"] = message_deduplication_id
    if message_group_id:
        sqs_send_kwargs["MessageGroupId"] = message_group_id
    await self._sqs_client.send_message(
        QueueUrl=queue_url, MessageBody=json.dumps({"Message": message}), **sqs_send_kwargs
    )

get_topic_arn async

get_topic_arn(topic)
PARAMETER DESCRIPTION
topic

TYPE: str

Source code in src/tomodachi_testcontainers/clients/snssqs.py
async def get_topic_arn(self, topic: str) -> str:
    list_topics_response = await self._sns_client.list_topics()
    topic_arn = next((v["TopicArn"] for v in list_topics_response["Topics"] if v["TopicArn"].endswith(topic)), None)
    if not topic_arn:
        raise TopicDoesNotExistError(topic)
    return topic_arn

get_topic_attributes async

get_topic_attributes(topic)
PARAMETER DESCRIPTION
topic

TYPE: str

Source code in src/tomodachi_testcontainers/clients/snssqs.py
async def get_topic_attributes(self, topic: str) -> Dict[str, str]:
    topic_arn = await self.get_topic_arn(topic)
    get_topic_attributes_response = await self._sns_client.get_topic_attributes(TopicArn=topic_arn)
    return get_topic_attributes_response["Attributes"]

get_queue_arn async

get_queue_arn(queue)
PARAMETER DESCRIPTION
queue

TYPE: str

Source code in src/tomodachi_testcontainers/clients/snssqs.py
async def get_queue_arn(self, queue: str) -> str:
    attributes = await self.get_queue_attributes(queue, attributes=["QueueArn"])
    return attributes["QueueArn"]

get_queue_url async

get_queue_url(queue)
PARAMETER DESCRIPTION
queue

TYPE: str

Source code in src/tomodachi_testcontainers/clients/snssqs.py
async def get_queue_url(self, queue: str) -> str:
    try:
        get_queue_response = await self._sqs_client.get_queue_url(QueueName=queue)
        return get_queue_response["QueueUrl"]
    except ClientError as e:
        raise QueueDoesNotExistError(queue) from e

get_queue_attributes async

get_queue_attributes(queue, attributes)
PARAMETER DESCRIPTION
queue

TYPE: str

attributes

TYPE: List[QueueAttributeFilterType]

Source code in src/tomodachi_testcontainers/clients/snssqs.py
async def get_queue_attributes(
    self, queue: str, attributes: List[QueueAttributeFilterType]
) -> Dict[QueueAttributeNameType, str]:
    queue_url = await self.get_queue_url(queue)
    get_queue_attributes_response = await self._sqs_client.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=attributes
    )
    return get_queue_attributes_response["Attributes"]

purge_queue async

purge_queue(queue)

Delete all messages from SQS queue.

PARAMETER DESCRIPTION
queue

TYPE: str

Source code in src/tomodachi_testcontainers/clients/snssqs.py
async def purge_queue(self, queue: str) -> None:
    """Delete all messages from SQS queue."""
    queue_url = await self.get_queue_url(queue)
    await self._sqs_client.purge_queue(QueueUrl=queue_url)