CSV 文件批量导入

知识库
知识库文档
/tech-stacks/spring-batch/examples/CSV 文件批量导入.md

文档

Spring Batch CSV 文件批量导入示例

目标

演示 Spring Batch 经典 Chunk 模型:读取 CSV → 处理转换 → 批量写入数据库。

完整代码

1. 实体与 DTO

/**
 * Database entity for a customer, mapped to the {@code customers} table.
 *
 * <p>All accessors are written out explicitly because other snippets in this
 * example (the item processor) call them.
 */
@Entity
@Table(name = "customers")
public class Customer {
    /** Primary key; generated using the IDENTITY strategy. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String firstName;
    private String lastName;
    private String email;
    /** Not part of the three-argument constructor; set it via {@link #setPhone(String)}. */
    private String phone;

    /** No-argument constructor. */
    public Customer() {}

    /**
     * Creates a customer from the three name/e-mail values.
     *
     * @param firstName given name
     * @param lastName  family name
     * @param email     e-mail address
     */
    public Customer(String firstName, String lastName, String email) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.email = email;
    }

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }

    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }

    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }

    public String getPhone() { return phone; }
    public void setPhone(String phone) { this.phone = phone; }
}

2. ItemReader — 读取 CSV

/**
 * Declares the reader bean that turns each line of the input CSV into a
 * {@code Customer}.
 */
@Configuration
public class CustomerItemReader {

    /**
     * Builds a comma-delimited flat-file reader over the configured input file.
     *
     * @param inputFile file-system path taken from the {@code batch.input.file} property
     * @return reader that maps the firstName/lastName/email columns to a {@code Customer}
     */
    @Bean
    public FlatFileItemReader<Customer> reader(
            @Value("${batch.input.file}") String inputFile) {

        FileSystemResource csvFile = new FileSystemResource(inputFile);

        FlatFileItemReaderBuilder<Customer> builder =
            new FlatFileItemReaderBuilder<Customer>()
                .name("customerItemReader")
                .resource(csvFile)
                .delimited()
                .delimiter(",")
                .names("firstName", "lastName", "email");

        // The first line of the file is the header row, not data.
        builder = builder.linesToSkip(1);

        // Columns are looked up by the names declared above.
        builder = builder.fieldSetMapper(row -> {
            String first = row.readString("firstName");
            String last = row.readString("lastName");
            String mail = row.readString("email");
            return new Customer(first, last, mail);
        });

        return builder.build();
    }
}

3. ItemProcessor — 数据处理

/**
 * Filters out blacklisted customers and normalizes the names of the rest.
 */
@Component
public class CustomerProcessor implements ItemProcessor<Customer, Customer> {

    /** Lower-case e-mail addresses that must never be imported. */
    private static final Set<String> BLACKLIST = Set.of(
        "spam@example.com", "invalid@test.com"
    );

    /**
     * Processes one customer.
     *
     * @param customer the item handed over by the reader
     * @return the same instance with trimmed, upper-cased names, or {@code null}
     *         when the e-mail address is blacklisted
     * @throws IllegalArgumentException if the e-mail, first name or last name is {@code null}
     * @throws Exception declared by the interface; here only the simulated delay can
     *         raise a checked exception ({@code InterruptedException})
     */
    @Override
    public Customer process(Customer customer) throws Exception {
        String email = requireField(customer.getEmail(), "email");

        // Locale.ROOT keeps the comparison independent of the JVM default locale
        // (e.g. a Turkish default locale lower-cases 'I' to a dotless 'ı', which
        // would let "INVALID@TEST.COM" slip past the blacklist).
        if (BLACKLIST.contains(email.toLowerCase(java.util.Locale.ROOT))) {
            return null;  // returning null drops this item
        }

        // Normalize the names.
        customer.setFirstName(normalizeName(customer.getFirstName(), "firstName"));
        customer.setLastName(normalizeName(customer.getLastName(), "lastName"));

        // Simulated processing latency.
        Thread.sleep(10);

        return customer;
    }

    /** Trims and upper-cases a name, independent of the default locale. */
    private static String normalizeName(String name, String fieldName) {
        return requireField(name, fieldName).trim().toUpperCase(java.util.Locale.ROOT);
    }

    /**
     * Rejects a missing value with an IllegalArgumentException rather than letting a
     * NullPointerException escape, so bad rows surface as an explicit validation error.
     */
    private static String requireField(String value, String fieldName) {
        if (value == null) {
            throw new IllegalArgumentException(
                "Customer field '" + fieldName + "' must not be null");
        }
        return value;
    }
}

4. ItemWriter — 写入数据库

/**
 * Declares the writer bean that stores {@code Customer} items through JPA.
 */
@Configuration
public class CustomerItemWriter {

    /** Factory handed to the JPA writer; injected by the container. */
    @Autowired
    private EntityManagerFactory emf;

    /**
     * Creates a {@code JpaItemWriter} wired to the injected entity manager factory.
     *
     * @return the configured writer
     */
    @Bean
    public JpaItemWriter<Customer> writer() {
        JpaItemWriter<Customer> jpaWriter = new JpaItemWriter<>();
        jpaWriter.setEntityManagerFactory(emf);
        return jpaWriter;
    }
}

5. Step 与 Job 配置

/**
 * Wires the chunk-oriented import step and the job that runs it.
 *
 * <p>NOTE(review): in a Spring Boot 3 application, declaring
 * {@code @EnableBatchProcessing} presumably makes Boot's own batch
 * auto-configuration back off — confirm that this is intended.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Autowired
    private FlatFileItemReader<Customer> reader;

    @Autowired
    private CustomerProcessor processor;

    @Autowired
    private JpaItemWriter<Customer> writer;

    /**
     * Defines the import step: read, process and write customers in chunks of 100,
     * tolerating up to 10 items that fail with {@code IllegalArgumentException}.
     *
     * @return the configured step
     */
    @Bean
    public Step importStep() {
        return new StepBuilder("importStep", jobRepository)
            .<Customer, Customer>chunk(100, transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .skip(IllegalArgumentException.class)
            .skipLimit(10)                      // maximum number of skipped items
            // NOTE(review): no retry(...) exception type is registered here, so this
            // limit presumably has nothing to act on — confirm and add one if needed.
            .retryLimit(3)                      // retry attempts
            // StepExecutionListener (same package as StepExecution) is the interface
            // that declares afterStep; the StepListener marker interface has no methods.
            .listener(new StepExecutionListener() {
                @Override
                public ExitStatus afterStep(StepExecution exec) {
                    // Items dropped because the processor returned null are reported by
                    // getFilterCount(), not getSkipCount().
                    System.out.printf("读取: %d, 写入: %d, 跳过: %d%n",
                        exec.getReadCount(),
                        exec.getWriteCount(),
                        exec.getSkipCount());
                    // Hand back the current status so the step outcome is unchanged.
                    return exec.getExitStatus();
                }
            })
            .build();
    }

    /**
     * Defines the job consisting of the single import step and logs name, status
     * and elapsed time once it has finished.
     *
     * @return the configured job
     */
    @Bean
    public Job importCustomerJob() {
        return new JobBuilder("importCustomerJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            .listener(new JobExecutionListener() {
                @Override
                public void afterJob(JobExecution exec) {
                    // Start and end time are LocalDateTime values in Spring Batch 5, so
                    // the elapsed time is the Duration between them (printed in ISO-8601
                    // form, e.g. PT1.5S).
                    System.out.printf("Job %s 状态: %s, 耗时: %s%n",
                        exec.getJobInstance().getJobName(),
                        exec.getStatus(),
                        java.time.Duration.between(
                            exec.getStartTime(), exec.getEndTime()));
                }
            })
            .start(importStep())
            .build();
    }
}

6. 触发 Job

/**
 * REST endpoint that starts the customer import job on demand.
 */
@RestController
@RequestMapping("/batch")
public class BatchController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importCustomerJob;

    /**
     * Launches the import job with a {@code time} parameter holding the current
     * epoch milliseconds, so every request supplies different job parameters.
     *
     * <p>NOTE(review): whether {@code run} returns before the job has finished
     * depends on the launcher configuration, which is not shown here.
     *
     * @return a 202 response whose body contains the job id
     * @throws Exception if the launcher rejects or fails to start the job
     */
    @PostMapping("/import-customers")
    public ResponseEntity<String> launch() throws Exception {
        JobParametersBuilder params = new JobParametersBuilder();
        params.addLong("time", System.currentTimeMillis());

        JobExecution started =
            jobLauncher.run(importCustomerJob, params.toJobParameters());

        String responseBody = "Job ID: " + started.getJobId();
        return ResponseEntity.accepted().body(responseBody);
    }
}

CSV 文件示例

firstName,lastName,email
John,Doe,john@example.com
Jane,Smith,jane@example.com
Spam,Bot,spam@example.com
Bob,Wilson,bob@test.com

运行

# 配置输入文件路径
# application.properties: batch.input.file=/data/customers.csv

curl -X POST http://localhost:8080/batch/import-customers

# 输出日志:
# 读取: 4, 写入: 3, 跳过: 0   (spam@example.com 由 processor 返回 null 被过滤,计入 filterCount,不计入 skipCount)
# Job importCustomerJob 状态: COMPLETED, 耗时: <本次运行耗时>

信息

路径
/tech-stacks/spring-batch/examples/CSV 文件批量导入.md
更新时间
2026/5/30