1. Introduction

ElasticMQ is an in-memory AWS SQS (Simple Queue System) compatible message queue built with Scala for embedded or standalone use.

In this tutorial, we’ll go through ElasticMQ orchestration, configuration, and use from a Scala client.

2. ElasticMQ Setup

ElasticMQ can be run standalone, embedded, or using an official docker image.

2.1. ElasticMQ Standalone

To set up a standalone ElasticMQ instance, we can download the ElasticMQ jar application with wget:

$ wget https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-1.6.5.jar

Then we can start it:

$ java -jar elasticmq-server-1.6.5.jar
11:56:36.506 [main] INFO  org.elasticmq.server.Main$ - Starting ElasticMQ server (1.6.5) ...
11:56:39.109 [elasticmq-pekko.actor.default-dispatcher-4] INFO  o.a.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
11:56:43.740 [elasticmq-pekko.actor.default-dispatcher-4] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address 0.0.0.0:9324, visible server address http://localhost:9324
11:56:44.153 [main] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Metrics MBean org.elasticmq:name=Queues successfully registered
11:56:44.326 [elasticmq-pekko.actor.default-dispatcher-4] INFO  o.e.r.s.TheStatisticsRestServerBuilder - Started statistics rest server, bind address 0.0.0.0:9325
11:56:44.344 [main] INFO  org.elasticmq.server.Main$ - === ElasticMQ server (1.6.5) started in 10093 ms ===

2.2. ElasticMQ Embedded

To create an embedded ElasticMQ instance, we use the ElasticMQ REST client library:

libraryDependencies += "org.elasticmq" %% "elasticmq-rest-sqs" % "1.6.5"

We can check the latest version on the MVN Repository.

To start the embedded REST server, we use the SQSRestServerBuilder:

val server = SQSRestServerBuilder
  .withPort(9325)
  .withInterface("localhost")
  .start()

To stop the embedded REST server, we use the stopAndWait() method:

server.stopAndWait()

The SQSRestServerBuilder is a default instance of TheSQSRestServerBuilder:

object SQSRestServerBuilder extends TheSQSRestServerBuilder(
  None,
  None,
  "",
  9324,
  NodeAddress(),
  true,
  StrictSQSLimits,
  "elasticmq",
  "000000000000",
  None
)

TheSQSRestServerBuilder has the following signature:

case class TheSQSRestServerBuilder(
  providedActorSystem: Option[ActorSystem],
  providedQueueManagerActor: Option[ActorRef],
  interface: String,
  port: Int,
  serverAddress: NodeAddress,
  generateServerAddress: Boolean,
  sqsLimits: Limits,
  _awsRegion: String,
  _awsAccountId: String,
  queueEventListener: Option[ActorRef]
)

If a providedActorSystem is provided, it won’t be managed by the server. If not, one will be created and managed by the server. A providedQueueManagerActor is the main ElasticMQ actor. A queueEventListener listens to changes in queues and messages. The rest is standard SQS configuration.

2.3. ElasticMQ Docker

To set up a docker ElasticMQ instance, let’s use the softwaremill/elasticmq-native image and expose the necessary ports:

$ docker run -p 9324:9324 -p 9325:9325 softwaremill/elasticmq-native
18:09:32.103 [main] INFO  org.elasticmq.server.Main$ - Starting ElasticMQ server (1.6.5) ...
18:09:32.756 [elasticmq-pekko.actor.default-dispatcher-7] INFO  o.a.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
18:09:33.226 [elasticmq-pekko.actor.default-dispatcher-8] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address 0.0.0.0:9324, visible server address http://localhost:9324
18:09:33.228 [main] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Metrics MBean org.elasticmq:name=Queues successfully registered
18:09:33.259 [elasticmq-pekko.actor.default-dispatcher-8] INFO  o.e.r.s.TheStatisticsRestServerBuilder - Started statistics rest server, bind address 0.0.0.0:9325
18:09:33.259 [main] INFO  org.elasticmq.server.Main$ - === ElasticMQ server (1.6.5) started in 1581 ms ===

The native image is compiled using GraalVM for faster startup time and smaller bundle size, but we can also use the JVM-based softwaremill/elasticmq image.

3. Configuration

To configure an ElasticMQ instance, we use the HOCON (Human-Optimized Config Object Notation) configuration language. This is the standard for many Scala ecosystem projects.

Let’s consider a basic configuration file:

$ cat elasticmq.conf
# What is the outside visible address of this ElasticMQ node
# Used to create the queue URL (may be different from bind address!)
node-address {
  protocol = http
  host = localhost
  port = 9323
  context-path = ""
}

rest-sqs {
  enabled = true
  bind-port = 9323
  bind-hostname = "0.0.0.0"
  # Possible values: relaxed, strict
  sqs-limits = strict
}

rest-stats {
  enabled = true
  bind-port = 9324
  bind-hostname = "0.0.0.0"
}

# Should the node-address be generated from the bind port/hostname
# Set this to true e.g. when assigning port automatically by using port 0.
generate-node-address = false

We can see the ports have changed given our configuration file:

$ java -Dconfig.file=elasticmq.conf -jar elasticmq-server-1.6.5.jar
12:08:52.665 [main] INFO  org.elasticmq.server.Main$ - Starting ElasticMQ server (1.6.5) ...
12:08:59.786 [elasticmq-pekko.actor.default-dispatcher-4] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address 0.0.0.0:9323, visible server address http://localhost:9323
12:09:00.075 [main] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Metrics MBean org.elasticmq:name=Queues successfully registered
12:09:00.246 [elasticmq-pekko.actor.default-dispatcher-5] INFO  o.e.r.s.TheStatisticsRestServerBuilder - Started statistics rest server, bind address 0.0.0.0:9324
12:09:00.262 [main] INFO  org.elasticmq.server.Main$ - === ElasticMQ server (1.6.5) started in 9916 ms ===

We can also provide a separate logging configuration:

$ java -Dlogback.configurationFile=logback.xml -Dconfig.file=elasticmq.conf -jar elasticmq-server-1.6.5.jar

4. ElasticMQ Client

To create a client for an ElasticMQ instance, we can use the AWS SQS SDK library and create a thin wrapper for convenience.

First, we include the SQS SDK:

libraryDependencies += "software.amazon.awssdk" % "sqs" % "2.26.24"

We can check for the latest version on the MVN Repository page. Then we can create a client wrapper class:

class SQSAsyncClient(
  queueURL: String,
  region: String,
  endpoint: String
)(implicit ec: ExecutionContext):

Finally, we instantiate the SDK client with our configuration parameters:

private val sqsAsyncClient: SqsAsyncClient =
  SqsAsyncClient
    .builder()
    .region(Region.of(region))
    .credentialsProvider(
      AwsCredentialsProviderChain
        .builder()
        .credentialsProviders(
          StaticCredentialsProvider.create(
            AwsBasicCredentials.create(
              ElasticMQConfig.ELASTIC_MQ_ACCESS_KEY,
              ElasticMQConfig.ELASTIC_MQ_SECRET_ACCESS_KEY
            )
          )
        )
        .build()
    )
    .endpointOverride(URI.create(endpoint))
    .build()

4.1. Creating Queues

Queues can be created at the configuration stage or we can use a client instance to create a queue through the SQS REST API. By default, queues are LIFO (Last-In First-Out), meaning old messages have dequeue priority over new ones.

We use the client to create a queue:

def createStandardQueue(queueName: String): Future[CreateQueueResponse] =
  val request = CreateQueueRequest.builder.queueName(queueName).build

  sqsAsyncClient.createQueue(request).asScala

Additionally, we can modify the queue attributes to create a FIFO (First-In First-Out) queue:

final lazy val createFIFOQueueAttributes = Map(
  (QueueAttributeName.FIFO_QUEUE, "true")
).asJava

def createFIFOQueue(queueName: String): Future[CreateQueueResponse] =
  val createQueueRequest = CreateQueueRequest.builder
    .queueName(queueName)
    .attributes(createFIFOQueueAttributes)
    .build

  sqsAsyncClient.createQueue(createQueueRequest).asScala

4.2. Sending Messages

To use our client to send messages to a queue, we use a SendMessageRequest() method*:*

def sendMessage(message: String): Future[SendMessageResponse] =
  val request = SendMessageRequest
    .builder()
    .messageBody(message)
    .queueUrl(queueURL)
    .build()

  sqsAsyncClient.sendMessage(request).asScala

We can also send messages in a batch using a SendMessageBatchRequest():

def sendMessagesInBatch(
  messages: List[String]
): Future[SendMessageBatchResponse] =
  val batchRequestEntry = messages
    .map(
      SendMessageBatchRequestEntry
        .builder()
        .messageBody(_)
        .id(UUID.randomUUID().toString)
        .build()
    )
    .asJava
  val sendMessageBatchRequest = SendMessageBatchRequest
    .builder()
    .queueUrl(queueURL)
    .entries(batchRequestEntry)
    .build()

  sqsAsyncClient.sendMessageBatch(sendMessageBatchRequest).asScala

4.3. Receiving Messages

We can use our client to receive up to 10 messages at a time using a ReceiveMessageRequest():

def receiveMessages(
  maxNumberOfMessages: Int
): Future[ReceiveMessageResponse] =
  val receiveMessageRequest =
    ReceiveMessageRequest
      .builder()
      .maxNumberOfMessages(maxNumberOfMessages)
      .queueUrl(queueURL)
      .waitTimeSeconds(10)
      .build()

  sqsAsyncClient.receiveMessage(receiveMessageRequest).asScala

The maxNumberOfMessages setter must not exceed 10.

4.4. Deleting Messages

To delete messages with our client using a DeleteMessageRequest():

def deleteMessage(receiptHandle: String): Future[DeleteMessageResponse] =
  val deleteMessageRequest = DeleteMessageRequest
    .builder()
    .queueUrl(queueURL)
    .receiptHandle(receiptHandle)
    .build()

  sqsAsyncClient.deleteMessage(deleteMessageRequest).asScala

4.5. List Active Queues

To receive a list of active queues with our client, we use the listQueues() method:

def listQueues(): Future[ListQueuesResponse] =
  sqsAsyncClient.listQueues().asScala

4.6. Purge Active Queue

To purge an active queue with our client, we use a PurgeQueueRequest():

final lazy val purgeQueueRequest =
  PurgeQueueRequest.builder().queueUrl(queueURL).build()
def purgeQueue(): Future[PurgeQueueResponse] =
  sqsAsyncClient.purgeQueue(purgeQueueRequest).asScala

4.7. Example Workflow

To test a workflow, we start an embedded ElasticMQ REST server:

val endpoint = "http://localhost:9325"
val region = "elasticmq"

val server = SQSRestServerBuilder
  .withPort(9325)
  .withInterface("localhost")
  .start()

Then we create a client and workflow:

val elasticMQClient = new SQSAsyncClient(ElasticMQ_URL, region, endpoint)

val uselessWorkflow =
  for
    _ <- elasticMQClient.createStandardQueue("standardQueueForTest")
    testQueueClient = new SQSAsyncClient(
      ElasticMQ_URL + "standardQueueForTest",
      region,
      endpoint
    )
    _ <- testQueueClient.createFIFOQueue("fifoQueue.fifo")
    _ <- testQueueClient.listQueues()
    _ <- testQueueClient.sendMessage("Hi")
    _ <- testQueueClient.sendMessagesInBatch(
      List("Follow", "Baeldung", "on", "LinkedIn")
    )
    _ <- testQueueClient.receiveMessages(5)
    _ <- testQueueClient.purgeQueue()
  yield ()

Then we execute the workflow, logging any errors:

uselessWorkflow
  .andThen(_ => server.stopAndWait())
  .onComplete:
    case Success(_) => m_logger.info("queue created")
    case Failure(exception) =>
      m_logger.error(exception, "exception in uselessWorkflow")

Finally, the result is successful logs:

[info] running com.baeldung.elasticmq.ElasticMQService 
13:42:30.524 [default-pekko.actor.default-dispatcher-5] INFO  o.a.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
13:42:34.167 [elasticmq-pekko.actor.default-dispatcher-5] INFO  o.a.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
13:42:38.825 [elasticmq-pekko.actor.default-dispatcher-5] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address localhost:9325, visible server address http://localhost:9325
13:42:38.992 [sbt-bg-threads-1] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Metrics MBean org.elasticmq:name=Queues successfully registered
13:42:44.536 [elasticmq-pekko.actor.default-dispatcher-5] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(standardQueueForTest,None,None,None,None,None,None,false,false,None,None,Map())
13:42:45.399 [elasticmq-pekko.actor.default-dispatcher-9] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(fifoQueue.fifo,None,None,None,None,None,None,true,false,None,None,Map())
13:42:47.753 [elasticmq-pekko.actor.default-dispatcher-13] INFO  o.elasticmq.actor.queue.QueueActor - standardQueueForTest: Clearing queue
13:42:47.987 [elasticmq-pekko.actor.default-dispatcher-11] INFO  o.a.pekko.actor.CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
13:42:48.085 [default-pekko.actor.default-dispatcher-5] INFO  o.apache.pekko.actor.ActorSystemImpl - queue created

5. Conclusion

In this article, we’ve had an overview of ElasticMQ and client interaction with Scala. First, we set up ElasticMQ standalone with a .jar file, then with docker and embedded ElasticMQ within a program. Secondly, we set up a client to create queues, send messages, receive messages, delete messages, and send other commands to the ElasticMQ server.

As usual, the code is available over on GitHub.