Elasticsearch-Jest 配置ES集群&源码解读

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Elasticsearch-Jest 配置ES集群&源码解读

20200120224331959.png

Jest Github地址

直接访问 https://github.com/searchbox-io/Jest ,把源码拉下来


搭建源码环境

我拉了个5.3.4的版本,最新版本为6.3.1 ,大同小异


20200119220835770.png


test 这个module是我自己写的测试集群代码,GitHub上是没有这个的 .


Jest配置ES集群

单例Client ,有个属性JestClient ,需要初始化。

package com.artisan.test;
import com.google.gson.GsonBuilder;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class Client {
    // volatile修饰,确保内存可见
    private volatile static Client client = null;
    private static JestClient jestClient;
    /**
     * 私有构造函数
     */
    private Client() {
        initJestClient(); // 初始化JestClient
    }
    /**
     * 懒汉模式
     * double Check
     * @return
     */
    public static Client getInstance() {
        if (client == null) {
            synchronized (Client.class) {
                if (client == null) {
                    client = new Client();
                }
            }
        }
        return client;
    }
    /**
     * 获取JestClient
     * @return
     */
    public static JestClient getJestClient() {
        return jestClient;
    }
    private void initJestClient() {
        // 初始化的集群节点
        String[] serverUris = new String[]{"http://127.0.0.1:9200", "http://127.0.0.1:8200"};
        JestClientFactory factory = new JestClientFactory();
        // 设置HttpClientConfig
        factory.setHttpClientConfig(new HttpClientConfig
                .Builder(Arrays.asList(serverUris))
                .discoveryEnabled(true) // 节点发现,确保访问的节点都是存活的节点,达到高可用
                .discoveryFrequency(2000, TimeUnit.MILLISECONDS) // NodeChecker的执行频率,默认10S
                .gson(new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss").create())
                .multiThreaded(true)
                .readTimeout(10000)
                .build());
        // 返回jestClient
        jestClient = factory.getObject();
    }
}


测试类

package com.artisan.test;
import io.searchbox.client.JestResult;
import io.searchbox.core.Get;
import java.io.IOException;
public class JestClientTest {
    /**
     * 构造函数
     */
    public JestClientTest() {
        Client.getInstance();// 初始化Client
    }
    private static void getDocumentMyStroe(String id) {
        Get get = new Get.Builder("my_store", id).type("product").build();
        JestResult result ;
        try {
            result = Client.getJestClient().execute(get);
            if (result != null) System.out.println(id + ":" + result.getJsonObject());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws Exception {
        Thread.sleep(5000);// 先让NodeChecker运行,获取存活的节点,主线程这里先休眠5秒
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            Thread.sleep(2000);
            getDocumentMyStroe("998");
        }
    }
}



Jest 配置ES集群,确保应用高可用的原理探究

来看看关键点.discoveryEnabled(true) 都干了啥?

初始化 JestClient

到 JestClientFactory#getObject() 方法 中看下 ,大致说下整个方法的逻辑:

public JestClient getObject() {
       // 初始化 JestHttpClient
        JestHttpClient client = new JestHttpClient();
        if (httpClientConfig == null) {
            log.debug("There is no configuration to create http client. Going to create simple client with default values");
            httpClientConfig = new HttpClientConfig.Builder("http://localhost:9200").build();
        }
        client.setRequestCompressionEnabled(httpClientConfig.isRequestCompressionEnabled());
       // 初始化的es集群节点
        client.setServers(httpClientConfig.getServerList());
        // 设置HttpClient、AsyncClient 
        final HttpClientConnectionManager connectionManager = getConnectionManager();
        final NHttpClientConnectionManager asyncConnectionManager = getAsyncConnectionManager();
        client.setHttpClient(createHttpClient(connectionManager));
        client.setAsyncClient(createAsyncHttpClient(asyncConnectionManager));
        // 设置自定义的Gson
        Gson gson = httpClientConfig.getGson();
        if (gson == null) {
            log.info("Using default GSON instance");
        } else {
            log.info("Using custom GSON instance");
            client.setGson(gson);
        }
    // 创建NodeChecker并启动Node Discovery
        // set discovery (should be set after setting the httpClient on jestClient)
        if (httpClientConfig.isDiscoveryEnabled()) {
            log.info("Node Discovery enabled...");
            if (!Strings.isNullOrEmpty(httpClientConfig.getDiscoveryFilter())) {
                log.info("Node Discovery filtering nodes on \"{}\"", httpClientConfig.getDiscoveryFilter());
            }
            NodeChecker nodeChecker = createNodeChecker(client, httpClientConfig);
            client.setNodeChecker(nodeChecker);
            nodeChecker.startAsync();
            nodeChecker.awaitRunning();
        } else {
            log.info("Node Discovery disabled...");
        }
    //  如果maxConnectionIdleTime大于0则会创建IdleConnectionReaper,进行Idle connection reaping  (空闲线程回收)
        // schedule idle connection reaping if configured
        if (httpClientConfig.getMaxConnectionIdleTime() > 0) {
            log.info("Idle connection reaping enabled...");
            IdleConnectionReaper reaper = new IdleConnectionReaper(httpClientConfig, new HttpReapableConnectionManager(connectionManager, asyncConnectionManager));
            client.setIdleConnectionReaper(reaper);
            reaper.startAsync();
            reaper.awaitRunning();
        } else {
            log.info("Idle connection reaping disabled...");
        }
        Set<HttpHost> preemptiveAuthTargetHosts = httpClientConfig.getPreemptiveAuthTargetHosts();
        if (!preemptiveAuthTargetHosts.isEmpty()) {
            log.info("Authentication cache set for preemptive authentication");
            client.setHttpClientContextTemplate(createPreemptiveAuthContext(preemptiveAuthTargetHosts));
        }
        return client;
    }


重点看下 discoveryEnable 设置为true的情况下,Jest的处理逻辑


2020012000330217.png


NodeChecker 源码分析

NodeChecker继承了com.google.common.util.concurrent.AbstractScheduledService


2020012000512411.png

它的构造器根据clientConfig的discoveryFrequency及discoveryFrequencyTimeUnit了fixedDelayScheduler来执行node checker;

public NodeChecker(JestClient jestClient, ClientConfig clientConfig) {
       // 构建action ,可以根据前面HttpClientConfig#discoveryFilter(String discoveryFilter) 添加Node
        action = new NodesInfo.Builder()
                .withHttp()
                .addNode(clientConfig.getDiscoveryFilter())
                .build();
        this.client = jestClient;
        this.defaultScheme = clientConfig.getDefaultSchemeForDiscoveredNodes();
    // 根据discoveryFrequency(2000, TimeUnit.MILLISECONDS) 实例化一个定时任务出来 使用的Google Guava的包 
        this.scheduler = Scheduler.newFixedDelaySchedule(
                0l,
                clientConfig.getDiscoveryFrequency(),
                clientConfig.getDiscoveryFrequencyTimeUnit()
        );
       // 初始化的根节点 
        this.bootstrapServerList = ImmutableSet.copyOf(clientConfig.getServerList());
     // 实例化 discoveredServerList  为空,后续使用 
        this.discoveredServerList = new LinkedHashSet<String>();
    }


实现了runOneIteration方法,该方法主要是发送NodesInfo请求 GET /_nodes/_all/http

 @Override
    protected void runOneIteration() throws Exception {
        JestResult result;
        try {
            result = client.execute(action);
        } catch (CouldNotConnectException cnce) {
            // Can't connect to this node, remove it from the list
            log.error("Connect exception executing NodesInfo!", cnce);
            removeNodeAndUpdateServers(cnce.getHost());
            return;
            // do not elevate the exception since that will stop the scheduled calls.
            // throw new RuntimeException("Error executing NodesInfo!", e);
        } catch (Exception e) {
            log.error("Error executing NodesInfo!", e);
            client.setServers(bootstrapServerList);
            return;
            // do not elevate the exception since that will stop the scheduled calls.
            // throw new RuntimeException("Error executing NodesInfo!", e);
        }  
        if (result.isSucceeded()) {
            LinkedHashSet<String> httpHosts = new LinkedHashSet<String>();
            JsonObject jsonMap = result.getJsonObject();
            JsonObject nodes = (JsonObject) jsonMap.get("nodes");
            if (nodes != null) {
                for (Entry<String, JsonElement> entry : nodes.entrySet()) {
                    JsonObject host = entry.getValue().getAsJsonObject();
                    JsonElement addressElement = null;
                    if (host.has("version")) {
                        int majorVersion = Integer.parseInt(Splitter.on('.').splitToList(host.get("version").getAsString()).get(0));
                        if (majorVersion >= 5) {
                            JsonObject http = host.getAsJsonObject("http");
                            if (http != null && http.has(PUBLISH_ADDRESS_KEY_V5))
                                addressElement = http.get(PUBLISH_ADDRESS_KEY_V5);
                        }
                    }
                    if (addressElement == null) {
                        // get as a JsonElement first as some nodes in the cluster may not have an http_address
                        if (host.has(PUBLISH_ADDRESS_KEY)) addressElement = host.get(PUBLISH_ADDRESS_KEY);
                    }
                    if (addressElement != null && !addressElement.isJsonNull()) {
                        String httpAddress = getHttpAddress(addressElement.getAsString());
                        if(httpAddress != null) httpHosts.add(httpAddress);
                    }
              }
            }
            if (log.isDebugEnabled()) {
                log.debug("Discovered {} HTTP hosts: {}", httpHosts.size(), Joiner.on(',').join(httpHosts));
            }
            discoveredServerList = httpHosts;
            client.setServers(discoveredServerList);
        } else {
            log.warn("NodesInfo request resulted in error: {}", result.getErrorMessage());
            client.setServers(bootstrapServerList);
        }
    }


请求成功的话 解析body,如果nodes下面有version,取第一位,判断大于等于5的话则取http节点下面的PUBLISH_ADDRESS_KEY_V5[publish_address]属性值,封装成http后添加到discoveredServerList ,供请求获取URL使用。(里面都是存活的节点),如果没有取到,则取PUBLISH_ADDRESS_KEY[http_address]属性值,封装成http后添加到discoveredServerList。

请求抛出CouldNotConnectException则调用removeNodeAndUpdateServers方法移除该host;如果抛出其他的Exception则将client的servers重置为bootstrapServerList


20200120010505394.png


发起请求的过程


20200120010624435.png

执行的execute方法。Client.getJestClient 返回的是 JestClient接口

20200120010943394.png


看下 JestHttpClient#execute

 /**
     * @throws IOException in case of a problem or the connection was aborted during request,
     *                     or in case of a problem while reading the response stream
     * @throws CouldNotConnectException if an {@link HttpHostConnectException} is encountered
     */
    @Override
    public <T extends JestResult> T execute(Action<T> clientRequest) throws IOException {
        return execute(clientRequest, null);
    }


继续

public <T extends JestResult> T execute(Action<T> clientRequest, RequestConfig requestConfig) throws IOException {
      // 获取 HttpUriRequest 
        HttpUriRequest request = prepareRequest(clientRequest, requestConfig);
        CloseableHttpResponse response = null;
        try {
            response = executeRequest(request);
            return deserializeResponse(response, request, clientRequest);
        } catch (HttpHostConnectException ex) {
            throw new CouldNotConnectException(ex.getHost().toURI(), ex);
        } finally {
            if (response != null) {
                try {
                    response.close();
                } catch (IOException ex) {
                    log.error("Exception occurred while closing response stream.", ex);
                }
            }
        }
    }


重点来了

HttpUriRequest request = prepareRequest(clientRequest, requestConfig);


继续跟到prepareRequest

   protected <T extends JestResult> HttpUriRequest prepareRequest(final Action<T> clientRequest, final RequestConfig requestConfig) {
        String elasticSearchRestUrl = getRequestURL(getNextServer(), clientRequest.getURI());
        HttpUriRequest request = constructHttpMethod(clientRequest.getRestMethodName(), elasticSearchRestUrl, clientRequest.getData(gson), requestConfig);
        log.debug("Request method={} url={}", clientRequest.getRestMethodName(), elasticSearchRestUrl);
        // add headers added to action
        for (Entry<String, Object> header : clientRequest.getHeaders().entrySet()) {
            request.addHeader(header.getKey(), header.getValue().toString());
        }
        return request;
    }


重点: getNextServer()


20200120011217471.png


 /**
     * @throws io.searchbox.client.config.exception.NoServerConfiguredException
     */
    protected String getNextServer() {
        return serverPoolReference.get().getNextServer();
    }

继续

20200120011302393.png


总结一下:


JestHttpClient继承了AbstractJestClient,它的execute及executeAsync方法都调用了prepareRequest来构造HttpUriRequest;

prepareRequest方法会先调用getNextServer方法来获取要请求的elasticSearchServer的地址;

而getNextServer方法则是调用的serverPoolReference.get().getNextServer()

看看 serverPoolReference 是个啥?

 private final AtomicReference<ServerPool> serverPoolReference =
            new AtomicReference<ServerPool>(new ServerPool(ImmutableSet.<String>of()));
  • 再看看刚才NodeChecker 处理完成后调用的 client.setServers(discoveredServerList);

到 AbstractJestClient 类中看下 setServers方法


20200120012310346.png


AbstractJestClient有一个serverPoolReference属性,AtomicReference,其泛型为ServerPool;setServers方法则是创建新的ServerPool,然后更新serverPoolReference

20200120012502326.png

ServerPool有个AtomicInteger类型的nextServerIndex,getNextServer方法则是通过nextServerIndex.getAndIncrement() % serversRing.size()来确定取的serversRing这个List的index,其实现的是Round Robin策略;极端情况下出现IndexOutOfBoundsException的话,则会重置nextServerIndex为0,然后继续按Round Robin策略取下一个server


是不是就对上了? NodeChecker负责更新,execute则从里面取,所里取出来的都是 存活的节点。 这样就做到了动态的发现。


节点上线后,自动发送到该节点,节点挂掉后,能自动移除。 全称无需干预。


再说一点, NodeChecker有个执行频率, 确保这个执行完了以后,再请求ES。 举个例子,比如3个节点,你启动应用的时候,正好有一个节点是挂掉的,而且正常的业务请求正好请求到了这个坏的节点上,是不是就挂了。 如果NodeChecker执行完以后,那取出的节点肯定是都是存活的。


遇到的问题


说下背景, 老项目 升级 , 以前是 单个ES节点,所以 没有配置 集群,且Jest版本为Jdk1.7

初始化JestClient如下

  JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new  HttpClientConfi.Builder("http://127.0.0.1:9200")
.gson(new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss").create())
                .multiThreaded(true)
                .readTimeout(10000)
                .build());
        jestClient = factory.getObject();

配置连接集群的地址,最重要的一行代码,增加 .discoveryEnabled(true)


用的是2.4.0的版本, 升级到了5.3.4以后,去debug jest的源码的时候,打上的断点,总和是源码对不起来 … 结果是 IDEA 发布的Tomcat工程路径中 老的2.4.0的jar包还在原来的目录下面,导致Tomcat加载了2.4.0 jar包中的类,删除老的jar包,重新编译测试,通过。


做了几件事儿


  1. 升级JDK到1.8
  2. Jest 升级到 5.3.4
  3. 依赖的Guava升级到了19.0

感兴趣的同学,用我上面提供的测试代码测试即可。



相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
8天前
|
存储 监控 安全
Elasticsearch 集群
【11月更文挑战第3天】
84 54
|
4天前
|
监控 API 索引
Elasticsearch集群健康检查
【11月更文挑战第4天】
16 3
|
8天前
|
存储 安全 数据管理
如何在 Rocky Linux 8 上安装和配置 Elasticsearch
本文详细介绍了在 Rocky Linux 8 上安装和配置 Elasticsearch 的步骤,包括添加仓库、安装 Elasticsearch、配置文件修改、设置内存和文件描述符、启动和验证 Elasticsearch,以及常见问题的解决方法。通过这些步骤,你可以快速搭建起这个强大的分布式搜索和分析引擎。
21 5
|
1月前
|
存储 缓存 监控
深入解析:Elasticsearch集群性能调优策略与最佳实践
【10月更文挑战第8天】Elasticsearch 是一个分布式的、基于 RESTful 风格的搜索和数据分析引擎,它能够快速地存储、搜索和分析大量数据。随着企业对实时数据处理需求的增长,Elasticsearch 被广泛应用于日志分析、全文搜索、安全信息和事件管理(SIEM)等领域。然而,为了确保 Elasticsearch 集群能够高效运行并满足业务需求,需要进行一系列的性能调优工作。
79 3
|
1月前
|
SQL 分布式计算 NoSQL
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
41 4
|
1月前
|
存储 JSON Java
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
这篇文章是关于Elasticsearch的学习指南,包括了解Elasticsearch、版本对应、安装运行Elasticsearch和Kibana、安装head插件和elasticsearch-ik分词器的步骤。
113 0
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
|
1月前
|
运维 监控 数据可视化
大数据-171 Elasticsearch ES-Head 与 Kibana 配置 使用 测试
大数据-171 Elasticsearch ES-Head 与 Kibana 配置 使用 测试
62 1
|
1月前
|
自然语言处理 搜索推荐 Java
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图(一)
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图
49 0
|
1月前
|
存储 自然语言处理 搜索推荐
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图(二)
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图(二)
34 0
|
2月前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo