This article shows how to use Spring Batch to bulk-import data from a database into Elasticsearch, and walks through a complete example.

1. Introduction
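When a system needs to import a large volume of data from a database into Elasticsearch, Spring Batch can make the import more efficient. Spring Batch reads data page by page through an ItemReader and writes it in batches through an ItemWriter. Because Spring Batch does not ship an ItemReader or ItemWriter for Elasticsearch, this example defines a custom ElasticsearchItemWriter (and an ElasticsearchItemReader) for the bulk import.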
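To make the read-then-write-in-batches idea concrete, here is a minimal plain-Java sketch of a chunk-oriented loop. It is not Spring Batch code: the class name ChunkLoopSketch and the method processInChunks are made up for illustration, and each returned inner list stands for one call to a writer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Plain-Java illustration of chunk-oriented processing; not Spring Batch code. */
final class ChunkLoopSketch {

    /** Reads items one at a time and groups them into lists of at most chunkSize. */
    static <T> List<List<T>> processInChunks(Iterator<T> reader, int chunkSize) {
        List<List<T>> writes = new ArrayList<>();   // each element stands for one batch write
        List<T> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(reader.next());
            if (chunk.size() == chunkSize) {
                writes.add(chunk);                  // "write" the full chunk in one call
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            writes.add(chunk);                      // final, possibly smaller chunk
        }
        return writes;
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        // prints [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(processInChunks(rows.iterator(), 3));
    }
}
```

With a chunk size of 3, seven rows lead to three writes instead of seven, which is where the efficiency gain comes from.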
2. Example
2.1 pom.xml
This article uses Spring Data Jest to connect to Elasticsearch (Spring Data Elasticsearch would also work). The Elasticsearch version is 5.5.3.
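<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hfcsbc.estl</groupId>
    <artifactId>es-etl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>es-etl</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M7</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>com.github.vanroy</groupId>
            <artifactId>spring-boot-starter-data-jest</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>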
2.2 Entity classes and repositories
package com.hfcsbc.esetl.domain;

import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * Create by pengchao on 2018/2/23
 */
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {
    @Id
    private Long id;
    private String name;
    @OneToOne
    @Field(type = FieldType.Nested)
    private Address address;
}

package com.hfcsbc.esetl.domain;
import lombok.Data;

import javax.persistence.Entity;
import javax.persistence.Id;

/**
 * Create by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {
    @Id
    private Long id;
    private String name;
}

package com.hfcsbc.esetl.repository.jpa;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Create by pengchao on 2018/2/23
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}

package com.hfcsbc.esetl.repository.es;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * Create by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}
2.3 Configuring the ElasticsearchItemWriter
package com.hfcsbc.esetl.itemWriter;

import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;

import java.util.List;

/**
 * Create by pengchao on 2018/2/23
 */
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {

    private EsPersonRepository personRepository;

    public ElasticsearchItemWriter(EsPersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    @Override
    public void beforeWrite(List<? extends Person> items) {
    }

    @Override
    public void afterWrite(List<? extends Person> items) {
    }

    @Override
    public void onWriteError(Exception exception, List<? extends Person> items) {
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }

    @Override
    public void write(List<? extends Person> items) throws Exception {
        // saveAll in the implementing class AbstractElasticsearchRepository calls
        // elasticsearchOperations.bulkIndex(queries), so the whole chunk is indexed in bulk
        personRepository.saveAll(items);
    }
}
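For readers unfamiliar with bulk indexing, the sketch below builds the kind of newline-delimited body that the Elasticsearch Bulk API expects: one action line followed by one source line per document. It is an illustration only, not what Spring Data or Jest generates internally. The class BulkBodySketch and its simplified Doc type are hypothetical, and no JSON escaping is performed.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: hand-builds a Bulk API request body for simple documents. */
final class BulkBodySketch {

    /** Hypothetical, simplified stand-in for the Person entity. */
    static final class Doc {
        final long id;
        final String name;

        Doc(long id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Returns one "index" action line plus one source line per document. */
    static String bulkBody(String index, String type, List<Doc> docs) {
        StringBuilder sb = new StringBuilder();
        for (Doc d : docs) {
            // Action line: which index, type and id the next line belongs to
            sb.append("{\"index\":{\"_index\":\"").append(index)
              .append("\",\"_type\":\"").append(type)
              .append("\",\"_id\":\"").append(d.id).append("\"}}\n");
            // Source line: the document itself (names are assumed to need no escaping)
            sb.append("{\"id\":").append(d.id)
              .append(",\"name\":\"").append(d.name).append("\"}\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Doc> docs = Arrays.asList(new Doc(1, "Ann"), new Doc(2, "Bob"));
        System.out.print(bulkBody("person", "person", docs));
    }
}
```

Sending many documents in a single request like this avoids one HTTP round trip per document, which is why the writer hands the whole chunk to saveAll at once.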
2.4 Configuring the ElasticsearchItemReader (not used in this example; for reference only)
package com.hfcsbc.esetl.itemReader;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;

import java.util.Iterator;

/**
 * Create by pengchao on 2018/2/24
 */
public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    private final ElasticsearchOperations elasticsearchOperations;
    private final SearchQuery query;
    private final Class<? extends T> targetType;

    public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends T> targetType) {
        this.elasticsearchOperations = elasticsearchOperations;
        this.query = query;
        this.targetType = targetType;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {
        // Request the current page; without this the same results would come back on every call
        query.setPageable(PageRequest.of(page, pageSize));
        return (Iterator<T>) elasticsearchOperations.queryForList(query, targetType).iterator();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
    }
}
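The contract a paginated reader has to honor can be sketched in plain Java. The example assumes the base class keeps asking for the next page until a page comes back empty, which is how I understand AbstractPaginatedDataItemReader to behave. The class PagedReadSketch and its in-memory source list are hypothetical stand-ins for a real Elasticsearch query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Plain-Java illustration of page-by-page reading; not Spring Batch code. */
final class PagedReadSketch {
    private final List<String> source;   // stands in for the remote data store
    private final int pageSize;
    private int page = 0;                // index of the next page to fetch
    private Iterator<String> current = Collections.emptyIterator();

    PagedReadSketch(List<String> source, int pageSize) {
        this.source = source;
        this.pageSize = pageSize;
    }

    /** Fetches the page at index "page"; empty once the source is exhausted. */
    private Iterator<String> doPageRead() {
        int from = Math.min(page * pageSize, source.size());
        int to = Math.min(from + pageSize, source.size());
        return source.subList(from, to).iterator();
    }

    /** Returns the next item, or null when there is nothing left to read. */
    String read() {
        if (!current.hasNext()) {
            current = doPageRead();
            page++;                      // advance so the next fetch returns new data
            if (!current.hasNext()) {
                return null;
            }
        }
        return current.next();
    }

    /** Drains a reader over the given source and returns everything it produced. */
    static List<String> readAll(List<String> source, int pageSize) {
        PagedReadSketch reader = new PagedReadSketch(source, pageSize);
        List<String> out = new ArrayList<>();
        for (String item = reader.read(); item != null; item = reader.read()) {
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [a, b, c, d, e]
        System.out.println(readAll(Arrays.asList("a", "b", "c", "d", "e"), 2));
    }
}
```

The key point is that each page fetch must depend on the page counter. A fetch that ignores it would keep returning the first results and the read loop would never terminate.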
2.5 Spring Batch configuration
package com.hfcsbc.esetl.config;

import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

/**
 * Create by pengchao on 2018/2/23
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private EsPersonRepository personRepository;

    @Bean
    public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory) {
        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
        // A stable ORDER BY keeps page boundaries consistent from one page query to the next
        String sqlQuery = "select * from person order by id";
        try {
            JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<>();
            queryProvider.setSqlQuery(sqlQuery);
            queryProvider.setEntityClass(Person.class);
            queryProvider.afterPropertiesSet();
            reader.setEntityManagerFactory(entityManagerFactory);
            reader.setPageSize(10000);
            reader.setQueryProvider(queryProvider);
            reader.afterPropertiesSet();
            reader.setSaveState(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return reader;
    }

    @Bean
    public ElasticsearchItemWriter itemWriter() {
        return new ElasticsearchItemWriter(personRepository);
    }

    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory,
                     ItemReader<Person> itemReader,
                     ItemWriter<Person> itemWriter) {
        return stepBuilderFactory
                .get("step1")
                // Read and write Person items in chunks of 10000
                .<Person, Person>chunk(10000)
                .reader(itemReader)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
        return jobBuilderFactory
                .get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .build();
    }

    /**
     * Spring Batch creates some tables of its own when it runs; this bean specifies
     * where they are created: dataSource
     * @param dataSource
     * @param manager
     * @return
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager) {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(manager);
        jobRepositoryFactoryBean.setDatabaseType("postgres");
        try {
            return jobRepositoryFactoryBean.getObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
2.6 Configuring the database and Elasticsearch connection addresses
spring:
  redis:
    host: 192.168.1.222
  data:
    jest:
      uri: http://192.168.1.222:9200
      username: elastic
      password: changeme
  jpa:
    database: POSTGRESQL
    show-sql: true
    hibernate:
      ddl-auto: update
  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    max-active: 2
spring.batch.initialize-schema: always
2.7 Application entry class
package com.hfcsbc.esetl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {
    public static void main(String[] args) {
        SpringApplication.run(EsEtlApplication.class, args);
    }
}