1. Overview

In this tutorial, we’ll explore how to work with Amazon SQS (Simple Queue Service) using the AWS SDK for Java.

2. Prerequisites

The Maven dependencies, AWS account settings, and client connection needed to use the AWS SDK for SQS are the same as those described in this article.

Assuming we’ve set up our AWS credentials as described in the previous article, we can go ahead and create our SQS client. Here, ProfileCredentialsProvider.create() picks up the default credentials profile:

SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .credentialsProvider(ProfileCredentialsProvider.create())
    .build();

3. Creating Queues

Once we’ve set up our SQS client, creating queues is fairly straightforward.

3.1. Creating a Standard Queue

Let’s see how we can create a Standard Queue. To do this, we’ll need to create an instance of CreateQueueRequest:

CreateQueueRequest createStandardQueueRequest = CreateQueueRequest.builder()
    .queueName(STANDARD_QUEUE_NAME)
    .build();

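// Keep the queue URL: later requests address the queue by URL, not by name
String standardQueueUrl = sqsClient.createQueue(createStandardQueueRequest).queueUrl();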

3.2. Creating a FIFO Queue

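Creating a FIFO queue is similar to creating a standard queue. We’ll still use an instance of CreateQueueRequest, as we did previously. Only this time, we’ll have to pass in queue attributes and set the FifoQueue attribute to true. Note that the name of a FIFO queue must end with the .fifo suffix. We also enable content-based deduplication, so SQS derives a deduplication ID from each message body: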

Map<QueueAttributeName, String> queueAttributes = new HashMap<>();
queueAttributes.put(QueueAttributeName.FIFO_QUEUE, "true");
queueAttributes.put(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true");

CreateQueueRequest createFifoQueueRequest = CreateQueueRequest.builder()
    .queueName(FIFO_QUEUE_NAME)
    .attributes(queueAttributes)
    .build();

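// Keep the queue URL: later requests address the queue by URL, not by name
String fifoQueueUrl = sqsClient.createQueue(createFifoQueueRequest).queueUrl();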
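
Since SQS rejects queue names that break its naming rules, it can help to check a name before sending the request. The following is a minimal sketch in plain Java, not part of the SDK: the QueueNames class and its methods are hypothetical helpers, and they assume the documented rules (at most 80 characters, made up of letters, digits, hyphens, and underscores, with FIFO names ending in .fifo and the suffix counting toward the limit):

```java
import java.util.regex.Pattern;

class QueueNames {
    // Letters, digits, hyphens and underscores only
    private static final Pattern BASE = Pattern.compile("[A-Za-z0-9_-]+");
    private static final String FIFO_SUFFIX = ".fifo";
    private static final int MAX_LENGTH = 80;

    // A standard queue name has no suffix and must fit the base pattern
    static boolean isValidStandardName(String name) {
        return name != null
            && name.length() <= MAX_LENGTH
            && BASE.matcher(name).matches();
    }

    // A FIFO queue name is a valid base name followed by ".fifo";
    // the suffix counts toward the 80-character limit
    static boolean isValidFifoName(String name) {
        if (name == null || name.length() > MAX_LENGTH || !name.endsWith(FIFO_SUFFIX)) {
            return false;
        }
        String base = name.substring(0, name.length() - FIFO_SUFFIX.length());
        return BASE.matcher(base).matches();
    }
}
```

For example, QueueNames.isValidFifoName("orders.fifo") passes the check, while the same call with "orders" does not.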

4. Posting Messages to Queues

Once we’ve got our queues set up, we can start sending messages.

4.1. Posting a Message to a Standard Queue

To send messages to a standard queue, we’ll have to create an instance of SendMessageRequest.

Then we attach a map of message attributes to this request:

Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
MessageAttributeValue messageAttributeValue = MessageAttributeValue.builder()
    .stringValue("This is an attribute")
    .dataType("String")
    .build();

messageAttributes.put("AttributeOne", messageAttributeValue);

SendMessageRequest sendMessageStandardQueue = SendMessageRequest.builder()
    .queueUrl(standardQueueUrl)
    .messageBody("A simple message.")
    .delaySeconds(30)
    .messageAttributes(messageAttributes)
    .build();

sqsClient.sendMessage(sendMessageStandardQueue);

The delaySeconds() method specifies how long SQS waits before the message becomes available to consumers.

4.2. Posting a Message to a FIFO Queue

The only difference, in this case, is that we’ll have to specify the group to which the message belongs:

SendMessageRequest sendMessageFifoQueue = SendMessageRequest.builder()
    .queueUrl(fifoQueueUrl)
    .messageBody("FIFO Queue")
    .messageGroupId("baeldung-group-1")
    .messageAttributes(messageAttributes)
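    .build();

sqsClient.sendMessage(sendMessageFifoQueue);

As the code example above shows, we specify the group by using messageGroupId(). Messages that share a group ID are delivered in the order they were sent.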

4.3. Posting Multiple Messages to a Queue

We can also post multiple messages to a queue, using a single request. We’ll create a list of SendMessageBatchRequestEntry which we’ll send using an instance of SendMessageBatchRequest:

List<SendMessageBatchRequestEntry> messageEntries = new ArrayList<>();
SendMessageBatchRequestEntry messageBatchRequestEntry1 = SendMessageBatchRequestEntry.builder()
    .id("id-1")
    .messageBody("batch-1")
    .messageGroupId("baeldung-group-1")
    .build();

SendMessageBatchRequestEntry messageBatchRequestEntry2 = SendMessageBatchRequestEntry.builder()
    .id("id-2")
    .messageBody("batch-2")
    .messageGroupId("baeldung-group-1")
    .build();

messageEntries.add(messageBatchRequestEntry1);
messageEntries.add(messageBatchRequestEntry2);

SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder()
    .queueUrl(fifoQueueUrl)
    .entries(messageEntries)
    .build();

sqsClient.sendMessageBatch(sendMessageBatchRequest);
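
A single SendMessageBatchRequest accepts at most 10 entries, so a longer list has to be split across several requests. The sketch below shows one way to do the splitting in plain Java; BatchChunker is a hypothetical helper, not an SDK class, and each resulting chunk would then be passed to its own request:

```java
import java.util.ArrayList;
import java.util.List;

class BatchChunker {
    // SQS allows at most 10 entries per batch request
    static final int MAX_BATCH_SIZE = 10;

    // Splits the items into consecutive chunks of at most MAX_BATCH_SIZE elements,
    // preserving the original order
    static <T> List<List<T>> chunk(List<T> items) {
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += MAX_BATCH_SIZE) {
            int end = Math.min(start + MAX_BATCH_SIZE, items.size());
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}
```

With 23 entries, this yields three chunks of 10, 10, and 3 entries.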

5. Reading Messages from Queues

We can receive messages from our queues by building a ReceiveMessageRequest and passing it to the client’s receiveMessage() method:

ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
    .queueUrl(fifoQueueUrl)
    .waitTimeSeconds(10)
    .maxNumberOfMessages(10)
    .build();

List<Message> sqsMessages = sqsClient.receiveMessage(receiveMessageRequest)
    .messages();

Using maxNumberOfMessages(), we specify how many messages to get from the queue; the maximum is 10.

The waitTimeSeconds() method enables long polling, which is a way to limit the number of receive message requests we send to SQS.

Simply put, this means that we’ll wait up to the specified number of seconds to retrieve a message. If there are no messages in the queue for that duration, then the request will return empty. If a message arrives on the queue during that time, it will be returned.

We can get the attributes and body of a given message:

sqsMessages.get(0).attributes();
sqsMessages.get(0).body();

6. Deleting a Message from a Queue

To delete a message, we’ll use a DeleteMessageRequest:

DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
    .queueUrl(fifoQueueUrl)
    .receiptHandle(sqsMessages.get(0).receiptHandle())
    .build();

sqsClient.deleteMessage(deleteMessageRequest);
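
In practice, receiving and deleting usually go together: a message is deleted only after it has been processed successfully, and anything left undeleted becomes visible on the queue again once its visibility timeout expires. The following sketch shows that pattern in plain Java under some assumptions: ConsumeLoop and Received are hypothetical names, and the receive and delete steps are passed in as functions so that they can wrap the receiveMessage() and deleteMessage() calls shown above:

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

class ConsumeLoop {
    // Hypothetical stand-in for an SQS message: its body plus its receipt handle
    static final class Received {
        final String body;
        final String receiptHandle;

        Received(String body, String receiptHandle) {
            this.body = body;
            this.receiptHandle = receiptHandle;
        }
    }

    // Runs one receive/process/delete pass and returns the number of deleted messages.
    // A message is deleted only if processing reports success; a failed or throwing
    // handler leaves the message on the queue so that it can be redelivered.
    static int pollOnce(Supplier<List<Received>> receive,
                        Predicate<String> process,
                        Consumer<String> delete) {
        int deleted = 0;
        for (Received message : receive.get()) {
            boolean processed;
            try {
                processed = process.test(message.body);
            } catch (RuntimeException e) {
                processed = false;
            }
            if (processed) {
                delete.accept(message.receiptHandle);
                deleted++;
            }
        }
        return deleted;
    }
}
```

Passing the steps in as functions keeps the loop independent of the SDK, which also makes it easy to exercise with an in-memory list.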

7. Dead Letter Queues

A dead letter queue must be of the same type as its base queue — it must be FIFO if the base queue is FIFO, and standard if the base queue is standard. For this example, we’ll use a standard queue.

The first thing we need to do is to create what will become our dead letter queue:

CreateQueueRequest createDeadLetterQueueRequest = CreateQueueRequest.builder()
    .queueName(DEAD_LETTER_QUEUE_NAME)
    .build();

String deadLetterQueueUrl = sqsClient.createQueue(createDeadLetterQueueRequest).queueUrl();

Next, we’ll get our newly created queue’s ARN (Amazon Resource Name):

GetQueueAttributesRequest getQueueAttributesRequest = GetQueueAttributesRequest.builder()
    .queueUrl(deadLetterQueueUrl)
    .attributeNames(QueueAttributeName.QUEUE_ARN)
    .build();

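GetQueueAttributesResponse deadLetterQueueAttributes = sqsClient.getQueueAttributes(getQueueAttributesRequest);
// Extract the ARN that the redrive policy below refers to
String deadLetterQueueARN = deadLetterQueueAttributes.attributes().get(QueueAttributeName.QUEUE_ARN);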

Finally, we set this newly created queue to be our original standard queue’s dead letter queue:

Map<QueueAttributeName, String> attributes = new HashMap<>();
attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\""
    + deadLetterQueueARN + "\"}");

SetQueueAttributesRequest queueAttributesRequest = SetQueueAttributesRequest.builder()
    .queueUrl(standardQueueUrl)
    .attributes(attributes)
    .build();

sqsClient.setQueueAttributes(queueAttributesRequest);

The JSON string we set under the RedrivePolicy attribute when building our SetQueueAttributesRequest instance contains the information we need: the maxReceiveCount is 5, which means that once a message has been received more than this many times without being deleted, SQS assumes it hasn’t been processed correctly and moves it to our dead letter queue.

The deadLetterTargetArn attribute points our standard queue to our newly created dead letter queue.
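
Hand-escaped JSON strings are easy to get wrong, so it can be convenient to build the redrive policy in one small helper. This is a minimal sketch in plain Java; RedrivePolicy is a hypothetical helper, not an SDK class, and it assumes the ARN contains no characters that need JSON escaping, which holds for ordinary SQS ARNs:

```java
class RedrivePolicy {
    // Builds the JSON document expected by the RedrivePolicy queue attribute
    static String toJson(String deadLetterTargetArn, int maxReceiveCount) {
        if (deadLetterTargetArn == null || deadLetterTargetArn.isEmpty()) {
            throw new IllegalArgumentException("deadLetterTargetArn must not be empty");
        }
        if (maxReceiveCount < 1) {
            throw new IllegalArgumentException("maxReceiveCount must be at least 1");
        }
        return "{\"maxReceiveCount\":\"" + maxReceiveCount
            + "\",\"deadLetterTargetArn\":\"" + deadLetterTargetArn + "\"}";
    }
}
```

The result of RedrivePolicy.toJson(deadLetterQueueARN, 5) could then be stored under QueueAttributeName.REDRIVE_POLICY in place of the concatenated string.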

8. Monitoring

Using the SDK, we can check how many messages are currently in a given queue and how many are in flight. First, we’ll need to create a GetQueueAttributesRequest.

From there, we’ll check the state of the queue:

GetQueueAttributesRequest getQueueAttributesRequestForMonitoring = GetQueueAttributesRequest.builder()
    .queueUrl(standardQueueUrl)
    // Ask SQS to return all queue attributes
    .attributeNames(QueueAttributeName.ALL)
    .build();

GetQueueAttributesResponse attributesResponse = sqsClient.getQueueAttributes(getQueueAttributesRequestForMonitoring);
// attributes() is keyed by QueueAttributeName, so we look values up by enum constant
System.out.println(String.format("The number of messages on the queue: %s", attributesResponse.attributes()
    .get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)));
System.out.println(String.format("The number of messages in flight: %s", attributesResponse.attributes()
    .get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE)));
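
Queue attributes come back as strings, so turning them into numbers is a natural next step when the counts feed an alert or a scaling decision. The sketch below is plain Java and makes a few assumptions: QueueDepth is a hypothetical helper, it expects a string-keyed map such as the one returned by the response’s attributesAsStrings() method, and it treats a missing attribute as zero:

```java
import java.util.Map;

class QueueDepth {
    // Parses a numeric queue attribute, treating a missing value as zero
    static int count(Map<String, String> attributes, String name) {
        String raw = attributes.get(name);
        return raw == null ? 0 : Integer.parseInt(raw);
    }

    // Messages waiting on the queue plus messages currently in flight
    static int total(Map<String, String> attributes) {
        return count(attributes, "ApproximateNumberOfMessages")
            + count(attributes, "ApproximateNumberOfMessagesNotVisible");
    }
}
```

As the attribute names suggest, both values are approximations, so the total is best treated as an estimate.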

More in-depth monitoring can be achieved using Amazon CloudWatch.

9. Conclusion

In this article, we’ve seen how to manage SQS queues using the AWS Java SDK.

As usual, all code samples used in the article can be found over on GitHub.

