1. 概述

在系列文章的第一篇中,我们介绍了Spring Cloud Data Flow的架构组件以及如何使用它来创建流处理数据管道。与流处理不同,批处理流程使得我们可以轻松地创建短暂运行的服务,其中任务按需执行

2. 当地数据流服务器和壳牌

本地数据流服务器负责部署应用,而数据流壳则允许我们执行与服务器交互所需的DSL命令。

上一篇文章中,我们使用了Spring Initilizr将它们都设置为Spring Boot应用。

在服务器的主类上添加@EnableDataFlowServer注解,在壳的主类上添加@EnableDataFlowShell注解后,就可以通过以下操作启动它们:

mvn spring-boot:run

服务器将在9393端口启动,并且一个shell会准备好从提示符与之交互。

有关如何获取和使用本地数据流服务器及其壳客户端的详细信息,请参阅上一篇文章。

3. 批处理应用

与服务器和壳相同,我们可以使用Spring Initilizr来设置一个根Spring Boot批处理应用。

到达网站后,只需选择一个、一个Artifact名称,并在依赖项搜索框中选择Cloud Task

完成后,点击“生成项目”按钮开始下载Maven工件。

该工件已经预配置并带有基本代码。让我们来看看如何编辑它以构建我们的批处理应用。

3.1. Maven依赖

首先,我们需要添加一些Maven依赖。由于这是一个批处理应用,我们需要导入Spring Batch项目的库:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

此外,由于Spring Cloud Task使用关系数据库存储执行任务的结果,我们需要添加对RDBMS驱动程序的依赖:

<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
</dependency>

我们选择了由Spring提供的内存H2数据库。这为我们提供了一个简单的开发启动方法。然而,在生产环境中,您需要配置自己的DataSource

请记住,工件的版本将从Spring Boot的父pom.xml文件继承。

3.2. 主类

启用所需功能的关键在于在Spring Boot的主类上添加@EnableTask@EnableBatchProcessing注解。这个类级别的注解告诉Spring Cloud Task启动所有东西:

@EnableTask
@EnableBatchProcessing
@SpringBootApplication
public class BatchJobApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchJobApplication.class, args);
    }
}

3.3. 作业配置

最后,让我们配置一个作业——在这个例子中,是将一个字符串打印到日志文件:

@Configuration
public class JobConfiguration {

    private static Log logger
      = LogFactory.getLog(JobConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
          .start(stepBuilderFactory.get("jobStep1")
          .tasklet(new Tasklet() {
            
              @Override
              public RepeatStatus execute(StepContribution contribution, 
                ChunkContext chunkContext) throws Exception {
                
                logger.info("Job was run");
                return RepeatStatus.FINISHED;
              }
        }).build()).build();
    }
}

有关如何配置和定义作业的详细信息超出了本文的范围。更多信息,请参考我们的Spring Batch入门指南

至此,我们的应用已经准备就绪。现在让我们将其安装到本地Maven仓库。为此,请切换到项目的根目录并运行命令:

mvn clean install

现在是时候将应用程序放入数据流服务器中了。

4. 注册应用

要在App注册表中注册应用,我们需要提供一个唯一的名称、应用类型以及可以解析到应用工件的URI。

在Spring Cloud Data Flow Shell中,从提示符发出命令:

app register --name batch-job --type task 
  --uri maven://com.baeldung.spring.cloud:batch-job:jar:0.0.1-SNAPSHOT

5. 创建任务

可以使用以下命令创建任务定义:

task create myjob --definition batch-job

这将创建一个名为myjob的任务,指向先前注册的batch-job应用。

可以使用以下命令获取当前的任务定义列表:

task list

6. 启动任务

要启动任务,可以使用命令:

task launch myjob

任务启动后,其状态将存储在关系型数据库中。我们可以使用命令检查任务执行状态:

task execution list

7. 查看结果

在这个示例中,作业只是简单地将一个字符串打印到日志文件中。日志文件位于数据流服务器输出的日志目录中。

要查看结果,我们可以跟踪日志:

tail -f PATH_TO_LOG\spring-cloud-dataflow-2385233467298102321\myjob-1472827120414\myjob
[...] --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [jobStep1]
[...] --- [main] o.b.spring.cloud.JobConfiguration: Job was run
[...] --- [main] o.s.b.c.l.support.SimpleJobLauncher:
  Job: [SimpleJob: [name=job]] completed with the following parameters: 
    [{}] and the following status: [COMPLETED]

8. 总结

在这篇文章中,我们展示了如何通过Spring Cloud Data Flow处理批处理。

示例代码可以在GitHub项目中找到。