Spring Batch Design and Tuning Tutorial
Chapter 1: Core Batch Processing Concepts
1.1 Batch Architecture
JobLauncher → Job → Step(1) → Step(2) → ...
                      ↓
        ItemReader → ItemProcessor → ItemWriter
            ↓              ↓              ↓
        (Chunk 1)      (Chunk 2)      (Chunk N)
         ↓ commit       ↓ commit       ↓ commit
1.2 The Chunk Model in Detail
@Bean
public Step chunkStep(JobRepository repo, PlatformTransactionManager tm) {
    return new StepBuilder("chunkStep", repo)
        .<InputType, OutputType>chunk(100, tm) // commit once every 100 items
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .build();
}
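Choosing a chunk size (rough starting points; tune them by measurement):
| Scenario | Chunk size (items per commit) |
|---|---|
| Simple mapping | 500-1000 |
| Network calls | 50-100 |
| Heavy computation | 10-50 |
| File operations | 100-500 |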
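To make the trade-off behind these numbers concrete, here is a minimal plain-Java sketch of a chunk-oriented loop. It is not Spring Batch's actual implementation: the class `ChunkLoopSketch` and its `run` method are hypothetical, and each call to the writer simply stands in for one write plus one transaction commit. It shows that a larger chunk size means fewer commits but more items held in memory at once.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Plain-Java model of a chunk-oriented step (hypothetical, not a Spring Batch class). */
class ChunkLoopSketch {

    /**
     * Reads items one at a time, processes each, and hands every full chunk
     * (and the final partial chunk) to the writer. Each writer call stands in
     * for one commit. Returns the number of chunks written.
     */
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int commits = 0;
        List<O> buffer = new ArrayList<>(chunkSize);
        while (reader.hasNext()) {
            buffer.add(processor.apply(reader.next()));
            if (buffer.size() == chunkSize) {
                writer.accept(new ArrayList<>(buffer)); // write, then "commit"
                buffer.clear();
                commits++;
            }
        }
        if (!buffer.isEmpty()) {
            writer.accept(new ArrayList<>(buffer)); // final partial chunk
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 250; i++) {
            input.add(i);
        }
        List<Integer> chunkSizes = new ArrayList<>();
        int commits = run(input.iterator(), i -> "item-" + i,
                chunk -> chunkSizes.add(chunk.size()), 100);
        // prints "3 commits, chunk sizes [100, 100, 50]"
        System.out.println(commits + " commits, chunk sizes " + chunkSizes);
    }
}
```

With 250 items, a chunk size of 100 yields three commits; a chunk size of 10 would yield 25 smaller transactions, each of which loses less work if it rolls back.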
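Chapter 2: The Tasklet Model
2.1 When to Use It
A Tasklet suits simple tasks that do not need Reader/Processor/Writer chunking:
- File cleanup
- Calling database stored procedures
- Cache warm-up
- Data backup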
@Bean
public Step cleanupStep(JobRepository repo, PlatformTransactionManager tm) {
    return new StepBuilder("cleanupStep", repo)
        .tasklet((contribution, chunkContext) -> {
            // Delete temporary files; try-with-resources closes the directory stream
            try (Stream<Path> files = Files.list(Paths.get("/tmp/batch"))) {
                files.filter(f -> f.toString().endsWith(".tmp"))
                     .forEach(f -> f.toFile().delete());
            }
            return RepeatStatus.FINISHED;
        }, tm)
        .build();
}
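The cleanup logic inside a Tasklet is easier to test when it lives in a plain helper that the Tasklet merely calls. The following is a minimal sketch under that assumption; the class `TmpFileCleaner` and its method `deleteTmpFiles` are hypothetical names, not part of Spring Batch. It closes the directory stream and reports how many files were actually removed.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper that a cleanup Tasklet could delegate to. */
class TmpFileCleaner {

    /** Deletes regular files ending in ".tmp" directly under dir; returns how many were removed. */
    static int deleteTmpFiles(Path dir) throws IOException {
        List<Path> candidates;
        // Files.list holds an open directory handle, so close it promptly
        try (Stream<Path> files = Files.list(dir)) {
            candidates = files
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".tmp"))
                    .collect(Collectors.toList());
        }
        int deleted = 0;
        for (Path p : candidates) {
            // deleteIfExists throws on failure instead of silently returning false
            if (Files.deleteIfExists(p)) {
                deleted++;
            }
        }
        return deleted;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("batch-demo");
        Files.createFile(dir.resolve("a.tmp"));
        Files.createFile(dir.resolve("b.tmp"));
        Files.createFile(dir.resolve("keep.csv"));
        System.out.println("deleted " + deleteTmpFiles(dir) + " files");
    }
}
```

A Tasklet lambda could call `TmpFileCleaner.deleteTmpFiles(...)` and then return `RepeatStatus.FINISHED`; because the Tasklet's `execute` method may throw checked exceptions, an `IOException` would fail the step rather than being swallowed.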
Chapter 3: Parallel Processing
3.1 Partitioning
@Bean
public Step partitionedStep(JobRepository repo,
                            PlatformTransactionManager tm,
                            Partitioner partitioner,
                            Step workerStep) {
    return new StepBuilder("partitionedStep", repo)
        .partitioner("workerStep", partitioner)
        .step(workerStep)
        .gridSize(10) // 10 partitions, run in parallel by the TaskExecutor
        .taskExecutor(taskExecutor())
        .build();
}
@Component
public class RangePartitioner implements Partitioner {
    private static final int TOTAL_IDS = 1_000_000; // hard-coded ID range 0..999999

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<>();
        int range = TOTAL_IDS / gridSize;
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext ctx = new ExecutionContext();
            ctx.putInt("minId", i * range);
            // The last partition absorbs the remainder when TOTAL_IDS is not divisible by gridSize
            ctx.putInt("maxId", i == gridSize - 1 ? TOTAL_IDS - 1 : (i + 1) * range - 1);
            map.put("partition" + i, ctx);
        }
        return map;
    }
}
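The range arithmetic is the part of a partitioner that most often goes wrong (gaps, overlaps, or a lost remainder), so it can help to isolate it from the Spring types. Below is a minimal plain-Java sketch; `RangeSplitSketch` and `split` are hypothetical names, and the ranges are returned as `long[]` pairs rather than `ExecutionContext` objects so the logic can be checked on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical range-splitting helper; a Partitioner could copy its output into ExecutionContexts. */
class RangeSplitSketch {

    /**
     * Splits the inclusive ID range [minId, maxId] into at most gridSize contiguous,
     * non-overlapping ranges. The first (count % gridSize) ranges get one extra ID.
     * Each map value is {rangeMin, rangeMax}.
     */
    static Map<String, long[]> split(long minId, long maxId, int gridSize) {
        if (gridSize <= 0 || maxId < minId) {
            throw new IllegalArgumentException("need gridSize > 0 and maxId >= minId");
        }
        long total = maxId - minId + 1;
        long base = total / gridSize;
        long remainder = total % gridSize;
        Map<String, long[]> ranges = new LinkedHashMap<>();
        long start = minId;
        // Stop early if there are fewer IDs than requested partitions
        for (int i = 0; i < gridSize && start <= maxId; i++) {
            long size = base + (i < remainder ? 1 : 0);
            long end = start + size - 1;
            ranges.put("partition" + i, new long[] {start, end});
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // prints partition0: 1..4, partition1: 5..7, partition2: 8..10 (one per line)
        split(1, 10, 3).forEach((name, r) ->
                System.out.println(name + ": " + r[0] + ".." + r[1]));
    }
}
```

Because every range starts exactly one past the previous range's end, no ID is assigned to two partitions and none is skipped, which is the property a worker step's `minId`/`maxId` query relies on.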
3.2 Multi-threaded Step
@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);
    executor.setMaxPoolSize(16);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("batch-");
    executor.initialize();
    return executor;
}
// Note: a Reader/Writer that is not thread-safe must be synchronized
@Bean
public Step multiThreadedStep(JobRepository repo, PlatformTransactionManager tm) {
    return new StepBuilder("multiThreadedStep", repo)
        .<Customer, Customer>chunk(100, tm)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .taskExecutor(taskExecutor())
        .throttleLimit(8) // max concurrent threads (deprecated since Spring Batch 5.0)
        .build();
}
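The note about non-thread-safe readers can be illustrated without Spring. Spring Batch ships `SynchronizedItemStreamReader` for wrapping a delegate reader; the class below is only a hypothetical plain-Java stand-in that shows the idea: serialize access to `read()` so that several worker threads never receive the same item or corrupt the delegate's state.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical stand-in for a synchronized reader; not a Spring Batch class. */
class SynchronizedReaderSketch<T> {
    private final Iterator<T> delegate;

    SynchronizedReaderSketch(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    /** Returns the next item, or null once the input is exhausted. */
    synchronized T read() {
        return delegate.hasNext() ? delegate.next() : null;
    }

    /** Drains itemCount items with the given number of threads; true if each item was read exactly once. */
    static boolean eachItemReadOnce(int itemCount, int threads) {
        List<Integer> items = IntStream.range(0, itemCount).boxed().collect(Collectors.toList());
        SynchronizedReaderSketch<Integer> reader = new SynchronizedReaderSketch<>(items.iterator());
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger reads = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item);
                    reads.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // No duplicates and no losses: distinct items == total reads == input size
        return seen.size() == itemCount && reads.get() == itemCount;
    }

    public static void main(String[] args) {
        System.out.println("each item read once: " + eachItemReadOnce(10_000, 8));
    }
}
```

Synchronizing the reader keeps reads correct but does not preserve item order across threads, and restart state becomes harder to track; when ordering or restartability matters, partitioning is usually the safer choice.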
Chapter 4: Error Handling Strategies
4.1 Skip vs. Retry vs. Fail
@Bean
public Step robustStep(JobRepository repo, PlatformTransactionManager tm) {
    return new StepBuilder("robustStep", repo)
        .<Customer, Customer>chunk(100, tm)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .faultTolerant()
        .skip(DataIntegrityViolationException.class) // skip constraint violations
        .skipLimit(50)
        .retry(TransientDataAccessException.class) // retry transient failures
        .retryLimit(3)
        .noRollback(ValidationException.class) // do not roll back on validation errors
        .listener(skipListener())
        .build();
}
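@Component
// Named differently from the SkipListener interface it implements, to avoid a cyclic declaration
public class CustomerSkipListener implements
        SkipListener<Customer, Customer> {
    // SLF4J logger
    private static final Logger log = LoggerFactory.getLogger(CustomerSkipListener.class);

    @Override
    public void onSkipInRead(Throwable t) {
        log.warn("Skipped on read: {}", t.getMessage());
    }
    @Override
    public void onSkipInProcess(Customer item, Throwable t) {
        log.warn("Skipped on process: {} - {}", item.getEmail(), t.getMessage());
    }
    @Override
    public void onSkipInWrite(Customer item, Throwable t) {
        log.error("Skipped on write: {} - {}", item.getEmail(), t.getMessage());
    }
}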
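How skip, retry, and fail interact is easier to see in a stripped-down model. The sketch below is a simplified plain-Java illustration, not Spring Batch's actual algorithm (which also rolls back and reprocesses chunks); `RetrySkipSketch`, its two exception types, and the `maxAttempts`/`skipLimit` parameters are hypothetical stand-ins. Transient failures are retried up to a total number of attempts, skippable failures drop the item until the skip limit is reached, and anything else fails the run.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Simplified model of retry-then-skip handling (hypothetical, not Spring Batch's algorithm). */
class RetrySkipSketch {
    static class TransientException extends RuntimeException {
        TransientException(String message) { super(message); }
    }

    static class SkippableException extends RuntimeException {
        SkippableException(String message) { super(message); }
    }

    static class Result<O> {
        final List<O> written = new ArrayList<>();
        final List<Object> skipped = new ArrayList<>();
    }

    static <I, O> Result<O> run(List<I> items, Function<I, O> processor,
                                int maxAttempts, int skipLimit) {
        Result<O> result = new Result<>();
        for (I item : items) {
            int attempt = 0;
            while (true) {
                attempt++;
                try {
                    result.written.add(processor.apply(item));
                    break;
                } catch (TransientException e) {
                    if (attempt >= maxAttempts) {
                        throw e; // retries exhausted: fail the run
                    }
                } catch (SkippableException e) {
                    if (result.skipped.size() >= skipLimit) {
                        throw new IllegalStateException("skip limit exceeded", e);
                    }
                    result.skipped.add(item); // record the item and move on
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> calls = new HashMap<>();
        Function<Integer, String> processor = i -> {
            int n = calls.merge(i, 1, Integer::sum);
            if (i == 2) throw new SkippableException("bad record " + i);
            if (i == 4 && n < 3) throw new TransientException("temporary failure " + i);
            return "ok-" + i;
        };
        Result<String> r = run(List.of(1, 2, 3, 4, 5), processor, 3, 50);
        // prints "written=[ok-1, ok-3, ok-4, ok-5] skipped=[2]"
        System.out.println("written=" + r.written + " skipped=" + r.skipped);
    }
}
```

Item 4 succeeds on its third attempt, item 2 is skipped and recorded (the place where a skip listener would be notified), and any exception outside the two configured types would propagate and fail the run immediately.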
Chapter 5: Monitoring and Operations
5.1 Querying Job Metadata
@RestController
@RequestMapping("/batch/monitor")
public class BatchMonitor {
    @Autowired
    private JobExplorer jobExplorer;

    @GetMapping("/jobs")
    public List<Map<String, Object>> recentJobs() {
        return jobExplorer.findJobInstancesByJobName("importCustomerJob", 0, 20)
            .stream()
            .map(instance -> {
                // getJobExecutions returns the most recent execution first
                List<JobExecution> executions = jobExplorer.getJobExecutions(instance);
                JobExecution latest = executions.get(0);
                // A mutable map tolerates a null start time; Map.of would throw on null values
                Map<String, Object> row = new LinkedHashMap<>();
                row.put("jobName", instance.getJobName());
                row.put("status", latest.getStatus().toString());
                row.put("startTime", latest.getStartTime());
                row.put("duration", latest.getStartTime() != null && latest.getEndTime() != null
                    ? (Object) java.time.Duration.between(
                          latest.getStartTime(), latest.getEndTime()).toMillis()
                    : "running");
                return row;
            })
            .collect(Collectors.toList());
    }
}
5.2 Restarting Failed Jobs
@PostMapping("/restart/{executionId}")
public String restart(@PathVariable Long executionId) throws Exception {
    JobExecution execution = jobExplorer.getJobExecution(executionId);
    if (execution == null) {
        return "No such execution: " + executionId;
    }
    if (execution.getStatus() == BatchStatus.FAILED) {
        // Restart with the original parameters so the same JobInstance resumes;
        // adding a new identifying parameter would create a new instance instead.
        // jobOperator is assumed to be an injected JobOperator bean.
        Long newExecutionId = jobOperator.restart(executionId);
        return "Restarted: " + newExecutionId;
    }
    return "Can't restart. Status: " + execution.getStatus();
}
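Whether a launch resumes a failed run or starts from scratch depends on job identity: Spring Batch identifies a JobInstance by the job name plus its identifying parameters. The following plain-Java model is a sketch of that rule only; `JobInstanceIdentitySketch`, `instanceFor`, and the `inputFile` parameter are hypothetical and do not mirror the real JobRepository API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of JobInstance identity (hypothetical, not the JobRepository API). */
class JobInstanceIdentitySketch {
    private final Map<String, Long> instances = new HashMap<>();
    private long nextId = 1;

    /** Returns the instance id for this job name and parameter set, creating one if none exists. */
    long instanceFor(String jobName, Map<String, String> identifyingParams) {
        // TreeMap gives a stable key regardless of parameter insertion order
        String key = jobName + new TreeMap<>(identifyingParams);
        return instances.computeIfAbsent(key, k -> nextId++);
    }

    public static void main(String[] args) {
        JobInstanceIdentitySketch repo = new JobInstanceIdentitySketch();
        Map<String, String> params = Map.of("inputFile", "customers.csv");
        long first = repo.instanceFor("importCustomerJob", params);
        long rerun = repo.instanceFor("importCustomerJob", params);

        Map<String, String> changed = new HashMap<>(params);
        changed.put("restartTime", "1700000000000");
        long other = repo.instanceFor("importCustomerJob", changed);

        // prints "1 1 2": same parameters map to the same instance, an extra parameter to a new one
        System.out.println(first + " " + rerun + " " + other);
    }
}
```

In this model, relaunching with unchanged parameters maps back to the existing instance, which is what allows a restart to pick up saved step state; appending a fresh timestamp parameter yields a different instance that starts from the beginning.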
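Questions to Consider
- How does chunk size affect transaction boundaries and memory usage?
- With partitioning, how do you ensure that no data is processed twice?
- After a batch job fails, how do you resume it from where it left off?
- In a microservice architecture, how does Spring Batch work together with message queues?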