Spring Batch

Overview

Spring Batch

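Spring Batch is a lightweight batch-processing framework in the Spring ecosystem, designed for enterprise batch applications that process large volumes of data.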

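What it is

Spring Batch is built around chunk-oriented processing and provides reusable batch infrastructure: jobs (Job), steps (Step), readers (ItemReader), processors (ItemProcessor), and writers (ItemWriter). It supports transaction management, skip and retry, restart, and execution monitoring. It is not a scheduler itself; jobs are triggered by an external scheduler such as Spring's task scheduling or Quartz.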
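The read-process-write cycle can be pictured with a few lines of plain Java. This is only a sketch of the idea, not Spring Batch code: the class `ChunkLoopSketch` and its `run` method are hypothetical names, and the "commit" is represented by a single call to the writer per chunk.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java illustration of chunk-oriented processing; this is not Spring Batch code.
class ChunkLoopSketch {

    /** Runs read -> process -> write in chunks and returns the number of chunks written. */
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int chunks = 0;
        while (reader.hasNext()) {
            // 1. Read up to chunkSize items, one at a time.
            List<I> inputs = new ArrayList<>(chunkSize);
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // 2. Process each item; a null result drops (filters) the item.
            List<O> outputs = new ArrayList<>(inputs.size());
            for (I input : inputs) {
                O output = processor.apply(input);
                if (output != null) {
                    outputs.add(output);
                }
            }
            // 3. Write the whole chunk at once; a real step would commit the transaction here.
            writer.accept(outputs);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<List<Integer>> written = new ArrayList<>();
        int chunks = ChunkLoopSketch.<Integer, Integer>run(
            List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).iterator(), x -> x * 10, written::add, 4);
        System.out.println(chunks + " chunks: " + written);
    }
}
```

With ten items and a chunk size of 4, the writer is called three times, with 4, 4, and 2 items. Only one chunk is held in memory at a time, which is why the model scales to large inputs.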

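Problems it solves

  • Large data volumes: items are read, processed, and written chunk by chunk, so only one chunk is in memory at a time, which avoids OutOfMemoryError
  • Transaction management: each chunk is committed in its own transaction; a failure rolls back only the current chunk
  • Fault tolerance: skip invalid records, retry failed operations, restart failed jobs
  • Job monitoring: the built-in JobRepository records execution history and status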

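Key features

  • Chunk model: ItemReader → ItemProcessor → ItemWriter
  • Tasklet model: simple one-off tasks
  • Job scheduling: integrates with Spring's task scheduling or Quartz
  • Restart and retry: a restarted job resumes from the point where it failed
  • Partitioning: splits the data into partitions that are processed in parallel to raise throughput
  • Listeners: JobExecutionListener / StepExecutionListener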
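Resuming from the point of failure relies on saving a checkpoint together with each committed chunk. The following plain-Java sketch illustrates that idea under simplifying assumptions: `RestartSketch`, the `"next"` key, and the `failing` set are hypothetical, and a `Map` stands in for the execution context that Spring Batch persists in its metadata tables.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java illustration of checkpoint-based restart; this is not Spring Batch code.
class RestartSketch {

    /** Processes items in chunks and records the index of the next unread item after each chunk. */
    static void run(List<String> items, int chunkSize, Map<String, Integer> checkpoint,
                    List<String> sink, Set<String> failing) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int pos = checkpoint.getOrDefault("next", 0);   // resume after the last committed chunk
        while (pos < items.size()) {
            int end = Math.min(pos + chunkSize, items.size());
            List<String> chunk = items.subList(pos, end);
            for (String item : chunk) {
                if (failing.contains(item)) {
                    // The chunk fails as a whole: nothing from it is written or checkpointed.
                    throw new IllegalStateException("cannot process " + item);
                }
            }
            sink.addAll(chunk);             // "write"
            pos = end;
            checkpoint.put("next", pos);    // "commit": data and position are saved together
        }
    }

    public static void main(String[] args) {
        List<String> items = List.of("a", "b", "c", "d", "e", "f", "g");
        Map<String, Integer> checkpoint = new HashMap<>();
        List<String> sink = new ArrayList<>();
        try {
            run(items, 3, checkpoint, sink, Set.of("e"));   // first run fails in the second chunk
        } catch (IllegalStateException e) {
            System.out.println("failed; checkpoint=" + checkpoint + ", written=" + sink);
        }
        run(items, 3, checkpoint, sink, Set.of());          // restart after the problem is fixed
        System.out.println("after restart, written=" + sink);
    }
}
```

The second run starts at index 3, so the first chunk is not written twice.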

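Installation

Spring Batch Installation Guide

1. Environment setup

Prerequisites

  • JDK 8+ for Spring Batch 4.x; Spring Batch 5.x requires JDK 17+
  • A relational database (MySQL / PostgreSQL / H2) to store Job metadata
  • A Spring Boot 2.x or 3.x project; the Java examples in this guide use the Spring Batch 5 builder API, which ships with Spring Boot 3.x

Build tool

  • Maven 3.6+ or Gradle 7+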

2. Installation steps

Maven

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

<!-- Database driver -->
<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <scope>runtime</scope>
</dependency>

<!-- Scheduling support (optional, for triggering Jobs on a timer) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

Gradle

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    runtimeOnly 'com.mysql:mysql-connector-j'
    implementation 'org.springframework.boot:spring-boot-starter-quartz'
}

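Configuring the Batch metadata tables

Spring Batch stores its state in 6 metadata tables (plus three sequences, which the MySQL script emulates with tables). To have Spring Boot create them automatically:

# Development: create the schema automatically
spring.batch.jdbc.initialize-schema=always

# Production: run the SQL script manually
# spring.batch.jdbc.initialize-schema=never

Location of the SQL script for manual setup (inside the spring-batch-core JAR):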

org/springframework/batch/core/schema-mysql.sql

Full configuration

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/batch_db
    username: root
    password: ${DB_PASSWORD}
  batch:
    jdbc:
      initialize-schema: always
    job:
      enabled: false  # do not run Jobs automatically at startup

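3. Common installation problems

Q: The metadata tables already exist, but startup still fails?

Set spring.batch.jdbc.initialize-schema=never so that Spring Boot stops re-running the schema script, and manage the tables manually.

Q: Compatibility problems with the in-memory H2 database?

H2 and MySQL use different SQL dialects. Set spring.batch.jdbc.platform=h2 so that the H2 schema script is used, or remove the MySQL driver.

Q: A Job runs automatically at startup?

By default, Spring Boot launches the Job beans it finds when the application starts (Spring Boot 3.x requires spring.batch.job.name when more than one Job is defined). To turn this off:

spring.batch.job.enabled=false

Then trigger the Job manually through a REST endpoint or a scheduler.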

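Examples

Spring Batch CSV Bulk Import Example

Goal

Demonstrate the classic Spring Batch chunk model: read a CSV file → transform each record → write to the database in batches.

Full code

1. Entity

// Database entity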
@Entity
@Table(name = "customers")
public class Customer {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String firstName;
    private String lastName;
    private String email;
    private String phone;

    // constructors, getters, setters...
    public Customer() {}

    public Customer(String firstName, String lastName, String email) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.email = email;
    }
    public void setPhone(String phone) { this.phone = phone; }
    // ... other getters/setters
}

2. ItemReader: reading the CSV

@Configuration
public class CustomerItemReader {

    @Bean
    public FlatFileItemReader<Customer> reader(
            @Value("${batch.input.file}") String inputFile) {

        return new FlatFileItemReaderBuilder<Customer>()
            .name("customerItemReader")
            .resource(new FileSystemResource(inputFile))
            .delimited()
            .delimiter(",")
            .names("firstName", "lastName", "email")
            .linesToSkip(1)  // skip the header row
            .fieldSetMapper(fieldSet -> new Customer(
                fieldSet.readString("firstName"),
                fieldSet.readString("lastName"),
                fieldSet.readString("email")
            ))
            .build();
    }
}

3. ItemProcessor: transforming records

@Component
public class CustomerProcessor implements ItemProcessor<Customer, Customer> {

    private static final Set<String> BLACKLIST = Set.of(
        "spam@example.com", "invalid@test.com"
    );

    @Override
    public Customer process(Customer customer) throws Exception {
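        // Filter out blacklisted addresses
        if (BLACKLIST.contains(customer.getEmail().toLowerCase())) {
            // Returning null filters the item: it is not written and is counted
            // in the step's filterCount, not in its skipCount
            return null;
        }

        // Normalize the names
        customer.setFirstName(
            customer.getFirstName().trim().toUpperCase());
        customer.setLastName(
            customer.getLastName().trim().toUpperCase());

        // Simulate slow processing
        Thread.sleep(10);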

        return customer;
    }
}

4. ItemWriter: writing to the database

@Configuration
public class CustomerItemWriter {

    @Autowired
    private EntityManagerFactory emf;

    @Bean
    public JpaItemWriter<Customer> writer() {
        JpaItemWriter<Customer> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(emf);
        return writer;
    }
}

5. Step and Job configuration

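import java.time.Duration;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.transaction.PlatformTransactionManager;

// No @EnableBatchProcessing here: with Spring Boot 3.x that annotation switches off
// Boot's Batch auto-configuration, including the spring.batch.* properties shown earlier.
@Configuration
public class BatchConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Autowired
    private FlatFileItemReader<Customer> reader;

    @Autowired
    private CustomerProcessor processor;

    @Autowired
    private JpaItemWriter<Customer> writer;

    @Bean
    public Step importStep() {
        return new StepBuilder("importStep", jobRepository)
            .<Customer, Customer>chunk(100, transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .skip(IllegalArgumentException.class)
            .skipLimit(10)                              // fail the step once more than 10 items are skipped
            .retry(TransientDataAccessException.class)  // retryLimit only applies to exceptions registered here
            .retryLimit(3)                              // attempts per item
            // StepListener is only a marker interface; the callback lives on StepExecutionListener
            .listener(new StepExecutionListener() {
                @Override
                public ExitStatus afterStep(StepExecution exec) {
                    System.out.printf("read: %d, written: %d, filtered: %d, skipped: %d%n",
                        exec.getReadCount(),
                        exec.getWriteCount(),
                        exec.getFilterCount(),
                        exec.getSkipCount());
                    return exec.getExitStatus();  // keep the step's own exit status
                }
            })
            .build();
    }

    @Bean
    public Job importCustomerJob() {
        return new JobBuilder("importCustomerJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            .listener(new JobExecutionListener() {
                @Override
                public void afterJob(JobExecution exec) {
                    // Spring Batch 5 exposes start and end times as LocalDateTime
                    System.out.printf("Job %s status: %s, duration: %s%n",
                        exec.getJobInstance().getJobName(),
                        exec.getStatus(),
                        Duration.between(exec.getStartTime(), exec.getEndTime()));
                }
            })
            .start(importStep())
            .build();
    }
}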

6. Launching the Job

@RestController
@RequestMapping("/batch")
public class BatchController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importCustomerJob;

    @PostMapping("/import-customers")
    public ResponseEntity<String> launch() throws Exception {
        JobExecution execution = jobLauncher.run(
            importCustomerJob,
            new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters()
        );
        return ResponseEntity.accepted()
            .body("Job ID: " + execution.getJobId());
    }
}

Sample CSV file

firstName,lastName,email
John,Doe,john@example.com
Jane,Smith,jane@example.com
Spam,Bot,spam@example.com
Bob,Wilson,bob@test.com

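Running it

# Set the input file path
# application.properties: batch.input.file=/data/customers.csv

curl -X POST http://localhost:8080/batch/import-customers

# Expected counts in the log: 4 read, 3 written, 0 skipped.
# The blacklisted spam@example.com row is filtered by the processor,
# which Spring Batch counts separately from skips.
# The Job listener then reports importCustomerJob with status COMPLETED.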
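To see where the counts for the sample file come from, the same logic can be replayed in plain Java. This is a hedged walk-through, not the Spring Batch pipeline itself: `CsvImportWalkthrough` and `count` are hypothetical names, the CSV is split naively on commas, and the blacklist is copied from the processor above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Plain-Java replay of the reader + processor logic for the sample CSV; not Spring Batch code.
class CsvImportWalkthrough {

    static final Set<String> BLACKLIST = Set.of("spam@example.com", "invalid@test.com");

    /** Returns {read, filtered, written} for the given lines; the first line is the header. */
    static int[] count(List<String> lines, List<String> written) {
        int read = 0;
        int filtered = 0;
        for (String line : lines.subList(1, lines.size())) {   // same effect as linesToSkip(1)
            String[] fields = line.split(",");
            read++;
            if (BLACKLIST.contains(fields[2].toLowerCase())) {
                filtered++;                                     // processor returned null
                continue;
            }
            written.add(fields[0].trim().toUpperCase() + " "
                + fields[1].trim().toUpperCase() + " <" + fields[2] + ">");
        }
        return new int[] {read, filtered, written.size()};
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "firstName,lastName,email",
            "John,Doe,john@example.com",
            "Jane,Smith,jane@example.com",
            "Spam,Bot,spam@example.com",
            "Bob,Wilson,bob@test.com");
        List<String> written = new ArrayList<>();
        int[] counts = count(lines, written);
        System.out.printf("read=%d filtered=%d written=%d%n", counts[0], counts[1], counts[2]);
        written.forEach(System.out::println);
    }
}
```

Four data rows are read, one is filtered by the blacklist, and three are written. No exception is thrown, so nothing counts as a skip.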

Tutorial

Spring Batch Design and Tuning Tutorial

Chapter 1: Core batch concepts

1.1 Batch architecture

JobLauncher → Job → Step(1) → Step(2) → ...
                     ↓
                ItemReader → ItemProcessor → ItemWriter
                     ↓            ↓              ↓
                    (Chunk 1)   (Chunk 2)   (Chunk N)
                     ↓ commit    ↓ commit     ↓ commit

1.2 The chunk model in detail

@Bean
public Step chunkStep(JobRepository repo, PlatformTransactionManager tm) {
    return new StepBuilder("chunkStep", repo)
        .<InputType, OutputType>chunk(100, tm)  // commit once every 100 items
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .build();
}

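Choosing a chunk size (rough starting points; measure with your own workload):

Scenario             Chunk size
Simple mapping       500-1000
Network calls        50-100
Complex computation  10-50
File operations      100-500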
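The trade-off behind these ranges is simple arithmetic: a smaller chunk means more transactions, and a larger chunk means more items held in memory and redone after a rollback. A minimal sketch follows, with the hypothetical helper `ChunkMath.commits` computing the number of chunk transactions as the ceiling of itemCount / chunkSize.

```java
// Plain-Java arithmetic helper; the class and method names are illustrative only.
class ChunkMath {

    /** Number of chunk transactions needed for itemCount items: ceil(itemCount / chunkSize). */
    static long commits(long itemCount, int chunkSize) {
        if (itemCount < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("itemCount must be >= 0 and chunkSize > 0");
        }
        return (itemCount + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        long items = 1_000_000L;
        for (int size : new int[] {10, 100, 1000}) {
            System.out.println("chunk size " + size + " -> " + commits(items, size) + " commits");
        }
    }
}
```

For one million items, a chunk size of 10 needs 100,000 commits, 100 needs 10,000, and 1000 needs 1,000. At most one chunk of work is lost when a transaction rolls back.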

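Chapter 2: The Tasklet model

2.1 When to use it

A Tasklet suits simple tasks that do not need the chunked Reader/Processor/Writer flow:

  • File cleanup
  • Calling database stored procedures
  • Cache warm-up
  • Data backup

// Requires java.nio.file.{Files, Path, Paths} and java.util.stream.Stream
@Bean
public Step cleanupStep(JobRepository repo, PlatformTransactionManager tm) {
    return new StepBuilder("cleanupStep", repo)
        .tasklet((contribution, chunkContext) -> {
            // Delete temporary files; Files.list holds an open directory handle,
            // so the stream is closed with try-with-resources
            try (Stream<Path> files = Files.list(Paths.get("/tmp/batch"))) {
                files.filter(f -> f.toString().endsWith(".tmp"))
                     .forEach(f -> f.toFile().delete());
            }
            return RepeatStatus.FINISHED;
        }, tm)
        .build();
}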
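The body of such a cleanup Tasklet is ordinary file handling and can be tried on its own. The sketch below is a plain-Java version under a few assumptions: `TmpCleanup` and `deleteTmpFiles` are hypothetical names, only regular files directly inside the directory are considered, and the directory is passed in rather than hard-coded.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java version of the cleanup logic; not tied to Spring Batch.
class TmpCleanup {

    /** Deletes regular files ending in ".tmp" directly inside dir and returns how many were removed. */
    static int deleteTmpFiles(Path dir) throws IOException {
        List<Path> targets;
        try (Stream<Path> entries = Files.list(dir)) {   // closes the directory handle
            targets = entries
                .filter(Files::isRegularFile)
                .filter(p -> p.getFileName().toString().endsWith(".tmp"))
                .collect(Collectors.toList());
        }
        int deleted = 0;
        for (Path target : targets) {
            if (Files.deleteIfExists(target)) {
                deleted++;
            }
        }
        return deleted;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("batch-cleanup-demo");
        Files.createFile(dir.resolve("a.tmp"));
        Files.createFile(dir.resolve("keep.csv"));
        System.out.println("deleted " + deleteTmpFiles(dir) + " file(s) in " + dir);
    }
}
```

Unlike `File.delete()`, `Files.deleteIfExists` throws an IOException when a deletion fails, so a Tasklet built on it would fail visibly rather than silently leave files behind.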

Chapter 3: Parallel processing

3.1 Partitioning

@Bean
public Step partitionedStep(JobRepository repo,
        PlatformTransactionManager tm,
        Partitioner partitioner,
        Step workerStep) {

    return new StepBuilder("partitionedStep", repo)
        .partitioner("workerStep", partitioner)
        .step(workerStep)
        .gridSize(10)  // 10 partitions, run in parallel by the task executor
        .taskExecutor(taskExecutor())
        .build();
}

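@Component
public class RangePartitioner implements Partitioner {

    // Assumes IDs 0..999,999; in practice, query MIN(id) and MAX(id) instead
    private static final int TOTAL = 1_000_000;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<>();
        int range = TOTAL / gridSize;

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext ctx = new ExecutionContext();
            ctx.putInt("minId", i * range);
            // The last partition absorbs the remainder when TOTAL is not divisible by gridSize
            ctx.putInt("maxId", i == gridSize - 1 ? TOTAL - 1 : (i + 1) * range - 1);
            map.put("partition" + i, ctx);
        }
        return map;
    }
}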
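The range arithmetic is the part of a partitioner that most often goes wrong: ranges must be contiguous, must not overlap, and must cover the remainder when the total is not divisible by the grid size. A minimal plain-Java sketch of one way to do this is shown below; `RangeSplit.split` is a hypothetical helper that spreads the remainder over the first partitions and returns inclusive `{minId, maxId}` pairs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java range splitting; the result could be copied into ExecutionContext entries.
class RangeSplit {

    /** Splits the inclusive range [minId, maxId] into gridSize contiguous, non-overlapping ranges. */
    static Map<String, long[]> split(long minId, long maxId, int gridSize) {
        if (gridSize <= 0 || maxId < minId) {
            throw new IllegalArgumentException("need gridSize > 0 and maxId >= minId");
        }
        long total = maxId - minId + 1;
        long base = total / gridSize;
        long extra = total % gridSize;       // the first `extra` partitions get one more ID
        Map<String, long[]> ranges = new LinkedHashMap<>();
        long start = minId;
        for (int i = 0; i < gridSize; i++) {
            long size = base + (i < extra ? 1 : 0);
            // When size is 0 (more partitions than IDs) the range is empty: maxId < minId.
            ranges.put("partition" + i, new long[] {start, start + size - 1});
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        split(0, 999_999, 3).forEach((name, r) ->
            System.out.println(name + ": " + r[0] + ".." + r[1]));
    }
}
```

Because each ID falls into exactly one range, a worker that reads `WHERE id BETWEEN minId AND maxId` cannot process a row that belongs to another partition.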

3.2 Multi-threaded Step

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);
    executor.setMaxPoolSize(16);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("batch-");
    executor.initialize();
    return executor;
}

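// Note: a Reader/Writer that is not thread-safe (FlatFileItemReader, for example)
// must be synchronized, e.g. by wrapping the reader in SynchronizedItemStreamReader
@Bean
public Step multiThreadedStep(JobRepository repo, PlatformTransactionManager tm) {
    return new StepBuilder("multiThreadedStep", repo)
        .<Customer, Customer>chunk(100, tm)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .taskExecutor(taskExecutor())
        .throttleLimit(8)  // maximum number of concurrent threads (deprecated since Spring Batch 5.0)
        .build();
}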
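The reason for synchronizing the reader is that "check for more input, then take the next item" must happen as one atomic step. The following plain-Java sketch shows the idea with a hypothetical `SyncReader` wrapper around an `Iterator`; it follows the Spring Batch convention of returning `null` when the input is exhausted but is otherwise unrelated to the framework's classes.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java illustration of a synchronized reader shared by worker threads.
class SynchronizedReadSketch {

    /** Wraps a non-thread-safe iterator so that each read() is atomic. */
    static final class SyncReader<T> {
        private final Iterator<T> delegate;

        SyncReader(Iterator<T> delegate) {
            this.delegate = delegate;
        }

        synchronized T read() {   // returns null when the input is exhausted
            return delegate.hasNext() ? delegate.next() : null;
        }
    }

    /** Drains itemCount items with the given number of threads; returns {distinct items, total reads}. */
    static int[] drain(int itemCount, int threads) {
        List<Integer> items = IntStream.range(0, itemCount).boxed().collect(Collectors.toList());
        SyncReader<Integer> reader = new SyncReader<>(items.iterator());
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger reads = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item);
                    reads.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {seen.size(), reads.get()};
    }

    public static void main(String[] args) {
        int[] result = drain(10_000, 8);
        System.out.println("distinct=" + result[0] + ", reads=" + result[1]);
    }
}
```

Both numbers equal the item count: every item is handed to exactly one thread. Note that a multi-threaded step gives up the ordering of items, and with it the ability to restart a file reader from a saved line position.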

Chapter 4: Error-handling strategies

4.1 Skip vs. retry vs. fail

@Bean
public Step robustStep(JobRepository repo, PlatformTransactionManager tm,
        CustomerSkipListener skipListener) {
    return new StepBuilder("robustStep", repo)
        .<Customer, Customer>chunk(100, tm)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .faultTolerant()
        .skip(DataIntegrityViolationException.class)  // skip constraint violations
        .skipLimit(50)
        .retry(TransientDataAccessException.class)     // retry transient failures
        .retryLimit(3)
        .noRollback(ValidationException.class)         // do not roll back the chunk for this exception
        .listener(skipListener)
        .build();
}

// Needs org.slf4j.Logger / LoggerFactory and org.springframework.batch.core.SkipListener.
// The class must not itself be named SkipListener, or it would shadow the interface it implements.
@Component
public class CustomerSkipListener implements
        SkipListener<Customer, Customer> {

    private static final Logger log =
        LoggerFactory.getLogger(CustomerSkipListener.class);

    @Override
    public void onSkipInRead(Throwable t) {
        log.warn("Skipped on read: {}", t.getMessage());
    }

    @Override
    public void onSkipInProcess(Customer item, Throwable t) {
        log.warn("Skipped on process: {} - {}", item.getEmail(), t.getMessage());
    }

    @Override
    public void onSkipInWrite(Customer item, Throwable t) {
        log.error("Skipped on write: {} - {}", item.getEmail(), t.getMessage());
    }
}
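How the three outcomes relate can be shown with a small plain-Java loop. This is a deliberately simplified sketch, not the framework's algorithm: `RetrySkipSketch`, `TransientException`, and `BadRecordException` are hypothetical, the retry limit is treated as the total number of attempts per item, and items are handled one by one rather than in chunks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java illustration of retry, skip, and fail; not Spring Batch's implementation.
class RetrySkipSketch {

    static class TransientException extends RuntimeException {}

    static class BadRecordException extends RuntimeException {}

    /** Writes each item, retrying transient failures and skipping bad records; returns the skipped items. */
    static <T> List<T> writeAll(List<T> items, Consumer<T> writer, int retryLimit, int skipLimit) {
        List<T> skipped = new ArrayList<>();
        for (T item : items) {
            int attempts = 0;
            while (true) {
                attempts++;
                try {
                    writer.accept(item);
                    break;                                   // success
                } catch (TransientException e) {
                    if (attempts >= retryLimit) {
                        throw e;                             // retries exhausted: the step fails
                    }
                    // otherwise loop and try the same item again
                } catch (BadRecordException e) {
                    if (skipped.size() >= skipLimit) {
                        throw e;                             // too many skips: the step fails
                    }
                    skipped.add(item);                       // give up on this item only
                    break;
                }
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        int[] attemptsForB = {0};
        List<String> skipped = writeAll(List.of("a", "b", "c", "d"), item -> {
            if (item.equals("b") && ++attemptsForB[0] < 3) throw new TransientException();
            if (item.equals("c")) throw new BadRecordException();
            written.add(item);
        }, 3, 10);
        System.out.println("written=" + written + ", skipped=" + skipped);
    }
}
```

Retry suits failures that may succeed on a later attempt (a deadlock or a timeout), while skip suits records that will never succeed (a constraint violation). Anything that is neither retryable nor skippable fails the step.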

Chapter 5: Monitoring and operations

5.1 Querying Job metadata

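import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/batch/monitor")
public class BatchMonitor {

    @Autowired
    private JobExplorer jobExplorer;

    @GetMapping("/jobs")
    public List<Map<String, Object>> recentJobs() {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (JobInstance instance :
                jobExplorer.findJobInstancesByJobName("importCustomerJob", 0, 20)) {
            // Pick the most recent execution by creation time rather than by list position
            JobExecution latest = jobExplorer.getJobExecutions(instance).stream()
                .max(Comparator.comparing(JobExecution::getCreateTime))
                .orElse(null);
            if (latest == null) continue;

            // A mutable map accepts null values (Map.of does not) and mixed value types
            Map<String, Object> row = new LinkedHashMap<>();
            row.put("jobName", instance.getJobName());
            row.put("status", latest.getStatus().toString());
            row.put("startTime", latest.getStartTime());
            if (latest.getStartTime() != null && latest.getEndTime() != null) {
                row.put("duration", Duration.between(
                    latest.getStartTime(), latest.getEndTime()).toMillis());
            } else {
                row.put("duration", "running");
            }
            rows.add(row);
        }
        return rows;
    }
}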
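The duration field is the only computed value in that response. Its logic can be checked in isolation with plain `java.time` types, assuming (as in Spring Batch 5) that start and end times are `LocalDateTime` values that may be null; `ExecutionDuration.describe` is a hypothetical helper.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Plain-Java helper for the "duration" field; not part of Spring Batch.
class ExecutionDuration {

    /** Returns the elapsed milliseconds as a Long, or a status string when a timestamp is missing. */
    static Object describe(LocalDateTime start, LocalDateTime end) {
        if (start == null) {
            return "not started";
        }
        if (end == null) {
            return "running";
        }
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2024, 1, 1, 10, 0, 0);
        System.out.println(describe(start, start.plusSeconds(2).plusNanos(500_000_000)));
        System.out.println(describe(start, null));
    }
}
```

Returning either a number or a string keeps the JSON simple, at the cost of a field whose type varies; a separate status field would be the stricter design.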

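5.2 Restarting a failed Job

// Lives in a controller that also injects JobExplorer, JobLauncher, and the importCustomerJob bean
@PostMapping("/restart/{executionId}")
public String restart(@PathVariable Long executionId) throws Exception {
    JobExecution execution = jobExplorer.getJobExecution(executionId);
    if (execution == null) {
        return "No such execution: " + executionId;
    }
    if (execution.getStatus() == BatchStatus.FAILED) {
        // Relaunch with the SAME parameters: they identify the same JobInstance,
        // so Spring Batch resumes the failed step. Adding a new parameter such as a
        // timestamp would create a new JobInstance and start again from the beginning.
        JobExecution newExec = jobLauncher.run(
            importCustomerJob,
            execution.getJobParameters()
        );
        return "Restarted: " + newExec.getJobId();
    }
    return "Can't restart. Status: " + execution.getStatus();
}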
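Whether a launch counts as a restart or as a fresh run depends on how a job instance is identified: by the job name plus its identifying parameters. The plain-Java sketch below models only that lookup; `JobInstanceIdentity` is a hypothetical stand-in for the repository, and parameters are simplified to string pairs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of job instance identity; not the Spring Batch JobRepository.
class JobInstanceIdentity {

    private final Map<String, Integer> instances = new HashMap<>();

    /** Returns the instance id for (jobName, parameters), creating a new one for an unseen combination. */
    int instanceIdFor(String jobName, Map<String, String> identifyingParams) {
        // Sort the parameters so that their order does not affect the key.
        String key = jobName + new TreeMap<>(identifyingParams);
        Integer id = instances.get(key);
        if (id == null) {
            id = instances.size() + 1;
            instances.put(key, id);
        }
        return id;
    }

    public static void main(String[] args) {
        JobInstanceIdentity repository = new JobInstanceIdentity();
        int first = repository.instanceIdFor("importCustomerJob", Map.of("file", "a.csv"));
        int again = repository.instanceIdFor("importCustomerJob", Map.of("file", "a.csv"));
        int other = repository.instanceIdFor("importCustomerJob",
            Map.of("file", "a.csv", "restartTime", "1700000000000"));
        System.out.println("same parameters -> same instance: " + (first == again));
        System.out.println("extra parameter -> new instance: " + (first != other));
    }
}
```

Launching with identical parameters finds the existing instance, which is what allows a failed execution to be resumed. Any change to the identifying parameters yields a different instance with no history to resume from.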

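Review questions

  1. How does the chunk size affect transaction boundaries and memory use?
  2. With partitioning, how do you make sure no data is processed twice?
  3. After a batch job fails, how do you resume from the point of failure instead of starting over?
  4. In a microservice architecture, how does Spring Batch work together with a message queue?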
