1. 概述
在系列文章的第一篇中,我们介绍了Spring Cloud Data Flow的架构组件以及如何使用它来创建流处理数据管道。与流处理不同,批处理流程使得我们可以轻松地创建短暂运行的服务,其中任务按需执行。
2. 当地数据流服务器和壳牌
本地数据流服务器负责部署应用,而数据流壳则允许我们执行与服务器交互所需的DSL命令。
在上一篇文章中,我们使用了Spring Initilizr将它们都设置为Spring Boot应用。
在服务器的主类上添加@EnableDataFlowServer
注解,在壳的主类上添加@EnableDataFlowShell
注解后,就可以通过以下操作启动它们:
mvn spring-boot:run
服务器将在9393端口启动,并且一个shell会准备好从提示符与之交互。
有关如何获取和使用本地数据流服务器及其壳客户端的详细信息,请参阅上一篇文章。
3. 批处理应用
与服务器和壳相同,我们可以使用Spring Initilizr来设置一个根Spring Boot批处理应用。
到达网站后,只需选择一个组、一个Artifact名称,并在依赖项搜索框中选择Cloud Task。
完成后,点击“生成项目”按钮开始下载Maven工件。
该工件已经预配置并带有基本代码。让我们来看看如何编辑它以构建我们的批处理应用。
3.1. Maven依赖
首先,我们需要添加一些Maven依赖。由于这是一个批处理应用,我们需要导入Spring Batch项目的库:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
此外,由于Spring Cloud Task使用关系数据库存储执行任务的结果,我们需要添加对RDBMS驱动程序的依赖:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
我们选择了由Spring提供的内存H2数据库。这为我们提供了一个简单的开发启动方法。然而,在生产环境中,您需要配置自己的DataSource。
请记住,工件的版本将从Spring Boot的父pom.xml文件继承。
3.2. 主类
启用所需功能的关键在于在Spring Boot的主类上添加@EnableTask
和@EnableBatchProcessing
注解。这个类级别的注解告诉Spring Cloud Task启动所有东西:
@EnableTask
@EnableBatchProcessing
@SpringBootApplication
public class BatchJobApplication {
public static void main(String[] args) {
SpringApplication.run(BatchJobApplication.class, args);
}
}
3.3. 作业配置
最后,让我们配置一个作业——在这个例子中,是将一个字符串打印到日志文件:
@Configuration
public class JobConfiguration {
private static Log logger
= LogFactory.getLog(JobConfiguration.class);
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(stepBuilderFactory.get("jobStep1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
logger.info("Job was run");
return RepeatStatus.FINISHED;
}
}).build()).build();
}
}
有关如何配置和定义作业的详细信息超出了本文的范围。更多信息,请参考我们的Spring Batch入门指南。
至此,我们的应用已经准备就绪。现在让我们将其安装到本地Maven仓库。为此,请切换到项目的根目录并运行命令:
mvn clean install
现在是时候将应用程序放入数据流服务器中了。
4. 注册应用
要在App注册表中注册应用,我们需要提供一个唯一的名称、应用类型以及可以解析到应用工件的URI。
在Spring Cloud Data Flow Shell中,从提示符发出命令:
app register --name batch-job --type task
--uri maven://com.baeldung.spring.cloud:batch-job:jar:0.0.1-SNAPSHOT
5. 创建任务
可以使用以下命令创建任务定义:
task create myjob --definition batch-job
这将创建一个名为myjob
的任务,指向先前注册的batch-job应用。
可以使用以下命令获取当前的任务定义列表:
task list
6. 启动任务
要启动任务,可以使用命令:
task launch myjob
任务启动后,其状态将存储在关系型数据库中。我们可以使用命令检查任务执行状态:
task execution list
7. 查看结果
在这个示例中,作业只是简单地将一个字符串打印到日志文件中。日志文件位于数据流服务器输出的日志目录中。
要查看结果,我们可以跟踪日志:
tail -f PATH_TO_LOG\spring-cloud-dataflow-2385233467298102321\myjob-1472827120414\myjob
[...] --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [jobStep1]
[...] --- [main] o.b.spring.cloud.JobConfiguration: Job was run
[...] --- [main] o.s.b.c.l.support.SimpleJobLauncher:
Job: [SimpleJob: [name=job]] completed with the following parameters:
[{}] and the following status: [COMPLETED]
8. 总结
在这篇文章中,我们展示了如何通过Spring Cloud Data Flow处理批处理。
示例代码可以在GitHub项目中找到。