springboot 2.6.2集成elasticsearch 7.16

前面说到elasticsearch 7.16集群安装,本文介绍通过springboot 2.6.2集成es的java api对其进行操作。

首先看一下pom文件

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>springboot-elasticsearch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-elasticsearch</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <hutool.version>5.7.8</hutool.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>RELEASE</version>
        </dependency>

        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>7.16.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>



Elasticsearch升级到7.16之后,已经废弃了High-level API了,统一使用Low-Level API,所以某些接口发生了变化,下面列出Elasticsearch Low-Level API的一些基本操作:

从application.properties文件读取Elasticsearch配置信息

server.port=8899
spring.application.name=qa-search

elasticsearch.hosts=10.0.2.9:9200,10.0.2.78:9200,10.0.2.211:9200
elasticsearch.username=elastic
elasticsearch.password=elastic
elasticsearch.connection.timeout=10000
elasticsearch.socket.timeout=10000
elasticsearch.connection.request.timeout=10000


配置类

ElasticSearchConfig.java

package com.zh.ch.springboot.elasticsearch.config;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.zh.ch.springboot.elasticsearch.service.ElasticsearchServiceImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

@Configuration
public class ElasticSearchConfig {

    private final Log logger = LogFactory.getLog(ElasticsearchServiceImpl.class);


    @Value("${elasticsearch.hosts}")
    public String elasticsearchHost;

    @Value("${elasticsearch.username}")
    public String elasticsearchUsername;

    @Value("${elasticsearch.password}")
    public String elasticsearchPassword;

    @Value("${elasticsearch.connection.timeout}")
    public int elasticsearchConnectionTimeout;

    @Value("${elasticsearch.socket.timeout}")
    public int elasticsearchSocketTimeout;

    @Value("${elasticsearch.connection.request.timeout}")
    public int getElasticsearchConnectionRequestTimeout;

    @Bean
    public ElasticsearchClient elasticsearchClient() {
        RestClient restClient = RestClient.builder(getESHttpHosts()).setRequestConfigCallback(requestConfigBuilder -> {
            //设置连接超时时间
            requestConfigBuilder.setConnectTimeout(elasticsearchConnectionTimeout);
            requestConfigBuilder.setSocketTimeout(elasticsearchSocketTimeout);
            requestConfigBuilder.setConnectionRequestTimeout(getElasticsearchConnectionRequestTimeout);
            return requestConfigBuilder;
        }).setFailureListener(new RestClient.FailureListener() {
            //某节点失败,这里可以做一些告警
            @Override
            public void onFailure(Node node) {
                logger.error(node);
            }
        }).setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.disableAuthCaching();
            //设置账密
            return getHttpAsyncClientBuilder(httpClientBuilder);
        }).build();
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        return new ElasticsearchClient(transport);
    }

    /**
     * ElasticSearch 连接地址
     * 多个逗号分隔
     * 示例:127.0.0.1:9201,127.0.0.1:9202,127.0.0.1:9203
     */
    private HttpHost[] getESHttpHosts() {
        String[] hosts = elasticsearchHost.split(",");
        HttpHost[] httpHosts = new HttpHost[hosts.length];
        for (int i = 0; i < httpHosts.length; i++) {
            String host = hosts[i];
            host = host.replaceAll("http://", "").replaceAll("https://", "");
            Assert.isTrue(host.contains(":"), String.format("your host %s format error , Please refer to [ 127.0.0.1:9200 ] ", host));
            httpHosts[i] = new HttpHost(host.split(":")[0], Integer.parseInt(host.split(":")[1]), "http");
        }
        return httpHosts;
    }

    private HttpAsyncClientBuilder getHttpAsyncClientBuilder(HttpAsyncClientBuilder httpClientBuilder) {
        if (StringUtils.isEmpty(elasticsearchUsername) || StringUtils.isEmpty(elasticsearchPassword)) {
            return httpClientBuilder;
        }
        //账密设置
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        //es账号密码(一般使用,用户elastic)
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticsearchUsername, elasticsearchPassword));
        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        return httpClientBuilder;
    }

}



接口类

ElasticsearchService.java

package com.zh.ch.springboot.elasticsearch.service;

import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;

import java.util.List;
import java.util.Map;

public interface ElasticsearchService<T> {

    /**
     * 判断索引是否存在
     *
     * @param index 索引
     * @return boolean
     */
    public boolean existsIndex(String index);

    /**
     * 创建索引
     *
     * @param index 索引
     * @param aliasename aliasename 
     * @return boolean
     */
    public boolean createIndex(String index, String aliasename, int numOfShards, Map<String, Property> properties);

    /**
     * 删除索引
     *
     * @param indexList indexList
     * @return boolean
     */
    public boolean deleteIndex(List<String> indexList);

    /**
     * 判断文档是否存在
     * @param index index
     * @param id id
     * @return boolean
     */
    public boolean existsDocument(String index, String id, Class<T> clazz);

    /**
     * 保存文档
     * 如果文档存在则更新文档
     * @param index index
     * @param id id
     * @param qa qa
     * @return IndexResponse
     */
    public IndexResponse saveOrUpdateDocument(String index, String id, T qa);

    /**
     * 不指定IO保存文档
     * @param index 索引
     * @param qa 数据
     * @return IndexResponse
     */
    public IndexResponse saveOrUpdateDocument(String index, T qa);


    /**
     * 根据id获取文档
     * @param index index
     * @param id id
     * @param clazz clazz
     * @return T
     */
    public T getById(String index, String id,  Class<T> clazz);

    /**
     * 根据id列表获取文档
     * @param index index
     * @param idList id
     * @param clazz clazz
     * @return List<T>
     */
    public List<T> getByIdList(String index, List<String> idList, Class<T> clazz);

    /**
     * 分页查询
     * @param index index
     * @param pageNo pageNo
     * @param pageSize pageSize
     * @param clazz clazz
     * @return HitsMetadata<T>
     */
    public HitsMetadata<T> searchByPages(String index, Integer pageNo, Integer pageSize, Class<T> clazz);
    
     /**
     * 根据id删除文档
     * @param id id
     */
    public boolean deleteById(String index, String id);
}




实现类

ElasticsearchServiceImpl.java

package com.zh.ch.springboot.elasticsearch.service;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch._types.query_dsl.FieldAndFormat;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryStringQuery;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Highlight;
import co.elastic.clients.elasticsearch.core.search.HighlightField;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.*;

@Component
public class ElasticsearchServiceImpl<T> implements ElasticsearchService<T> {

    private final Log logger = LogFactory.getLog(ElasticsearchServiceImpl.class);


    private ElasticsearchClient client;

    @Autowired
    public void setClient(ElasticsearchClient client) {
        this.client = client;
    }

    public boolean existsIndex(String index) {
        try {
            ExistsRequest existsRequest = new ExistsRequest.Builder().index(index).build();
            BooleanResponse response = client.indices().exists(existsRequest);
            return response.value();
        } catch (IOException e) {
            logger.error("There is an error while getting index", e);
        }
        return false;
    }

    @Override
    public boolean createIndex(String indexName, String aliasesName, int numOfShards, Map<String, Property> properties) {
        try {
            TypeMapping typeMapping = new TypeMapping.Builder().properties(properties).build();
            IndexSettings indexSettings = new IndexSettings.Builder().numberOfShards(String.valueOf(numOfShards)).build();
            CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
                    .index(indexName)
                    .aliases(aliasesName, new Alias.Builder().isWriteIndex(true).build())
                    .mappings(typeMapping)
                    .settings(indexSettings)
                    .build();
            CreateIndexResponse response = client.indices().create(createIndexRequest);
            return response.acknowledged();
        } catch (IOException e) {
            logger.error("There is an error while creating index", e);
        }
        return false;
    }

    @Override
    public boolean deleteIndex(List<String> indexList) {
        try {
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(indexList).build();
            DeleteIndexResponse response = client.indices().delete(deleteIndexRequest);
            return response.acknowledged();
        } catch (IOException e) {
            logger.error("There is an error while deleting index", e);
        }
        return false;
    }

    @Override
    public boolean existsDocument(String index, String id, Class<T> clazz) {
        try {
            GetRequest getRequest = new GetRequest.Builder().index(index).id(id).build();
            GetResponse<T> getResponse = client.get(getRequest, clazz);
            return getResponse.found();
        } catch (IOException e) {
            logger.error("There is an error while judging if the document exists", e);
        }
        return false;
    }

    @Override
    public IndexResponse saveOrUpdateDocument(String index, String id, T t) {
        try {
            IndexRequest<T> indexRequest = new IndexRequest.Builder<T>().index(index).id(id).document(t).build();
            return client.index(indexRequest);
        } catch (IOException e) {
            logger.error("There is an error while saving the document", e);
        }
        return null;
    }

    @Override
    public IndexResponse saveOrUpdateDocument(String index, T t) {
        try {
            IndexRequest<T> indexRequest = new IndexRequest.Builder<T>().index(index).document(t).build();
            return client.index(indexRequest);
        } catch (IOException e) {
            logger.error("There is an error while saving the document", e);
        }
        return null;
    }

    @Override
    public T getById(String index, String id,  Class<T> clazz) {
        try {
            GetRequest getRequest = new GetRequest.Builder().index(index).id(id).build();
            GetResponse<T> getResponse = client.get(getRequest, clazz);
            return getResponse.source();
        } catch (IOException e) {
            logger.error("There is an error while getting the document", e);
        }
        return null;
    }

    @Override
    public List<T> getByIdList(String index, List<String> idList, Class<T> clazz) {
        try {
            List<T> tList = new ArrayList<>(idList.size());
            for (String id : idList) {
                tList.add(client.get(new GetRequest.Builder().index(index).id(id).build(), clazz).source());
            }
            return tList;
        } catch (IOException e) {
            logger.error("There is an error while getting the document list", e);
        }
        return null;
    }

    @Override
    public HitsMetadata<T> searchByPages(String index, Integer pageNo, Integer pageSize, Class<T> clazz) {
        try {
            SearchRequest searchRequest = new SearchRequest.Builder().index(Collections.singletonList(index)).from(pageNo).size(pageSize).build();
            SearchResponse<T> searchResponse = client.search(searchRequest, clazz);
            return searchResponse.hits();
        } catch (IOException e) {
            logger.error("There is an error while searching by pages", e);
        }
        return null;
    }

    public boolean deleteById(String index, String id) {
        try {
            DeleteRequest deleteRequest = new DeleteRequest.Builder().index(index).id(id).build();
            DeleteResponse deleteResponse = client.delete(deleteRequest);
            return "deleted".equals(deleteResponse.result().jsonValue());
        } catch (IOException e) {
            logger.error("There is an error while deleting id document", e);
        }
        return false;
    }
}



测试类

package com.zh.ch.springboot.elasticsearch.service;

import co.elastic.clients.elasticsearch._types.mapping.DateProperty;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
import com.alibaba.fastjson.JSON;
import com.zh.ch.springboot.elasticsearch.SpringbootElasticsearchApplication;
import com.zh.ch.springboot.elasticsearch.bean.QA;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.SimpleDateFormat;
import java.util.*;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootElasticsearchApplication.class)
class ElasticsearchServiceImplTest {


    private ElasticsearchServiceImpl<QA> elasticsearchService;

    @Autowired
    public void setElasticsearchService(ElasticsearchServiceImpl elasticsearchService) {
        this.elasticsearchService = elasticsearchService;
    }

    @Test
    void existsIndex() {
        String index = "es_index_test_1";
        boolean existsIndexFlag = elasticsearchService.existsIndex(index);
        System.out.printf("%s 是否存在 %b%n", index, existsIndexFlag);
    }

    @Test
    void createIndex() {
        String index = "es_index_test_1";
        String indexAliasesName = "es_index_test_1_aliases";

        Map<String, Property> map = new HashMap<>();

        map.put("id", new Property(new DateProperty.Builder().index(true).store(true).build()));

        boolean createIndexFlag = elasticsearchService.createIndex(index, indexAliasesName, 12, map);
        System.out.printf("创建索引, index:%s , createIndexFlag:%b%n", index, createIndexFlag);
    }

    @Test
    void deleteIndex() {
        List<String> indexList = new ArrayList<>();
        indexList.add("es_index_test_1");
        boolean deleteIndexFlag = elasticsearchService.deleteIndex(indexList);
        System.out.printf("删除 %s 索引是否成功 %b", indexList, deleteIndexFlag);
    }

    @Test
    void existsDocument() {
        String index = "bigdata";
        String id = "1";
        boolean existsDocumentFlag = elasticsearchService.existsDocument(index, id, QA.class);
        System.out.printf("文档 index为 %s, id为 %s 是否存在于es中: %b",index, id, existsDocumentFlag);
    }

    @Test
    void saveOrUpdateDocument() {
        QA qa = new QA();
        qa.setType_name("flink");
        qa.setTitle("# <font color='red'>Checkpoint 做恢复的过程中出现Savepoint failed with error \"Checkpoint expired before completing\"的问题</font>");
        qa.setContent("该问题字面意思看是由于flink在做cp落地hdfs的时候,出现超时失败的问题\n" +
                "\n" +
                "\t/** The default timeout of a checkpoint attempt: 10 minutes. */\n" +
                "\tpublic static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;\n" +
                "可以看到是超时失败的问题(默认超时10min失败)。\n" +
                "\n" +
                "## 1.原因分析与排查:\n" +
                "第一种情况:自身设置了超时时间(自身做持久化的内存也不大的情况)\n" +
                "\n" +
                "//例如:仅仅间隔6sec就做持久化\n" +
                "env.getCheckpointConfig.setCheckpointTimeout( 6 * 1000) //6sec内完成checkpoint\n" +
                "![](https://search.lrting.top/images/20190313190334179.png)" +
                "如上图所示:查看Flink-web-ui的DashBoard中看到checkpoint栏目下的history中各个失败的checkpoint快照,然后查看失败时候,各个算子中使用时间,总有一些大部分完成的算子,但是另外一部分算子做checkpoint时候出现失败的情况。此时要做的是查看这部分算子的计算处理速度慢的原因。\n" +
                "\n" +
                "参考这个:[](http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepoint-failed-with-error-quot-Checkpoint-expired-before-completing-quot-td24177.html)\n" +
                "![](https://search.lrting.top/images/2019031211302593.png" +
                "## 2.因此,解决办法在于:\n" +
                "1.检查是否数据倾斜;(比如:数据倾斜导致的个别算子计算能力差异巨大)\n" +
                "\n" +
                "2.开启并发增长个别处理慢的算子的处理能力;\n" +
                "\n" +
                "3.检查代码中是否存在计算速度特别慢的操作(如读写磁盘、数据库、网络传输、创建大对象等耗时操作)\n" +
                "\n" +
                "部分检查点成功问题(刚开始成功,过了几个检查点之后持久化失败的问题,参考https://blog.csdn.net/fct2001140269/article/details/88715808)\n");
        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        qa.setDt(f.format(new Date()));
        qa.setUser_id(1);
        IndexResponse indexResponse = elasticsearchService.saveOrUpdateDocument("bigdata", qa);
        System.out.printf("插入书籍是否成功 %s", indexResponse.result());
    }

    @Test
    void getById() {
        String index = "bigdata";
        QA qaList = elasticsearchService.getById(index, "1", QA.class);
        System.out.println(JSON.toJSONString(qaList));
    }

    @Test
    void getByIdList() {
        String index = "bigdata";
        List<String> idList = new ArrayList<>();
        idList.add("1");
        idList.add("2");
        List<QA> qaList = elasticsearchService.getByIdList(index, idList, QA.class);
        for (QA qa : qaList) {
            System.out.println(JSON.toJSONString(qa));
        }
    }

    @Test
    void searchByPages() {
        String index = "bigdata";
        Integer pageNo = 0;
        Integer pageSize = 10;
        HitsMetadata<QA> qaList = elasticsearchService.searchByPages(index, pageNo, pageSize, QA.class);
        System.out.println(qaList.hits().size());
    }

    @Test
    void searchByQuery() {
        String queryString = "大数据";
        HitsMetadata<QA> qaList = elasticsearchService.searchByQuery(queryString, QA.class);
        for (Hit<QA> hit : qaList.hits()) {
            System.out.println(hit.highlight());
        }
    }

    @Test
    void deleteById() {
        String index = "bigdata";
        String id = "ee00B34BwyhfTnq-1xYe";
        boolean deleteByIdFlag = elasticsearchService.deleteById(index, id);
        System.out.println(deleteByIdFlag);
    }
}




完整代码示例(https://git.lrting.top/xiaozhch5/springboot-elasticsearch.git):

5 1 投票
文章评分

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://lrting.top/backend/3134/

(2)
上一篇 2022-01-02 23:48
下一篇 2022-01-04 01:00

相关推荐

订阅评论
提醒
guest
0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x
()
x