1. Introduction

In this article, we’ll focus on how we can use Spring Integration and Spring Security together in an integration flow.

We’ll set up a simple secured message flow to demonstrate the use of Spring Security in Spring Integration. We’ll also show an example of SecurityContext propagation in multithreaded message channels.

For more details of using the framework, you can refer to our introduction to Spring Integration.

2. Spring Integration Configuration

2.1. Dependencies

First, we need to add the Spring Integration dependencies to our project.

Since we’ll set up simple message flows with DirectChannel, PublishSubscribeChannel, and ServiceActivator, we need the spring-integration-core dependency.

We also need the spring-integration-security dependency to be able to use Spring Security in Spring Integration:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-security</artifactId>
    <version>5.1.13.RELEASE</version>
</dependency>

We’re also using Spring Security, so we’ll add spring-security-config to our project:

<dependency>
    <groupId>org.springframework.security</groupId>
    <artifactId>spring-security-config</artifactId>
    <version>5.1.13.RELEASE</version>
</dependency>

We can check the latest versions of the above dependencies on Maven Central: spring-integration-security, spring-security-config.

2.2. Java-Based Configuration

Our example will use basic Spring Integration components. Thus, we only need to enable Spring Integration in our project with the @EnableIntegration annotation:

@Configuration
@EnableIntegration
public class SecuredDirectChannel {
    //...
}

3. Secured Message Channel

First of all, we need an instance of ChannelSecurityInterceptor, which intercepts all send and receive calls on a channel and decides whether each call is allowed or denied:

@Autowired
@Bean
public ChannelSecurityInterceptor channelSecurityInterceptor(
  AuthenticationManager authenticationManager, 
  AccessDecisionManager customAccessDecisionManager) {

    ChannelSecurityInterceptor 
      channelSecurityInterceptor = new ChannelSecurityInterceptor();

    channelSecurityInterceptor
      .setAuthenticationManager(authenticationManager);

    channelSecurityInterceptor
      .setAccessDecisionManager(customAccessDecisionManager);

    return channelSecurityInterceptor;
}

The AuthenticationManager and AccessDecisionManager beans are defined as:

@Configuration
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class SecurityConfig extends GlobalMethodSecurityConfiguration {

    @Override
    @Bean
    public AuthenticationManager 
      authenticationManager() throws Exception {
        return super.authenticationManager();
    }

    @Bean
    public AccessDecisionManager customAccessDecisionManager() {
        List<AccessDecisionVoter<? extends Object>> 
          decisionVoters = new ArrayList<>();
        decisionVoters.add(new RoleVoter());
        decisionVoters.add(new UsernameAccessDecisionVoter());
        AccessDecisionManager accessDecisionManager
          = new AffirmativeBased(decisionVoters);
        return accessDecisionManager;
    }
}

Here, we use two AccessDecisionVoter implementations: RoleVoter and a custom UsernameAccessDecisionVoter.

Now, we can use that ChannelSecurityInterceptor to secure our channels. All we need to do is decorate each channel with the @SecuredChannel annotation:

@Bean(name = "startDirectChannel")
@SecuredChannel(
  interceptor = "channelSecurityInterceptor", 
  sendAccess = { "ROLE_VIEWER","jane" })
public DirectChannel startDirectChannel() {
    return new DirectChannel();
}

@Bean(name = "endDirectChannel")
@SecuredChannel(
  interceptor = "channelSecurityInterceptor", 
  sendAccess = {"ROLE_EDITOR"})
public DirectChannel endDirectChannel() {
    return new DirectChannel();
}

The @SecuredChannel annotation accepts three properties:

  • The interceptor property: refers to a ChannelSecurityInterceptor bean.
  • The sendAccess and receiveAccess properties: contain the policy for invoking the send or receive action on a channel.

In the example above, we expect that only users who have the ROLE_VIEWER role or the username jane can send a message to the startDirectChannel.

Also, only users who have the ROLE_EDITOR role can send a message to the endDirectChannel.

We achieve this with the support of our custom AccessDecisionManager: if either RoleVoter or UsernameAccessDecisionVoter returns an affirmative response, access is granted.
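To make the "one affirmative vote is enough" rule concrete, here is a minimal, framework-free sketch of the idea. It doesn't use Spring Security's classes: the Vote, User, and Voter types are hypothetical stand-ins, and the username voter's behavior (grant on a matching name, abstain otherwise) is an assumption, since the custom UsernameAccessDecisionVoter isn't shown in this article.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of affirmative-based voting; all names here are hypothetical.
class AffirmativeVotingSketch {

    enum Vote { GRANTED, ABSTAIN, DENIED }

    // Hypothetical stand-in for an authenticated principal.
    static final class User {
        final String name;
        final Set<String> authorities;

        User(String name, Set<String> authorities) {
            this.name = name;
            this.authorities = authorities;
        }
    }

    interface Voter {
        Vote vote(User user, List<String> attributes);
    }

    // Only considers attributes starting with "ROLE_"; abstains if there are none.
    static final Voter ROLE_VOTER = (user, attributes) -> {
        Vote result = Vote.ABSTAIN;
        for (String attribute : attributes) {
            if (attribute.startsWith("ROLE_")) {
                if (user.authorities.contains(attribute)) {
                    return Vote.GRANTED;
                }
                result = Vote.DENIED;
            }
        }
        return result;
    };

    // Assumed behavior: grants when an attribute equals the username, otherwise abstains.
    static final Voter USERNAME_VOTER = (user, attributes) ->
        attributes.contains(user.name) ? Vote.GRANTED : Vote.ABSTAIN;

    // Access is granted as soon as a single voter grants it; otherwise it is refused.
    static boolean decide(User user, List<String> attributes) {
        for (Voter voter : Arrays.asList(ROLE_VOTER, USERNAME_VOTER)) {
            if (voter.vote(user, attributes) == Vote.GRANTED) {
                return true;
            }
        }
        return false;
    }

    static User user(String name, String... authorities) {
        return new User(name, new HashSet<>(Arrays.asList(authorities)));
    }

    public static void main(String[] args) {
        List<String> sendAccess = Arrays.asList("ROLE_VIEWER", "jane");
        System.out.println(decide(user("bob", "ROLE_VIEWER"), sendAccess));                // true
        System.out.println(decide(user("jane", "ROLE_LOGGER", "ROLE_EDITOR"), sendAccess)); // true
        System.out.println(decide(user("tom", "ROLE_EDITOR"), sendAccess));                // false
    }
}
```

In this model, jane is admitted even though the role voter denies her, because the username voter grants access. That mirrors the sendAccess policy on startDirectChannel.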

4. Secured ServiceActivator

It’s worth mentioning that we can also secure our ServiceActivator with Spring method security. To do that, we need to enable the method security annotations:

@Configuration
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class SecurityConfig extends GlobalMethodSecurityConfiguration {
    //....
}

For simplicity, in this article, we’ll only use Spring pre and post annotations, so we’ll add the @EnableGlobalMethodSecurity annotation to our configuration class and set prePostEnabled to true.

Now we can secure our ServiceActivator with a @PreAuthorize annotation:

@ServiceActivator(
  inputChannel = "startDirectChannel", 
  outputChannel = "endDirectChannel")
@PreAuthorize("hasRole('ROLE_LOGGER')")
public Message<?> logMessage(Message<?> message) {
    Logger.getAnonymousLogger().info(message.toString());
    return message;
}

The ServiceActivator here receives the message from startDirectChannel and outputs the message to endDirectChannel.

Besides, the method is accessible only if the current Authentication principal has the ROLE_LOGGER role.

5. Security Context Propagation

The Spring SecurityContext is thread-bound by default. This means the SecurityContext won’t be propagated to a child thread.

In all the examples above, we use DirectChannel and ServiceActivator, which run in a single thread; thus, the SecurityContext is available throughout the flow.

However, when using QueueChannel, ExecutorChannel, or PublishSubscribeChannel with an Executor, messages are handed off from one thread to other threads. In this case, we need to propagate the SecurityContext to all threads receiving the messages.
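The effect of a thread-bound context can be illustrated without any framework code. The sketch below is only an analogy: CURRENT_USER is a hypothetical ThreadLocal standing in for the thread-bound SecurityContext, and the single-thread pool stands in for the Executor behind a channel.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy: a value bound to the calling thread is not visible on a pool thread.
class ThreadBoundContextSketch {

    // Hypothetical stand-in for a thread-bound security context.
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Reads the context on a separate worker thread and returns what that thread sees.
    static String readOnWorkerThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = () -> CURRENT_USER.get();
            return pool.submit(read).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        CURRENT_USER.set("jane");
        System.out.println("caller thread sees: " + CURRENT_USER.get());     // jane
        System.out.println("worker thread sees: " + readOnWorkerThread());   // null
        CURRENT_USER.remove();
    }
}
```

The worker thread sees no user at all, which is the situation a secured handler would be in after a thread handoff.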

Let’s create another message flow that starts with a PublishSubscribeChannel and has two ServiceActivator methods subscribed to that channel:

@Bean(name = "startPSChannel")
@SecuredChannel(
  interceptor = "channelSecurityInterceptor", 
  sendAccess = "ROLE_VIEWER")
public PublishSubscribeChannel startChannel() {
    return new PublishSubscribeChannel(executor());
}

@ServiceActivator(
  inputChannel = "startPSChannel", 
  outputChannel = "finalPSResult")
@PreAuthorize("hasRole('ROLE_LOGGER')")
public Message<?> changeMessageToRole(Message<?> message) {
    return buildNewMessage(getRoles(), message);
}

@ServiceActivator(
  inputChannel = "startPSChannel", 
  outputChannel = "finalPSResult")
@PreAuthorize("hasRole('ROLE_VIEWER')")
public Message<?> changeMessageToUserName(Message<?> message) {
    return buildNewMessage(getUsername(), message);
}

In the example above, we have two ServiceActivator methods subscribed to the startPSChannel. The channel requires an Authentication principal with the ROLE_VIEWER role to be able to send a message to it.

Likewise, we can invoke the changeMessageToRole service only if the Authentication principal has the ROLE_LOGGER role.

Also, the changeMessageToUserName service can only be invoked if the Authentication principal has the role ROLE_VIEWER.

Meanwhile, the startPSChannel will run with the support of a ThreadPoolTaskExecutor:

@Bean
public ThreadPoolTaskExecutor executor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(10);
    pool.setMaxPoolSize(10);
    pool.setWaitForTasksToCompleteOnShutdown(true);
    return pool;
}

Consequently, the two ServiceActivator methods will run in two different threads. To propagate the SecurityContext to those threads, we need to add a SecurityContextPropagationChannelInterceptor to our message channel:

@Bean
@GlobalChannelInterceptor(patterns = { "startPSChannel" })
public ChannelInterceptor securityContextPropagationInterceptor() {
    return new SecurityContextPropagationChannelInterceptor();
}

Notice how we decorated the SecurityContextPropagationChannelInterceptor with the @GlobalChannelInterceptor annotation. We also added our startPSChannel to its patterns property.

Therefore, the above configuration states that the SecurityContext from the current thread will be propagated to any thread that handles messages sent through startPSChannel.
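Conceptually, propagation means capturing the context on the sending thread and restoring it on the thread that handles the message. The following is a rough plain-Java sketch of that capture-and-restore idea, not the interceptor's actual implementation. CURRENT_USER and withCallerContext are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java sketch of capture-and-restore context propagation; all names are hypothetical.
class ContextPropagationSketch {

    // Hypothetical stand-in for a thread-bound security context.
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Captures the caller's context now and returns a task that restores it on
    // whichever thread runs the task, then puts back what was there before.
    static <T> Callable<T> withCallerContext(Callable<T> task) {
        final String captured = CURRENT_USER.get(); // read on the sending thread
        return () -> {
            String previous = CURRENT_USER.get();
            CURRENT_USER.set(captured);
            try {
                return task.call();
            } finally {
                if (previous == null) {
                    CURRENT_USER.remove(); // avoid leaking the context into pooled threads
                } else {
                    CURRENT_USER.set(previous);
                }
            }
        };
    }

    // Reads the context on a worker thread, with or without propagation.
    static String readOnWorkerThread(boolean propagate) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = () -> CURRENT_USER.get();
            Callable<String> task = propagate ? withCallerContext(read) : read;
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        CURRENT_USER.set("jane");
        System.out.println("without propagation: " + readOnWorkerThread(false)); // null
        System.out.println("with propagation: " + readOnWorkerThread(true));     // jane
        CURRENT_USER.remove();
    }
}
```

Cleaning up in the finally block matters with thread pools, because worker threads are reused and would otherwise keep a stale context for the next task.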

6. Testing

Let’s start verifying our message flows using some JUnit tests.

6.1. Dependency

We, of course, need the spring-security-test dependency at this point:

<dependency>
    <groupId>org.springframework.security</groupId>
    <artifactId>spring-security-test</artifactId>
    <version>5.0.3.RELEASE</version>
    <scope>test</scope>
</dependency>

Likewise, the latest version can be checked out from Maven Central: spring-security-test.

6.2. Test Secured Channel

Firstly, we try to send a message to our startDirectChannel:

@Test(expected = AuthenticationCredentialsNotFoundException.class)
public void 
  givenNoUser_whenSendToDirectChannel_thenCredentialNotFound() {

    startDirectChannel
      .send(new GenericMessage<String>(DIRECT_CHANNEL_MESSAGE));
}

Since the channel is secured, we expect an AuthenticationCredentialsNotFoundException exception when sending the message without providing an authentication object.

Next, we provide a user who has the ROLE_VIEWER role and send a message to our startDirectChannel:

@Test
@WithMockUser(roles = { "VIEWER" })
public void 
  givenRoleViewer_whenSendToDirectChannel_thenAccessDenied() {
    expectedException.expectCause
      (IsInstanceOf.<Throwable> instanceOf(AccessDeniedException.class));

    startDirectChannel
      .send(new GenericMessage<String>(DIRECT_CHANNEL_MESSAGE));
}

Now, even though our user can send the message to startDirectChannel thanks to the ROLE_VIEWER role, the user cannot invoke the logMessage service, which requires a user with the ROLE_LOGGER role.

In this case, a MessageHandlingException whose cause is an AccessDeniedException will be thrown. Hence, we use an ExpectedException rule to verify the cause.

Next, we provide a user with the username jane and two roles, ROLE_LOGGER and ROLE_EDITOR, and then try to send a message to startDirectChannel again:

@Test
@WithMockUser(username = "jane", roles = { "LOGGER", "EDITOR" })
public void 
  givenJaneLoggerEditor_whenSendToDirectChannel_thenFlowCompleted() {
    startDirectChannel
      .send(new GenericMessage<String>(DIRECT_CHANNEL_MESSAGE));
    assertEquals
      (DIRECT_CHANNEL_MESSAGE, messageConsumer.getMessageContent());
}

The message will travel successfully through our flow, from startDirectChannel to the logMessage activator and then to endDirectChannel. That’s because the provided authentication object has all the authorities required to access those components.

6.3. Test SecurityContext Propagation

Before declaring the test case, we can review the whole flow of our example with the PublishSubscribeChannel:

  • The flow starts with a startPSChannel, which has the policy sendAccess = "ROLE_VIEWER"
  • Two ServiceActivator methods subscribe to that channel: one has the security annotation @PreAuthorize("hasRole('ROLE_LOGGER')"), and the other has @PreAuthorize("hasRole('ROLE_VIEWER')")

And so, first we provide a user with role ROLE_VIEWER and try to send a message to our channel:

@Test
@WithMockUser(username = "user", roles = { "VIEWER" })
public void 
  givenRoleUser_whenSendMessageToPSChannel_thenNoMessageArrived() 
  throws IllegalStateException, InterruptedException {
 
    startPSChannel
      .send(new GenericMessage<String>(DIRECT_CHANNEL_MESSAGE));

    executor
      .getThreadPoolExecutor()
      .awaitTermination(2, TimeUnit.SECONDS);

    assertEquals(1, messageConsumer.getMessagePSContent().size());
    assertTrue(
      messageConsumer
      .getMessagePSContent().values().contains("user"));
}

Since our user only has role ROLE_VIEWER, the message can only pass through startPSChannel and one ServiceActivator.

Hence, at the end of the flow, we only receive one message.
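Note that awaitTermination only returns early once the pool has been shut down; on a running pool, it simply blocks for the full timeout, so the test above effectively waits two seconds. When we know how many messages to expect, a CountDownLatch can make the wait return as soon as they arrive. Below is a minimal sketch of that idea. RecordingConsumer and deliverAsync are hypothetical names and aren't part of the article's messageConsumer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of waiting for asynchronously delivered messages with a latch; names are hypothetical.
class LatchAwaitSketch {

    // Records payloads and signals each arrival through a latch.
    static final class RecordingConsumer {
        final Map<String, String> received = new ConcurrentHashMap<>();
        final CountDownLatch latch;

        RecordingConsumer(int expectedMessages) {
            this.latch = new CountDownLatch(expectedMessages);
        }

        void accept(String key, String payload) {
            received.put(key, payload);
            latch.countDown();
        }

        // Returns true once all expected messages arrived, false on timeout.
        boolean await(long timeout, TimeUnit unit) {
            try {
                return latch.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Simulates handlers running on pool threads and delivering their results.
    static RecordingConsumer deliverAsync(int messages) {
        RecordingConsumer consumer = new RecordingConsumer(messages);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < messages; i++) {
            final int id = i;
            pool.execute(() -> consumer.accept("handler-" + id, "payload-" + id));
        }
        pool.shutdown(); // already submitted tasks still run to completion
        return consumer;
    }

    public static void main(String[] args) {
        RecordingConsumer consumer = deliverAsync(2);
        System.out.println("all arrived: " + consumer.await(2, TimeUnit.SECONDS));
        System.out.println("messages: " + consumer.received.size());
    }
}
```

A latch can't prove that no further messages arrive afterwards, so a test that asserts exactly one message may still want a short bounded wait.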

Let’s provide a user with both roles ROLE_VIEWER and ROLE_LOGGER:

@Test
@WithMockUser(username = "user", roles = { "LOGGER", "VIEWER" })
public void 
  givenRoleUserAndLogger_whenSendMessageToPSChannel_then2GetMessages() 
  throws IllegalStateException, InterruptedException {
    startPSChannel
      .send(new GenericMessage<String>(DIRECT_CHANNEL_MESSAGE));

    executor
      .getThreadPoolExecutor()
      .awaitTermination(2, TimeUnit.SECONDS);

    assertEquals(2, messageConsumer.getMessagePSContent().size());
    assertTrue
      (messageConsumer
      .getMessagePSContent()
      .values().contains("user"));
    assertTrue
      (messageConsumer
      .getMessagePSContent()
      .values().contains("ROLE_LOGGER,ROLE_VIEWER"));
}

Now, we receive both messages at the end of our flow because the user has all the required authorities.

7. Conclusion

In this tutorial, we’ve explored the possibility of using Spring Security in Spring Integration to secure message channels and ServiceActivator methods.

As always, we can find all the examples over on GitHub.