springbatch的封装与使用

栏目: 数据库 · 发布时间: 5年前

内容简介:主要实现批量数据的处理,我对batch进行的封装,提出了jobBase类型,具体job需要实现它即可。Spring Batch 不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理及并发处理,同时还支持日志、监控、任务重启与跳过等特性,大大简化了批处理应用开发,将开发人员从复杂的任务配置管理过程中解放出来,使他们可以更多地去关注核心的业务处理过程。主要规定了公用方法的执行策略,而具体的job名称,读,写还是需要具体JOB去实现的。注意,需要为每个job的write启个名称,否则在多job时,writ

springbatch

主要实现批量数据的处理,我对batch进行的封装,提出了jobBase类型,具体job需要实现它即可。Spring Batch 不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理及并发处理,同时还支持日志、监控、任务重启与跳过等特性,大大简化了批处理应用开发,将开发人员从复杂的任务配置管理过程中解放出来,使他们可以更多地去关注核心的业务处理过程。

几个组件

  • job
  • step
  • read
  • write
  • listener
  • process
  • validator

JobBase定义了几个公用的方法

/**
  * springBatch的job基础类.
  */
 public abstract class JobBase<T> {
 
   /**
    * 批次.
    */
   protected int chunkCount = 5000;
   /**
    * 监听器.
    */
   private JobExecutionListener jobExecutionListener;
   /**
    * 处理器.
    */
   private ValidatingItemProcessor<T> validatingItemProcessor;
   /**
    * job名称.
    */
   private String jobName;
   /**
    * 检验器.
    */
   private Validator<T> validator;
   @Autowired
   private JobBuilderFactory job;
   @Autowired
   private StepBuilderFactory step;
 
 
   /**
    * 初始化.
    *
    * @param jobName                 job名称
    * @param jobExecutionListener    监听器
    * @param validatingItemProcessor 处理器
    * @param validator               检验
    */
   public JobBase(String jobName,
                  JobExecutionListener jobExecutionListener,
                  ValidatingItemProcessor<T> validatingItemProcessor,
                  Validator<T> validator) {
     this.jobName = jobName;
     this.jobExecutionListener = jobExecutionListener;
     this.validatingItemProcessor = validatingItemProcessor;
     this.validator = validator;
   }
 
   /**
    * job初始化与启动.
    */
   public Job getJob() throws Exception {
     return job.get(jobName).incrementer(new RunIdIncrementer())
         .start(syncStep())
         .listener(jobExecutionListener)
         .build();
   }
 
   /**
    * 执行步骤.
    *
    * @return
    */
   public Step syncStep() throws Exception {
     return step.get("step1")
         .<T, T>chunk(chunkCount)
         .reader(reader())
         .processor(processor())
         .writer(writer())
         .build();
   }
 
   /**
    * 单条处理数据.
    *
    * @return
    */
   public ItemProcessor<T, T> processor() {
     validatingItemProcessor.setValidator(processorValidator());
     return validatingItemProcessor;
   }
 
   /**
    * 校验数据.
    *
    * @return
    */
   @Bean
   public Validator<T> processorValidator() {
     return validator;
   }
 
   /**
    * 批量读数据.
    *
    * @return
    * @throws Exception
    */
   public abstract ItemReader<T> reader() throws Exception;
 
   /**
    * 批量写数据.
    *
    * @return
    */
   @Bean
   public abstract ItemWriter<T> writer();
 
 }

主要规定了公用方法的执行策略,而具体的job名称,读,写还是需要具体JOB去实现的。

具体Job实现

@Configuration
 @EnableBatchProcessing
 public class SyncPersonJob extends JobBase<Person> {
   @Autowired
   private DataSource dataSource;
   @Autowired
   @Qualifier("primaryJdbcTemplate")
   private JdbcTemplate jdbcTemplate;
 
   /**
    * 初始化,规则了job名称和监视器.
    */
   public SyncPersonJob() {
     super("personJob", new PersonJobListener(), new PersonItemProcessor(), new BeanValidator<>());
   }
 
   @Override
   public ItemReader<Person> reader() throws Exception {
     StringBuffer sb = new StringBuffer();
     sb.append("select * from person");
     String sql = sb.toString();
     JdbcCursorItemReader<Person> jdbcCursorItemReader =
         new JdbcCursorItemReader<>();
     jdbcCursorItemReader.setSql(sql);
     jdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
     jdbcCursorItemReader.setDataSource(dataSource);
 
     return jdbcCursorItemReader;
   }
 
 
   @Override
   @Bean("personJobWriter")
   public ItemWriter<Person> writer() {
     JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
     writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
     String sql = "insert into person_export " + "(id,name,age,nation,address) "
         + "values(:id, :name, :age, :nation,:address)";
     writer.setSql(sql);
     writer.setDataSource(dataSource);
     return writer;
   }
 
 }

写操作需要定义自己的bean的声明

注意,需要为每个job的write启个名称,否则在多job时,write将会被打乱

/**
   * 批量写数据.
   *
   * @return
   */
  @Override
  @Bean("personVerson2JobWriter")
  public ItemWriter<Person> writer() {
   
  }

添加一个api,手动触发

@Autowired
  SyncPersonJob syncPersonJob;

  @Autowired
  JobLauncher jobLauncher;

  void exec(Job job) throws Exception {
    JobParameters jobParameters = new JobParametersBuilder()
        .addLong("time", System.currentTimeMillis())
        .toJobParameters();
    jobLauncher.run(job, jobParameters);
  }

  @RequestMapping("/run1")
  public String run1() throws Exception {
    exec(syncPersonJob.getJob());
    return "personJob success";
  }

以上所述就是小编给大家介绍的《springbatch的封装与使用》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

游戏化思维

游戏化思维

[美] 凯文·韦巴赫(Kevin Werbach)、[美] 丹·亨特(Dan Hunter) / 周逵、王晓丹 / 浙江人民出版社 / 2014-4 / 36.90

[内容简介] ●本书由开设了全世界第一个游戏化课程的沃顿商学院副教授凯文·韦巴赫和丹·亨特所著,第一次全面系统地介绍游戏化的理论,阐述了如何将游戏的理念应用到商业实践中。 ●作者指出,在商业竞争日益激烈的今天,传统的激励方式渐渐失效,未来的管理将更多地建立在员工和消费者的内在动机和自我激励上。这些制作精良、设计巧妙的游戏建立在多年来对人类动机和人类心理的研究基础之上,可以最大限度地激发......一起来看看 《游戏化思维》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具