1. Introduction
In this tutorial, we’ll learn about the Spring Integration Java DSL for creating application integrations.
We’ll take the file-moving integration we built in Introduction to Spring Integration and use the DSL instead.
2. Dependencies
The Spring Integration Java DSL is part of Spring Integration Core.
So, we can add that dependency:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>6.0.0</version>
</dependency>
And to work on our file-moving application, we’ll also need Spring Integration File:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>6.0.0</version>
</dependency>
3. Spring Integration Java DSL
Before the Java DSL, users would configure Spring Integration components in XML.
The DSL introduces some fluent builders from which we can easily create a complete a Spring Integration pipeline purely in Java.
So, let’s say we wanted to create a channel that uppercases any data coming through the pipe.
In the past, we might have done:
<int:channel id="input"/>
<int:transformer input-channel="input" expression="payload.toUpperCase()" />
And now we can instead do:
@Bean
public IntegrationFlow upcaseFlow() {
return IntegrationFlow.from("input")
.transform(String::toUpperCase)
.get();
}
4. The File-Moving App
To begin our file-moving integration, we’ll need some simple building blocks.
4.1. Integration Flow
The first building block we need is an integration flow, which we can get from the IntegrationFlows builder:
IntegrationFlows.from(...)
from can take several types, but in this tutorial, we will look at just three:
- MessageSources
- MessageChannels, and
- Strings
We’ll talk about all three shortly.
After we have called from, some customization methods are now available to us:
IntegrationFlow flow = IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs())
.handle(targetDirectory())
// add more components
.get();
Ultimately, IntegrationFlow will always produce an instance of IntegrationFlow, which is the final product of any Spring Integration app.
This pattern of taking input, performing the appropriate transformations, and emitting the results is fundamental to all Spring Integration apps.
4.2. Describing an Input Source
First, to move files, we’ll need to indicate to our integration flow where it should look for them, and for that, we need a MessageSource:
@Bean
public MessageSource<File> sourceDirectory() {
// .. create a message source
}
Simply put, a MessageSource is a place from which messages can come that are external to the application.
More specifically, we need something that can adapt that external source into the Spring messaging representation. And since this adaptation is focused on input, these are often called Input Channel Adapters.
The spring-integration-file dependency gives us an input channel adapter that’s great for our use case: FileReadingMessageSource:
@Bean
public MessageSource<File> sourceDirectory() {
FileReadingMessageSource messageSource = new FileReadingMessageSource();
messageSource.setDirectory(new File(INPUT_DIR));
return messageSource;
}
Here, our FileReadingMessageSource will be reading a directory given by INPUT_DIR and will create a MessageSource from it.
Let’s specify this as our source in an IntegrationFlow.from invocation:
IntegrationFlow.from(sourceDirectory());
4.3. Configuring an Input Source
Now, if we are thinking about this as a long-lived application, we’ll probably want to be able to notice files as they come in, not just move the files that are already there at startup.
To facilitate this, from can also take extra configurers as further customization of the input source:
IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));
In this case, we can make our input source more resilient by telling Spring Integration to poll that source–our filesystem in this case–every 10 seconds.
And, of course, this doesn’t apply to just our file input source, we could add this poller to any MessageSource.
4.4. Filtering Messages from an Input Source
Next, let’s suppose we want our file-moving application to move specific files only, say image files having jpg extension.
For this, we can use GenericSelector:
@Bean
public GenericSelector<File> onlyJpgs() {
return new GenericSelector<File>() {
@Override
public boolean accept(File source) {
return source.getName().endsWith(".jpg");
}
};
}
So, let’s update our integration flow again:
IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs());
Or, because this filter is so simple, we could have instead defined it using a lambda:
IntegrationFlow.from(sourceDirectory())
.filter(source -> ((File) source).getName().endsWith(".jpg"));
4.5. Handling Messages With Service Activators
Now that we have a filtered list of files, we need to write them to a new location.
Service Activator**s are what we turn to when we’re thinking about outputs in Spring Integration.
Let’s use the FileWritingMessageHandler service activator from spring-integration-file:
@Bean
public MessageHandler targetDirectory() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
handler.setFileExistsMode(FileExistsMode.REPLACE);
handler.setExpectReply(false);
return handler;
}
Here, our FileWritingMessageHandler will write each Message payload it receives to OUTPUT_DIR.
Again, let’s update:
IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs())
.handle(targetDirectory());
And notice, by the way, the usage of setExpectReply. Because integration flows can be bidirectional, this invocation indicates that this particular pipe is one way.
4.6. Activating Our Integration Flow
When we have added all our components we need to register our IntegrationFlow as a bean to activate it:
@Bean
public IntegrationFlow fileMover() {
return IntegrationFlow.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000)))
.filter(onlyJpgs())
.handle(targetDirectory())
.get();
}
The get method extracts an IntegrationFlow instance that we need to register as a Spring Bean.
As soon as our application context loads, all our components contained in our IntegrationFlow gets activated.
And now, our application will start moving files from the source directory to target directory.
5. Additional Components
In our DSL-based file-moving application, we created an Inbound Channel Adapter, a Message Filter, and a Service Activator.
Let’s look at a few other common Spring Integration components and see how we might use them.
5.1. Message Channels
As mentioned earlier, a Message Channel is another way to initialize a flow:
IntegrationFlow.from("anyChannel")
We can read this as “please find or create a channel bean called anyChannel. Then, read any data that is fed into anyChannel from other flows.”
But, really it is more general-purpose than that.
Simply put, a channel abstracts away producers from consumers, and we can think of it as a Java Queue. A channel can be inserted at any point in the flow.
Let’s say, for example, that we want to prioritize the files as they get moved from one directory to the next:
@Bean
public PriorityChannel alphabetically() {
return new PriorityChannel(1000, (left, right) ->
((File)left.getPayload()).getName().compareTo(
((File)right.getPayload()).getName()));
}
Then, we can insert an invocation to channel in between our flow:
@Bean
public IntegrationFlow fileMover() {
return IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs())
.channel("alphabetically")
.handle(targetDirectory())
.get();
}
There are dozens of channels to pick from, some of the more handy ones being for concurrency, auditing, or intermediate persistence (think Kafka or JMS buffers).
Also, channels can be powerful when combined with Bridges.
5.2. Bridge
When we want to combine two channels, we use a Bridge.
Let’s imagine that instead of writing directly to an output directory, we instead had our file-moving app write to another channel:
@Bean
public IntegrationFlow fileReader() {
return IntegrationFlow.from(sourceDirectory())
.filter(onlyJpgs())
.channel("holdingTank")
.get();
}
Now, because we’ve simply written it to a channel, we can bridge from there to other flows.
Let’s create a bridge that polls our holding tank for messages and writes them to a destination:
@Bean
public IntegrationFlow fileWriter() {
return IntegrationFlow.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(Duration.of(1, TimeUnit.SECONDS.toChronoUnit()), Duration.of(20, TimeUnit.SECONDS.toChronoUnit()))))
.handle(targetDirectory())
.get();
}
Again, because we wrote to an intermediate channel, now we can add another flow that takes these same files and writes them at a different rate:
@Bean
public IntegrationFlow anotherFileWriter() {
return IntegrationFlow.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(Duration.of(2, TimeUnit.SECONDS.toChronoUnit()), Duration.of(10, TimeUnit.SECONDS.toChronoUnit()))))
.handle(anotherTargetDirectory())
.get();
}
As we can see, individual bridges can control the polling configuration for different handlers.
As soon as our application context is loaded, we now have a more complex app in action that will start moving files from the source directory to two target directories.
6. Conclusion
In this article, we saw various ways to use the Spring Integration Java DSL to build different integration pipelines.
Essentially, we were able to recreate the file-moving application from a previous tutorial, this time using pure java.
Also, we took a look at a few other components like channels and bridges.
The complete source code used in this tutorial is available over on Github.