Batch Processing Design and Tuning

Spring Batch Tutorial: Batch Processing Design and Tuning

Chapter 1: Core Batch Processing Concepts

1.1 Batch Architecture

JobLauncher → Job → Step(1) → Step(2) → ...
                     ↓
                ItemReader → ItemProcessor → ItemWriter
                     ↓            ↓              ↓
                    (Chunk 1)   (Chunk 2)   (Chunk N)
                     ↓ commit    ↓ commit     ↓ commit

1.2 The Chunk Model in Detail

@Bean
public Step chunkStep(JobRepository repo, PlatformTransactionManager tm) {
    return new StepBuilder("chunkStep", repo)
        .<InputType, OutputType>chunk(100, tm)  // commit once every 100 items
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .build();
}

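Choosing a chunk size (rough starting points; measure and tune for your workload):

Scenario           Chunk size
Simple mapping     500-1000
Network calls      50-100
Complex compute    10-50
File operations    100-500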
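
To make the trade-off behind these numbers concrete, the sketch below counts how many item-bearing chunks (each written in its own transaction) a step needs for a given input size. It is plain arithmetic, not Spring Batch API; `ChunkMath` and `chunkCount` are hypothetical names used only for illustration.

```java
// Hypothetical helper, not part of Spring Batch: illustrates chunk arithmetic only.
final class ChunkMath {

    private ChunkMath() {
    }

    /** Number of chunks that contain at least one item when processing totalItems items. */
    static long chunkCount(long totalItems, int chunkSize) {
        if (totalItems < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("totalItems must be >= 0 and chunkSize > 0");
        }
        // Ceiling division: a final partial chunk still needs its own transaction.
        return (totalItems + chunkSize - 1) / chunkSize;
    }
}
```

For 1,000,000 items, a chunk size of 100 gives 10,000 transactions and a chunk size of 1,000 gives 1,000. The larger chunk commits less often, but it holds more items in memory per transaction and redoes more work when a chunk rolls back.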

Chapter 2: The Tasklet Model

2.1 When to Use It

A Tasklet suits simple tasks that do not need chunked Reader/Processor/Writer processing:

  • File cleanup
  • Calling database stored procedures
  • Cache warm-up
  • Data backup

@Bean
public Step cleanupStep(JobRepository repo, PlatformTransactionManager tm) {
    return new StepBuilder("cleanupStep", repo)
        .tasklet((contribution, chunkContext) -> {
            // Delete temporary files; try-with-resources closes the directory stream
            try (Stream<Path> files = Files.list(Paths.get("/tmp/batch"))) {
                files.filter(f -> f.toString().endsWith(".tmp"))
                    .forEach(f -> f.toFile().delete());
            }
            return RepeatStatus.FINISHED;
        }, tm)
        .build();
}
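
The deletion logic can also live in a small class that a Tasklet delegates to, which makes it testable without a Spring context. The following is a minimal sketch under that assumption; `TempFileCleaner` and `deleteTmpFiles` are hypothetical names, and unlike `File.delete()` this variant raises an exception when a file cannot be removed.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: the file-deletion logic a cleanup Tasklet could delegate to.
final class TempFileCleaner {

    private TempFileCleaner() {
    }

    /** Deletes every regular file ending in ".tmp" directly inside dir; returns how many were deleted. */
    static int deleteTmpFiles(Path dir) throws IOException {
        List<Path> candidates;
        // Files.list keeps a directory handle open, so close the stream promptly.
        try (Stream<Path> files = Files.list(dir)) {
            candidates = files
                .filter(Files::isRegularFile)
                .filter(p -> p.getFileName().toString().endsWith(".tmp"))
                .collect(Collectors.toList());
        }
        int deleted = 0;
        for (Path p : candidates) {
            // deleteIfExists throws IOException on failure instead of returning false silently.
            if (Files.deleteIfExists(p)) {
                deleted++;
            }
        }
        return deleted;
    }
}
```

Inside the Tasklet lambda, the body would then reduce to a call such as `TempFileCleaner.deleteTmpFiles(Paths.get("/tmp/batch"))` followed by `return RepeatStatus.FINISHED;`.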

Chapter 3: Parallel Processing

3.1 Partitioning

@Bean
public Step partitionedStep(JobRepository repo,
        PlatformTransactionManager tm,
        Partitioner partitioner,
        Step workerStep) {

    return new StepBuilder("partitionedStep", repo)
        .partitioner("workerStep", partitioner)
        .step(workerStep)
        .gridSize(10)  // grid size passed to the Partitioner; yields 10 partitions here
        .taskExecutor(taskExecutor())
        .build();
}

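@Component
public class RangePartitioner implements Partitioner {
    // The example assumes IDs 0..999999
    private static final int TOTAL_IDS = 1_000_000;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<>();
        int range = TOTAL_IDS / gridSize;

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext ctx = new ExecutionContext();
            ctx.putInt("minId", i * range);
            // The last partition absorbs the remainder when TOTAL_IDS is not divisible by gridSize
            int maxId = (i == gridSize - 1) ? TOTAL_IDS - 1 : (i + 1) * range - 1;
            ctx.putInt("maxId", maxId);
            map.put("partition" + i, ctx);
        }
        return map;
    }
}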
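
Whether partitions overlap or leave gaps comes down to the range arithmetic, so it helps to isolate it. The sketch below splits an inclusive ID range into contiguous, non-overlapping sub-ranges and spreads any remainder across the first ranges. `RangeSplitter` is a hypothetical name and the class does not depend on Spring Batch; a Partitioner could copy each pair into an ExecutionContext as `minId` and `maxId`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the ID-range arithmetic of a range partitioner.
final class RangeSplitter {

    private RangeSplitter() {
    }

    /**
     * Splits the inclusive range [minId, maxId] into at most gridSize contiguous,
     * non-overlapping ranges. Each element is {min, max}, both inclusive.
     */
    static List<long[]> split(long minId, long maxId, int gridSize) {
        if (gridSize <= 0 || maxId < minId) {
            throw new IllegalArgumentException("need gridSize > 0 and maxId >= minId");
        }
        long total = maxId - minId + 1;
        long base = total / gridSize;       // minimum size of each range
        long remainder = total % gridSize;  // the first 'remainder' ranges get one extra ID
        List<long[]> ranges = new ArrayList<>();
        long start = minId;
        // Stop early when there are fewer IDs than requested partitions.
        for (int i = 0; i < gridSize && start <= maxId; i++) {
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new long[] {start, start + size - 1});
            start += size;
        }
        return ranges;
    }
}
```

Because every range starts exactly one past the end of the previous one, each ID belongs to exactly one partition, provided the worker query treats both bounds as inclusive.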

3.2 Multi-threaded Step

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);
    executor.setMaxPoolSize(16);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("batch-");
    executor.initialize();
    return executor;
}

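// Note: Readers/Writers that are not thread-safe must be synchronized
// (for example, wrap the reader in SynchronizedItemStreamReader)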
@Bean
public Step multiThreadedStep(JobRepository repo, PlatformTransactionManager tm) {
    return new StepBuilder("multiThreadedStep", repo)
        .<Customer, Customer>chunk(100, tm)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .taskExecutor(taskExecutor())
        .throttleLimit(8)  // maximum number of concurrent threads (deprecated since Spring Batch 5.0)
        .build();
}
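
The synchronization note above can be illustrated without Spring. In the sketch below, `SimpleReader`, `ListReader` and `SynchronizedReader` are hypothetical stand-ins: the wrapper serializes calls to a reader that is not thread-safe, so each item is handed to exactly one worker thread. Spring Batch's `SynchronizedItemStreamReader` applies the same idea to real readers.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a reader: returns the next item, or null when exhausted.
interface SimpleReader<T> {
    T read();
}

// Not thread-safe: hasNext() and next() on a shared iterator can interleave across threads.
final class ListReader<T> implements SimpleReader<T> {
    private final Iterator<T> iterator;

    ListReader(List<T> items) {
        this.iterator = items.iterator();
    }

    @Override
    public T read() {
        return iterator.hasNext() ? iterator.next() : null;
    }
}

// Serializes access to the delegate so that each item is handed out exactly once.
final class SynchronizedReader<T> implements SimpleReader<T> {
    private final SimpleReader<T> delegate;

    SynchronizedReader(SimpleReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T read() {
        return delegate.read();
    }
}
```

Only the read call is serialized; processing and writing still run in parallel, which is why this approach pays off mainly when the processor or writer dominates the cost of each item.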

Chapter 4: Error Handling Strategies

4.1 Skip vs. Retry vs. Fail

@Bean
public Step robustStep(JobRepository repo, PlatformTransactionManager tm) {
    return new StepBuilder("robustStep", repo)
        .<Customer, Customer>chunk(100, tm)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .faultTolerant()
        .skip(DataIntegrityViolationException.class)  // skip constraint violations
        .skipLimit(50)
        .retry(TransientDataAccessException.class)     // retry transient exceptions
        .retryLimit(3)
        .noRollback(ValidationException.class)         // does not trigger a rollback
        .listener(skipListener())
        .build();
}

@Component
public class CustomerSkipListener implements
        SkipListener<Customer, Customer> {

    private static final Logger log =
        LoggerFactory.getLogger(CustomerSkipListener.class);

    @Override
    public void onSkipInRead(Throwable t) {
        log.warn("Skipped on read: {}", t.getMessage());
    }

    @Override
    public void onSkipInProcess(Customer item, Throwable t) {
        log.warn("Skipped on process: {} - {}", item.getEmail(), t.getMessage());
    }

    @Override
    public void onSkipInWrite(Customer item, Throwable t) {
        log.error("Skipped on write: {} - {}", item.getEmail(), t.getMessage());
    }
}
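
The difference between the three outcomes can be shown with a deliberately simplified per-item model: retry transient failures up to a fixed number of attempts, skip bad records up to a limit, and fail on anything else. This is a sketch for intuition only, not how Spring Batch implements fault tolerance (the framework works at chunk level and rolls back transactions). All class names are hypothetical, and `maxAttempts` is assumed to count the first attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical exception types standing in for transient and data-quality failures.
class TransientFailure extends RuntimeException {
    TransientFailure(String message) { super(message); }
}

class BadRecord extends RuntimeException {
    BadRecord(String message) { super(message); }
}

// Simplified per-item model of "retry, then skip, else fail".
final class FaultTolerantLoop<I, O> {
    private final int maxAttempts;
    private final int skipLimit;
    final List<I> skipped = new ArrayList<>();

    FaultTolerantLoop(int maxAttempts, int skipLimit) {
        this.maxAttempts = maxAttempts;
        this.skipLimit = skipLimit;
    }

    List<O> run(List<I> items, Function<I, O> processor) {
        List<O> out = new ArrayList<>();
        for (I item : items) {
            try {
                out.add(processWithRetry(item, processor));
            } catch (BadRecord e) {
                // Skippable failure: record it and continue, unless the limit is reached.
                if (skipped.size() >= skipLimit) {
                    throw new IllegalStateException("skip limit exceeded", e);
                }
                skipped.add(item);
            }
            // Any other exception, including an exhausted TransientFailure, fails the run.
        }
        return out;
    }

    private O processWithRetry(I item, Function<I, O> processor) {
        for (int attempt = 1; ; attempt++) {
            try {
                return processor.apply(item);
            } catch (TransientFailure e) {
                if (attempt >= maxAttempts) {
                    throw e; // retries exhausted
                }
            }
        }
    }
}
```

In this model a record that fails twice with `TransientFailure` and then succeeds is processed normally, a `BadRecord` lands in `skipped`, and exceeding either limit aborts the whole run.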

Chapter 5: Monitoring and Operations

5.1 Querying Job Metadata

@RestController
@RequestMapping("/batch/monitor")
public class BatchMonitor {

    @Autowired
    private JobExplorer jobExplorer;

    @GetMapping("/jobs")
    public List<Map<String, Object>> recentJobs() {
        return jobExplorer.findJobInstancesByJobName("importCustomerJob", 0, 20)
            .stream().map(instance -> {
                // HashMap accepts null values (Map.of does not) and fixes the value type to Object
                Map<String, Object> info = new HashMap<>();
                info.put("jobName", instance.getJobName());
                // Most recent execution of this instance, or null if it has none
                JobExecution latest = jobExplorer.getLastJobExecution(instance);
                if (latest == null) {
                    return info;
                }
                info.put("status", latest.getStatus().toString());
                info.put("startTime", latest.getStartTime());
                boolean finished = latest.getStartTime() != null && latest.getEndTime() != null;
                info.put("duration", finished
                    ? java.time.Duration.between(
                        latest.getStartTime(), latest.getEndTime()).toMillis()
                    : "running");
                return info;
            }).collect(Collectors.toList());
    }
}

5.2 Restarting Failed Jobs

@PostMapping("/restart/{executionId}")
public String restart(@PathVariable Long executionId) throws Exception {
    JobExecution execution = jobExplorer.getJobExecution(executionId);
    if (execution == null) {
        return "No such execution: " + executionId;
    }
    if (execution.getStatus() == BatchStatus.FAILED) {
        // Restart with the original parameters so the same JobInstance resumes from
        // its last committed state; adding a new identifying parameter would start
        // a fresh JobInstance instead. jobOperator is an injected JobOperator.
        Long newExecutionId = jobOperator.restart(executionId);
        return "Restarted: " + newExecutionId;
    }
    return "Can't restart. Status: " + execution.getStatus();
}
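
A restart only resumes where the failed run stopped if the reader's position was saved together with each committed chunk. The sketch below models that idea in plain Java: a `Map` plays the role that the persisted ExecutionContext plays in Spring Batch, and a simulated failure discards the chunk in progress. `CheckpointedRun`, `POSITION_KEY` and the `failAtIndex` parameter are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified model of chunk-level checkpointing; not Spring Batch API.
final class CheckpointedRun {
    static final String POSITION_KEY = "reader.position";

    private CheckpointedRun() {
    }

    /**
     * Processes items from the last committed position onward, chunkSize items per commit.
     * If failAtIndex >= 0, simulates a crash on reaching that index: the chunk in progress
     * is discarded (rolled back) and the checkpoint is not advanced.
     */
    static void run(List<String> items, int chunkSize, Map<String, Integer> checkpoint,
                    List<String> committed, int failAtIndex) {
        int position = checkpoint.getOrDefault(POSITION_KEY, 0);
        while (position < items.size()) {
            int end = Math.min(position + chunkSize, items.size());
            List<String> chunk = new ArrayList<>();
            for (int i = position; i < end; i++) {
                if (i == failAtIndex) {
                    throw new IllegalStateException("simulated failure at index " + i);
                }
                chunk.add(items.get(i));
            }
            // "Commit": the output and the new position become visible together.
            committed.addAll(chunk);
            position = end;
            checkpoint.put(POSITION_KEY, position);
        }
    }
}
```

With seven items and a chunk size of 3, a failure at index 4 leaves the checkpoint at 3. A second call with the same checkpoint map and `failAtIndex = -1` processes indexes 3 to 6, and no item is committed twice.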

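Review Questions

  1. How does chunk size affect transaction boundaries and memory usage?
  2. With partitioning, how do you make sure no data is processed more than once?
  3. After a batch job fails, how do you resume it from the point of failure?
  4. In a microservice architecture, how does Spring Batch work together with message queues?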

Info

Path
/tech-stacks/spring-batch/tutorial/批处理设计与调优.md
Last updated
2026/5/30