1. Introduction
ElasticMQ is an in-memory AWS SQS (Simple Queue System) compatible message queue built with Scala for embedded or standalone use.
In this tutorial, we’ll go through ElasticMQ orchestration, configuration, and use from a Scala client.
2. ElasticMQ Setup
ElasticMQ can be run standalone, embedded, or using an official docker image.
2.1. ElasticMQ Standalone
To set up a standalone ElasticMQ instance, we can download the ElasticMQ jar application with wget:
$ wget https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-1.6.5.jar
Then we can start it:
$ java -jar elasticmq-server-1.6.5.jar
11:56:36.506 [main] INFO org.elasticmq.server.Main$ - Starting ElasticMQ server (1.6.5) ...
11:56:39.109 [elasticmq-pekko.actor.default-dispatcher-4] INFO o.a.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
11:56:43.740 [elasticmq-pekko.actor.default-dispatcher-4] INFO o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address 0.0.0.0:9324, visible server address http://localhost:9324
11:56:44.153 [main] INFO o.e.rest.sqs.TheSQSRestServerBuilder - Metrics MBean org.elasticmq:name=Queues successfully registered
11:56:44.326 [elasticmq-pekko.actor.default-dispatcher-4] INFO o.e.r.s.TheStatisticsRestServerBuilder - Started statistics rest server, bind address 0.0.0.0:9325
11:56:44.344 [main] INFO org.elasticmq.server.Main$ - === ElasticMQ server (1.6.5) started in 10093 ms ===
2.2. ElasticMQ Embedded
To create an embedded ElasticMQ instance, we use the ElasticMQ REST client library:
libraryDependencies += "org.elasticmq" %% "elasticmq-rest-sqs" % "1.6.5"
We can check the latest version on the MVN Repository.
To start the embedded REST server, we use the SQSRestServerBuilder:
val server = SQSRestServerBuilder
.withPort(9325)
.withInterface("localhost")
.start()
To stop the embedded REST server, we use the stopAndWait() method:
server.stopAndWait()
The SQSRestServerBuilder is a default instance of TheSQSRestServerBuilder:
object SQSRestServerBuilder extends TheSQSRestServerBuilder(
None,
None,
"",
9324,
NodeAddress(),
true,
StrictSQSLimits,
"elasticmq",
"000000000000",
None
)
TheSQSRestServerBuilder has the following signature:
case class TheSQSRestServerBuilder(
providedActorSystem: Option[ActorSystem],
providedQueueManagerActor: Option[ActorRef],
interface: String,
port: Int,
serverAddress: NodeAddress,
generateServerAddress: Boolean,
sqsLimits: Limits,
_awsRegion: String,
_awsAccountId: String,
queueEventListener: Option[ActorRef]
)
If a providedActorSystem is provided, it won’t be managed by the server. If not, one will be created and managed by the server. A providedQueueManagerActor is the main ElasticMQ actor. A queueEventListener listens to changes in queues and messages. The rest is standard SQS configuration.
2.3. ElasticMQ Docker
To set up a docker ElasticMQ instance, let’s use the softwaremill/elasticmq-native image and expose the necessary ports:
$ docker run -p 9324:9324 -p 9325:9325 softwaremill/elasticmq-native
18:09:32.103 [main] INFO org.elasticmq.server.Main$ - Starting ElasticMQ server (1.6.5) ...
18:09:32.756 [elasticmq-pekko.actor.default-dispatcher-7] INFO o.a.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
18:09:33.226 [elasticmq-pekko.actor.default-dispatcher-8] INFO o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address 0.0.0.0:9324, visible server address http://localhost:9324
18:09:33.228 [main] INFO o.e.rest.sqs.TheSQSRestServerBuilder - Metrics MBean org.elasticmq:name=Queues successfully registered
18:09:33.259 [elasticmq-pekko.actor.default-dispatcher-8] INFO o.e.r.s.TheStatisticsRestServerBuilder - Started statistics rest server, bind address 0.0.0.0:9325
18:09:33.259 [main] INFO org.elasticmq.server.Main$ - === ElasticMQ server (1.6.5) started in 1581 ms ===
The native image is compiled using GraalVM for faster startup time and smaller bundle size, but we can also use the JVM-based softwaremill/elasticmq image.
3. Configuration
To configure an ElasticMQ instance, we use the HOCON (Human-Optimized Config Object Notation) configuration language. This is the standard for many Scala ecosystem projects.
Let’s consider a basic configuration file:
$ cat elasticmq.conf
# What is the outside visible address of this ElasticMQ node
# Used to create the queue URL (may be different from bind address!)
node-address {
protocol = http
host = localhost
port = 9323
context-path = ""
}
rest-sqs {
enabled = true
bind-port = 9323
bind-hostname = "0.0.0.0"
# Possible values: relaxed, strict
sqs-limits = strict
}
rest-stats {
enabled = true
bind-port = 9324
bind-hostname = "0.0.0.0"
}
# Should the node-address be generated from the bind port/hostname
# Set this to true e.g. when assigning port automatically by using port 0.
generate-node-address = false
We can see the ports have changed given our configuration file:
$ java -Dconfig.file=elasticmq.conf -jar elasticmq-server-1.6.5.jar
12:08:52.665 [main] INFO org.elasticmq.server.Main$ - Starting ElasticMQ server (1.6.5) ...
12:08:59.786 [elasticmq-pekko.actor.default-dispatcher-4] INFO o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address 0.0.0.0:9323, visible server address http://localhost:9323
12:09:00.075 [main] INFO o.e.rest.sqs.TheSQSRestServerBuilder - Metrics MBean org.elasticmq:name=Queues successfully registered
12:09:00.246 [elasticmq-pekko.actor.default-dispatcher-5] INFO o.e.r.s.TheStatisticsRestServerBuilder - Started statistics rest server, bind address 0.0.0.0:9324
12:09:00.262 [main] INFO org.elasticmq.server.Main$ - === ElasticMQ server (1.6.5) started in 9916 ms ===
We can also provide a separate logging configuration:
$ java -Dlogback.configurationFile=logback.xml -Dconfig.file=elasticmq.conf -jar elasticmq-server-1.6.5.jar
4. ElasticMQ Client
To create a client for an ElasticMQ instance, we can use the AWS SQS SDK library and create a thin wrapper for convenience.
First, we include the SQS SDK:
libraryDependencies += "software.amazon.awssdk" % "sqs" % "2.26.24"
We can check for the latest version on the MVN Repository page. Then we can create a client wrapper class:
class SQSAsyncClient(
queueURL: String,
region: String,
endpoint: String
)(implicit ec: ExecutionContext):
Finally, we instantiate the SDK client with our configuration parameters:
private val sqsAsyncClient: SqsAsyncClient =
SqsAsyncClient
.builder()
.region(Region.of(region))
.credentialsProvider(
AwsCredentialsProviderChain
.builder()
.credentialsProviders(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
ElasticMQConfig.ELASTIC_MQ_ACCESS_KEY,
ElasticMQConfig.ELASTIC_MQ_SECRET_ACCESS_KEY
)
)
)
.build()
)
.endpointOverride(URI.create(endpoint))
.build()
4.1. Creating Queues
Queues can be created at the configuration stage or we can use a client instance to create a queue through the SQS REST API. By default, queues are LIFO (Last-In First-Out), meaning old messages have dequeue priority over new ones.
We use the client to create a queue:
def createStandardQueue(queueName: String): Future[CreateQueueResponse] =
val request = CreateQueueRequest.builder.queueName(queueName).build
sqsAsyncClient.createQueue(request).asScala
Additionally, we can modify the queue attributes to create a FIFO (First-In First-Out) queue:
final lazy val createFIFOQueueAttributes = Map(
(QueueAttributeName.FIFO_QUEUE, "true")
).asJava
def createFIFOQueue(queueName: String): Future[CreateQueueResponse] =
val createQueueRequest = CreateQueueRequest.builder
.queueName(queueName)
.attributes(createFIFOQueueAttributes)
.build
sqsAsyncClient.createQueue(createQueueRequest).asScala
4.2. Sending Messages
To use our client to send messages to a queue, we use a SendMessageRequest() method*:*
def sendMessage(message: String): Future[SendMessageResponse] =
val request = SendMessageRequest
.builder()
.messageBody(message)
.queueUrl(queueURL)
.build()
sqsAsyncClient.sendMessage(request).asScala
We can also send messages in a batch using a SendMessageBatchRequest():
def sendMessagesInBatch(
messages: List[String]
): Future[SendMessageBatchResponse] =
val batchRequestEntry = messages
.map(
SendMessageBatchRequestEntry
.builder()
.messageBody(_)
.id(UUID.randomUUID().toString)
.build()
)
.asJava
val sendMessageBatchRequest = SendMessageBatchRequest
.builder()
.queueUrl(queueURL)
.entries(batchRequestEntry)
.build()
sqsAsyncClient.sendMessageBatch(sendMessageBatchRequest).asScala
4.3. Receiving Messages
We can use our client to receive up to 10 messages at a time using a ReceiveMessageRequest():
def receiveMessages(
maxNumberOfMessages: Int
): Future[ReceiveMessageResponse] =
val receiveMessageRequest =
ReceiveMessageRequest
.builder()
.maxNumberOfMessages(maxNumberOfMessages)
.queueUrl(queueURL)
.waitTimeSeconds(10)
.build()
sqsAsyncClient.receiveMessage(receiveMessageRequest).asScala
The maxNumberOfMessages setter must not exceed 10.
4.4. Deleting Messages
To delete messages with our client using a DeleteMessageRequest():
def deleteMessage(receiptHandle: String): Future[DeleteMessageResponse] =
val deleteMessageRequest = DeleteMessageRequest
.builder()
.queueUrl(queueURL)
.receiptHandle(receiptHandle)
.build()
sqsAsyncClient.deleteMessage(deleteMessageRequest).asScala
4.5. List Active Queues
To receive a list of active queues with our client, we use the listQueues() method:
def listQueues(): Future[ListQueuesResponse] =
sqsAsyncClient.listQueues().asScala
4.6. Purge Active Queue
To purge an active queue with our client, we use a PurgeQueueRequest():
final lazy val purgeQueueRequest =
PurgeQueueRequest.builder().queueUrl(queueURL).build()
def purgeQueue(): Future[PurgeQueueResponse] =
sqsAsyncClient.purgeQueue(purgeQueueRequest).asScala
4.7. Example Workflow
To test a workflow, we start an embedded ElasticMQ REST server:
val endpoint = "http://localhost:9325"
val region = "elasticmq"
val server = SQSRestServerBuilder
.withPort(9325)
.withInterface("localhost")
.start()
Then we create a client and workflow:
val elasticMQClient = new SQSAsyncClient(ElasticMQ_URL, region, endpoint)
val uselessWorkflow =
for
_ <- elasticMQClient.createStandardQueue("standardQueueForTest")
testQueueClient = new SQSAsyncClient(
ElasticMQ_URL + "standardQueueForTest",
region,
endpoint
)
_ <- testQueueClient.createFIFOQueue("fifoQueue.fifo")
_ <- testQueueClient.listQueues()
_ <- testQueueClient.sendMessage("Hi")
_ <- testQueueClient.sendMessagesInBatch(
List("Follow", "Baeldung", "on", "LinkedIn")
)
_ <- testQueueClient.receiveMessages(5)
_ <- testQueueClient.purgeQueue()
yield ()
Then we execute the workflow, logging any errors:
uselessWorkflow
.andThen(_ => server.stopAndWait())
.onComplete:
case Success(_) => m_logger.info("queue created")
case Failure(exception) =>
m_logger.error(exception, "exception in uselessWorkflow")
Finally, the result is successful logs:
[info] running com.baeldung.elasticmq.ElasticMQService
13:42:30.524 [default-pekko.actor.default-dispatcher-5] INFO o.a.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
13:42:34.167 [elasticmq-pekko.actor.default-dispatcher-5] INFO o.a.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
13:42:38.825 [elasticmq-pekko.actor.default-dispatcher-5] INFO o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address localhost:9325, visible server address http://localhost:9325
13:42:38.992 [sbt-bg-threads-1] INFO o.e.rest.sqs.TheSQSRestServerBuilder - Metrics MBean org.elasticmq:name=Queues successfully registered
13:42:44.536 [elasticmq-pekko.actor.default-dispatcher-5] INFO o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(standardQueueForTest,None,None,None,None,None,None,false,false,None,None,Map())
13:42:45.399 [elasticmq-pekko.actor.default-dispatcher-9] INFO o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(fifoQueue.fifo,None,None,None,None,None,None,true,false,None,None,Map())
13:42:47.753 [elasticmq-pekko.actor.default-dispatcher-13] INFO o.elasticmq.actor.queue.QueueActor - standardQueueForTest: Clearing queue
13:42:47.987 [elasticmq-pekko.actor.default-dispatcher-11] INFO o.a.pekko.actor.CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
13:42:48.085 [default-pekko.actor.default-dispatcher-5] INFO o.apache.pekko.actor.ActorSystemImpl - queue created
5. Conclusion
In this article, we’ve had an overview of ElasticMQ and client interaction with Scala. First, we set up ElasticMQ standalone with a .jar file, then with docker and embedded ElasticMQ within a program. Secondly, we set up a client to create queues, send messages, receive messages, delete messages, and send other commands to the ElasticMQ server.
As usual, the code is available over on GitHub.