1. Overview

Spring Integration makes it easy to use some Enterprise Integration Patterns. One of these ways is through its DSL.

In this tutorial, we’ll take a look at the DSL’s support for subflows for simplifying some of our configurations.

2. Our Task

Let’s say we have a sequence of integers that we want to separate into three different buckets.

And if we wanted to use Spring Integration to do this, we could start by creating three output channels:

  • Numbers like 0, 3, 6 and 9 will go to the multipleOfThreeChannel
  • Numbers like 1, 4, 7, and 10 will go to the remainderIsOneChannel
  • And numbers like 2, 5, 8, and 11 go to the remainderIsTwoChannel

To see how helpful subflows can be, let’s start with what this will look like without subflows.

And then, we’ll use subflows to simplify our configuration with:

  • publishSubscribeChannel
  • routeToRecipients
  • Filters, to configure our if-then logic
  • Routers, to configure our switch logic

3. Prerequisites

Now before configuring our subflows, let’s create those output channels.

We’ll make these QueueChannels since that is a bit easier to demo:

@EnableIntegration
@IntegrationComponentScan
public class SubflowsConfiguration {
 
    @Bean
    QueueChannel multipleOfThreeChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsOneChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsTwoChannel() {
        return new QueueChannel();
    }

    boolean isMultipleOfThree(Integer number) {
       return number % 3 == 0;
    }

    boolean isRemainderIOne(Integer number) {
        return number % 3 == 1;
    }

    boolean isRemainderTwo(Integer number) {
        return number % 3 == 2;
    }
}

Ultimately, these are where our grouped numbers will end up.

Note also that Spring Integration can easily start looking complex, so we’ll add a few helper methods for the sake of readability.

4. Solving Without Subflows

Now we need to define our flows.

Without subflows, the simple idea is to define three separate integration flows, one for each type of number.

We’ll send the same sequence of messages to each IntegrationFlow component, but the output messages for each component will be different.

4.1. Defining IntegrationFlow Components

First, let’s define each IntegrationFlow bean in our SubflowConfiguration class:

@Bean
public IntegrationFlow multipleOfThreeFlow() {
    return flow -> flow.split()
      .<Integer> filter(this::isMultipleOfThree)
      .channel("multipleOfThreeChannel");
}

Our flow contains two endpoints – a Splitter followed by a Filt**er.

The filter does what it sounds like. But why do we also need a splitter? We’ll see this in a minute, but basically, it splits an input Collection into individual messages.

And, we can, of course, define two more IntegrationFlow beans in the same way.

4.2. Messaging Gateways

For each flow, we also need a Message Gateway.

Simply put, these abstract the Spring Integration Messages API away from the caller, similarly to how a REST service can abstract away HTTP:

@MessagingGateway
public interface NumbersClassifier {

    @Gateway(requestChannel = "multipleOfThreeFlow.input")
    void multipleOfThree(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsOneFlow.input")
    void remainderIsOne(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsTwoFlow.input")
    void remainderIsTwo(Collection<Integer> numbers);

}

For each, we need to use the @Gateway annotation and specify the implicit name for the input channel, which is simply the name of the bean followed by “.input”. Note that we can use this convention because we are using lambda-based flows.

These methods are the entry points into our flows.

4.3. Sending Messages and Checking Output

And now, let’s test:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SeparateFlowsConfiguration.class })
public class SeparateFlowsUnitTest {
 
    @Autowired
    private QueueChannel multipleOfThreeChannel;

    @Autowired
    private NumbersClassifier numbersClassifier;
    @Test
    public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() {
        numbersClassifier.multipleOfThree(Arrays.asList(1, 2, 3, 4, 5, 6));
        Message<?> outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(outMessage.getPayload(), 3);
        outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(outMessage.getPayload(), 6);
        outMessage = multipleOfThreeChannel.receive(0);
        assertNull(outMessage);
    }
}

Notice that we’ve sent the messages as a List, which is why we needed the splitter, to take the single “list message” and transform it into several “number messages”.

We call receive with o to get the next available message without waiting. Since there are two multiples of three in our list, we’d expect to be able to call it twice. The third call to receive returns null.

receive, of course, returns a Message, so we call getPayload to extract the number.

Similarly, we could do the same for the other two.

So, that was the solution without subflows. We have three separate flows to maintain and three separate gateway methods.

What we’ll do now is replace the three IntegrationFlow beans with a single bean and the three gateway methods with a single one.

5. Using publishSubscribeChannel

The publishSubscribeChannel() method broadcasts messages to all subscribing subflows. This way, we can create one flow, instead of three.

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .publishSubscribeChannel(subscription -> 
           subscription
             .subscribe(subflow -> subflow
               .<Integer> filter(this::isMultipleOfThree)
               .channel("multipleOfThreeChannel"))
             .subscribe(subflow -> subflow
                .<Integer> filter(this::isRemainderOne)
                .channel("remainderIsOneChannel"))
             .subscribe(subflow -> subflow
                .<Integer> filter(this::isRemainderTwo)
                .channel("remainderIsTwoChannel")));
}

In this way, the subflows are anonymous, meaning that they can’t be independently addressed.

Now, we only have one flow, so let’s edit our NumbersClassifier as well:

@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);

Now, since we have only one IntegrationFlow bean and one gateway method, we need only send our list once:

@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
    numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));

    // same assertions as before
}

Note that from now on, only the integration flow definition will change so that we won’t show the test again.

6. Using routeToRecipients

Another way to achieve the same thing is routeToRecipients, which is nice because it has filtering built in.

Using this method, we can specify both channels and subflows for broadcasting. 

6.1. recipient

In the code below we’ll specify multipleof3ChannelremainderIs1Channel, and remainderIsTwoChannel as the recipients based on our conditions:

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .routeToRecipients(route -> route
          .<Integer> recipient("multipleOfThreeChannel", 
            this::isMultipleOfThree)       
          .<Integer> recipient("remainderIsOneChannel", 
            this::isRemainderOne)
          .<Integer> recipient("remainderIsTwoChannel", 
            this::isRemainderTwo));
}

We can also call recipient without a condition, and routeToRecipients will publish to that destination unconditionally.

6.2. recipientFlow

And note that routeToRecipients allows us to define a complete flow, just like publishSubscribeChannel. 

Let’s modify the above code and specify an anonymous subflow as the first recipient:

.routeToRecipients(route -> route
  .recipientFlow(subflow -> subflow
      .<Integer> filter(this::isMultipleOfThree)
      .channel("mutipleOfThreeChannel"))
  ...);

This subflow will receive the entire sequence of messages, so we need to filter like before to get the same behavior.

Again, one IntegrationFlow bean was enough for us.

Now let’s move on to the if-else components. One of them is Filter.

7. Using if-then Flows

We’ve already used Filter in all of the previous examples. The good news is that we can specify not only the condition for further processing but also a channel or a flow for the discarded messages.

We can think of discard flows and channels like an else block:

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .<Integer> filter(this::isMultipleOfThree, 
           notMultiple -> notMultiple
             .discardFlow(oneflow -> oneflow
               .<Integer> filter(this::isRemainderOne,
                 twoflow -> twoflow
                   .discardChannel("remainderIsTwoChannel"))
               .channel("remainderIsOneChannel"))
        .channel("multipleofThreeChannel");
}

In this case, we’ve implemented our if-else routing logic:

  • If the number is not a multiple of three, then discard those messages to the discard flow; we use a flow here since there is more logic needed to know its destination channel.
  • In the discard flow, if the number isn’t of remainder one, then discard those messages to the discard channel.

**8. switch-ing on a Computed Value

And finally, let’s try the route method, which gives us a bit more control than routeToRecipients. It’s nice because a Router can split the flow into any number of parts, whereas a Filter can only do two.

8.1. channelMapping

Let’s define our IntegrationFlow bean:

@Bean
public IntegrationFlow classify() {
    return classify -> classify.split()
      .<Integer, Integer> route(number -> number % 3, 
        mapping -> mapping
         .channelMapping(0, "multipleOfThreeChannel")
         .channelMapping(1, "remainderIsOneChannel")
         .channelMapping(2, "remainderIsTwoChannel"));
}

In the code above we calculate a routing key by performing the division:

route(p -> p % 3,...

Based on this key, we route the messages:

channelMapping(0, "multipleof3Channel")

8.2. subFlowMapping

Now, like with others, we can take more control by specifying a subflow, replacing channelMapping with subFlowMapping:

.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))

Or still more control by calling the handle method instead of the channel method:

.subFlowMapping(2, subflow -> subflow
  .<Integer> handle((payload, headers) -> {
      // do extra work on the payload
     return payload;
  }))).channel("remainderIsTwoChannel");

In this case, the subflow would return to the main flow after the route() method, so there we’d need to specify the channel remainderIsTwoChannel.

9. Conclusion

In this tutorial, we’ve explored how to filter and route messages in some ways using subflows.

As usual, the complete source code is available on GitHub.