文档
Spring Batch CSV 文件批量导入示例
目标
演示 Spring Batch 经典的 Chunk（分块）模型：读取 CSV → 处理转换 → 批量写入数据库。示例代码基于 Spring Batch 5（Spring Boot 3）API。
完整代码
1. 实体与 DTO
// Database entity
/**
 * JPA entity mapped to the {@code customers} table.
 *
 * <p>The CSV import builds instances through the three-argument constructor (first name, last
 * name, email). {@code id} is generated by the database on insert
 * ({@code GenerationType.IDENTITY}); {@code phone} is not filled in by the import.
 *
 * <p>The accessors below are the ones the batch processor relies on
 * ({@code getEmail}, {@code get/setFirstName}, {@code get/setLastName}); without them the
 * example does not compile.
 */
@Entity
@Table(name = "customers")
public class Customer {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String firstName;
    private String lastName;
    private String email;
    private String phone;

    /** No-arg constructor required by JPA. */
    public Customer() {}

    /**
     * Creates a not-yet-persisted customer from the CSV columns.
     *
     * @param firstName first name as read from the file
     * @param lastName last name as read from the file
     * @param email email address as read from the file
     */
    public Customer(String firstName, String lastName, String email) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.email = email;
    }

    /** @return the database-generated id, or {@code null} before the entity is persisted */
    public Long getId() { return id; }

    public String getFirstName() { return firstName; }

    public void setFirstName(String firstName) { this.firstName = firstName; }

    public String getLastName() { return lastName; }

    public void setLastName(String lastName) { this.lastName = lastName; }

    public String getEmail() { return email; }

    public void setEmail(String email) { this.email = email; }

    public String getPhone() { return phone; }

    public void setPhone(String phone) { this.phone = phone; }
}
2. ItemReader — 读取 CSV
@Configuration
public class CustomerItemReader {
    /** CSV column names in file order; they are also the FieldSet keys the mapper reads. */
    private static final String[] COLUMNS = {"firstName", "lastName", "email"};

    /**
     * Builds the reader that turns each CSV line into a {@link Customer}.
     *
     * <p>The file path comes from the {@code batch.input.file} property. The first line is a
     * header and is skipped. Every other line is split on commas into {@link #COLUMNS} and passed
     * to the {@code Customer(firstName, lastName, email)} constructor.
     *
     * @param inputFile filesystem path of the CSV file
     * @return a reader registered under the name {@code customerItemReader}
     */
    @Bean
    public FlatFileItemReader<Customer> reader(
            @Value("${batch.input.file}") String inputFile) {
        FileSystemResource csvFile = new FileSystemResource(inputFile);
        return new FlatFileItemReaderBuilder<Customer>()
                .name("customerItemReader")
                .resource(csvFile)
                .linesToSkip(1) // header row
                .delimited()
                .delimiter(",")
                .names(COLUMNS)
                .fieldSetMapper(row -> new Customer(
                        row.readString(COLUMNS[0]),
                        row.readString(COLUMNS[1]),
                        row.readString(COLUMNS[2])))
                .build();
    }
}
3. ItemProcessor — 数据处理
/**
 * Cleans and filters customers read from the CSV before they are written.
 *
 * <ul>
 *   <li>A customer whose email (whitespace-stripped, compared case-insensitively) is in
 *       {@link #BLACKLIST} is filtered out by returning {@code null}. Spring Batch records this in
 *       the step's filter count, not its skip count.</li>
 *   <li>A missing or blank email, first name or last name raises
 *       {@link IllegalArgumentException}. The import step declares that exception skippable, so
 *       such rows are skipped instead of failing the job with a NullPointerException.</li>
 *   <li>The email is stripped of surrounding whitespace. First and last names are stripped and
 *       upper-cased using a locale-independent mapping.</li>
 * </ul>
 */
@Component
public class CustomerProcessor implements ItemProcessor<Customer, Customer> {
    /** Lower-case email addresses that must never be imported. */
    private static final Set<String> BLACKLIST = Set.of(
            "spam@example.com", "invalid@test.com"
    );

    @Override
    public Customer process(Customer customer) throws Exception {
        String email = requireText(customer.getEmail(), "email");
        // Locale.ROOT keeps case mapping stable regardless of the JVM default locale (e.g. Turkish).
        if (BLACKLIST.contains(email.toLowerCase(java.util.Locale.ROOT))) {
            return null; // filtered: not written, counted as filtered rather than skipped
        }
        customer.setEmail(email);
        customer.setFirstName(
                requireText(customer.getFirstName(), "firstName").toUpperCase(java.util.Locale.ROOT));
        customer.setLastName(
                requireText(customer.getLastName(), "lastName").toUpperCase(java.util.Locale.ROOT));
        // Simulated per-item processing cost (demo only).
        Thread.sleep(10);
        return customer;
    }

    /**
     * Returns {@code value} with surrounding whitespace stripped.
     *
     * @param value the raw field value, possibly {@code null}
     * @param field field name used in the error message
     * @return the stripped, non-blank value
     * @throws IllegalArgumentException if {@code value} is {@code null} or blank
     */
    private static String requireText(String value, String field) {
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException("Customer " + field + " is missing or blank");
        }
        return value.strip();
    }
}
4. ItemWriter — 写入数据库
@Configuration
public class CustomerItemWriter {
    /** JPA factory handed to the writer. */
    @Autowired
    private EntityManagerFactory emf;

    /**
     * Exposes a {@link JpaItemWriter} that writes {@link Customer} entities through the
     * injected {@link EntityManagerFactory}.
     *
     * @return the writer bound to {@link #emf}
     */
    @Bean
    public JpaItemWriter<Customer> writer() {
        final JpaItemWriter<Customer> jpaWriter = new JpaItemWriter<>();
        jpaWriter.setEntityManagerFactory(this.emf);
        return jpaWriter;
    }
}
5. Step 与 Job 配置
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobRepository jobRepository;
@Autowired
private PlatformTransactionManager transactionManager;
@Autowired
private FlatFileItemReader<Customer> reader;
@Autowired
private CustomerProcessor processor;
@Autowired
private JpaItemWriter<Customer> writer;
@Bean
public Step importStep() {
return new StepBuilder("importStep", jobRepository)
.<Customer, Customer>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skip(IllegalArgumentException.class)
.skipLimit(10) // 跳过上限
.retryLimit(3) // 重试次数
.listener(new StepListener() {
@Override
public void afterStep(StepExecution exec) {
System.out.printf("读取: %d, 写入: %d, 跳过: %d%n",
exec.getReadCount(),
exec.getWriteCount(),
exec.getSkipCount());
}
})
.build();
}
@Bean
public Job importCustomerJob() {
return new JobBuilder("importCustomerJob", jobRepository)
.incrementer(new RunIdIncrementer())
.listener(new JobExecutionListener() {
@Override
public void afterJob(JobExecution exec) {
System.out.printf("Job %s 状态: %s, 耗时: %s%n",
exec.getJobInstance().getJobName(),
exec.getStatus(),
exec.getEndTime().toInstant()
.minusMillis(exec.getStartTime()
.toInstant().toEpochMilli()));
}
})
.start(importStep())
.build();
}
}
6. 触发 Job
@RestController
@RequestMapping("/batch")
public class BatchController {
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job importCustomerJob;

    /**
     * {@code POST /batch/import-customers}: launches {@code importCustomerJob}.
     *
     * <p>Each call passes the current epoch millis as the {@code time} job parameter, so each
     * request starts a new job instance. Two requests in the same millisecond would collide.
     * NOTE(review): if the injected launcher is synchronous, the job has already finished by the
     * time the 202 response is sent — confirm the launcher configuration.
     *
     * @return 202 Accepted with a body of the form {@code "Job ID: <id>"}
     * @throws Exception anything raised by {@link JobLauncher#run}
     */
    @PostMapping("/import-customers")
    public ResponseEntity<String> launch() throws Exception {
        long launchedAt = System.currentTimeMillis();
        var params = new JobParametersBuilder()
                .addLong("time", launchedAt)
                .toJobParameters();
        JobExecution execution = jobLauncher.run(importCustomerJob, params);
        Long jobId = execution.getJobId();
        return ResponseEntity.accepted().body("Job ID: " + jobId);
    }
}
CSV 文件示例
firstName,lastName,email
John,Doe,john@example.com
Jane,Smith,jane@example.com
Spam,Bot,spam@example.com
Bob,Wilson,bob@test.com
运行
# 配置输入文件路径
# application.properties: batch.input.file=/data/customers.csv
curl -X POST http://localhost:8080/batch/import-customers
# 输出日志:
# 读取: 4, 写入: 3, 跳过: 0（黑名单记录由处理器返回 null 过滤，计入 filterCount 而非 skipCount）
# Job importCustomerJob 状态: COMPLETED