1. Overview

In this tutorial, we'll take a look at transaction support in the Spring Integration framework.

2. Transactions in Message Flows

Spring provides support for synchronizing resources with transactions since the earliest versions. We often use it to synchronize transactions managed by multiple transaction managers.

For example, we can synchronize a JMS commit with a JDBC commit.

On the other hand, we also have more complex use cases in the message flows. They include synchronization of nontransactional resources as well as various types of transactional resources.

Typically, messaging flows can be initiated by two different types of mechanisms.

2.1. Message Flows Initiated by a User Process

Some message flows depend on the initiation of third party processes, like triggering a message on some message channel or invocation of a message gateway method.

We configure transaction support for these flows through Spring’s standard transaction support. The flows don't have to be configured explicitly by Spring Integration to support transactions. The Spring Integration message flow naturally honors the transactional semantics of the Spring components.

For example, we can annotate a ServiceActivator or its method with @Transactional:

@Transactional
public class TxServiceActivator {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void storeTestResult(String testResult) {
        this.jdbcTemplate.update("insert into STUDENT values(?)", testResult);
        log.info("Test result is stored: {}", testResult);
    }
}

We can run the storeTestResult method from any component, and the transactional context will apply as usual. With this approach, we have full control over the transaction configuration.

2.2. Message Flows Initiated by a Daemon Process

We often use this type of message flow for automation. For example, a Poller polling a message queue to initiate a new message flow with the polled message, or a scheduler scheduling the process by creating a new message and initiating a message flow at a predefined time.

In essence, these are trigger-based flows initiated by a trigger process (a daemon process). For these flows, we have to provide some transaction configuration to create a transaction context whenever a new message flow begins.

Through the configuration, we delegate the flows to Spring's existing transaction support.

We'll focus on transaction support for this type of message flow through the rest of the article.

3. Poller Transaction Support

Poller is a common component in integration flows. It periodically retrieves the data from various sources and passes it on through the integration chain.

Spring Integration provides transactional support for pollers out of the box. Any time we configure a Poller component, we can provide transactional configuration:

@Bean
@InboundChannelAdapter(value = "someChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> someMessageSource() {
    ...
}

@Bean
public PollerMetadata pollerMetadata() {
    return Pollers.fixedDelay(5000)
      .advice(transactionInterceptor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory)
      .get();
}

private TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder()
      .transactionManager(txManager)
      .build();
}

We have to provide a reference to a TransactionManager and a custom TransactionSynchronizationFactory, or we can rely on the defaults. Internally, Spring’s native transaction wraps the process. As a result, all message flows initiated by this poller are transactional.

4. Transaction Boundaries

When a transaction is started, the transaction context is always bound to the current thread. Regardless of how many endpoints and channels we might have in our message flow, our transaction context will always be preserved as long as the flow lives in the same thread.

If we break it by initiating a new thread in some service, we'll break the Transactional boundary as well. Essentially, the transaction will end at that point.

If a successful handoff has transpired between the threads, the flow will be considered a success. That will commit the transaction at that point, but the flow will continue, and it still might result in an Exception somewhere downstream.

Consequently, that Exception can get back to the initiator of the flow so that the transaction can end up in a rollback. That is why we have to use transactional channels at any point where a thread boundary can be broken.

For example, we should use JMS, JDBC, or some other transactional channel.

5. Transaction Synchronization

In some use cases, it is beneficial to synchronize certain operations with a transaction that encompasses the entire flow.

For example, we'll demonstrate how to use a Poller that reads an incoming file and, based on its contents, performs a database update. When the database operation completes, it also renames the file depending on the success of the operation.

Before we move to the example, it is crucial to understand that this approach synchronizes the operations on the filesystem with a transaction. It does not make the filesystem, which is not inherently transactional, actually become transactional.

The transaction starts before the poll and either commits or rolls back when the flow completes, followed by the synchronized operation on the filesystem.

First, we define an InboundChannelAdapter with a simple Poller:

@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource sourceReader = new FileReadingMessageSource();
    sourceReader.setDirectory(new File(INPUT_DIR));
    sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
    return sourceReader;
}

@Bean
public PollerMetadata pollerMetadata() {
    return Pollers.fixedDelay(5000)
      .advice(transactionInterceptor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory)
      .get();
}

Poller contains a reference to the TransactionManager, as explained earlier. Additionally, it also contains a reference to the TransactionSynchronizationFactory. This component provides the mechanism for synchronization of the filesystem operations with the transaction:

@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor processor =
      new ExpressionEvaluatingTransactionSynchronizationProcessor();

    SpelExpressionParser spelParser = new SpelExpressionParser();
 
    processor.setAfterCommitExpression(
      spelParser.parseExpression(
        "payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))"));
 
    processor.setAfterRollbackExpression(
      spelParser.parseExpression(
        "payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))"));

    return new DefaultTransactionSynchronizationFactory(processor);
}

If the transaction commits, TransactionSynchronizationFactory will rename the file by appending “.PASSED” to the filename. However, if it rolls back, it will append “.FAILED”.

The InputChannel transforms the payload using the FileToStringTransformer and delegates it to the toServiceChannel. This channel is bound to the ServiceActivator:

@Bean
public MessageChannel inputChannel() {
    return new DirectChannel();
}
    
@Bean
@Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel")
public FileToStringTransformer fileToStringTransformer() {
    return new FileToStringTransformer();
}

ServiceActivator reads the incoming file, which contains the student's exam results. It writes the result in the database. If a result contains the string “fail”, it throws the Exception, which causes the database to rollback:

@ServiceActivator(inputChannel = "toServiceChannel")
public void serviceActivator(String payload) {

    jdbcTemplate.update("insert into STUDENT values(?)", payload);

    if (payload.toLowerCase().startsWith("fail")) {
        log.error("Service failure. Test result: {} ", payload);
        throw new RuntimeException("Service failure.");
    }

    log.info("Service success. Test result: {}", payload);
}

After the database operation successfully commits or rolls back, the TransactionSynchronizationFactory synchronizes the filesystem operation with its outcome.

6. Conclusion

In this article, we explained the transaction support in the Spring Integration framework. Additionally, we demonstrated how to synchronize the transaction with operations on a nontransactional resource like the filesystem.

The complete source code for the example is available over on GitHub.