ES transport client底层是netty实现,netty本质上是异步方式,但是netty自身可以使用sync或者await(future超时机制)来实现类似同步调用!因此,ES transport client可以同步调用也可以异步(不过底层的socket必然是异步实现)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介:

ES transport client底层是netty实现,netty本质上是异步方式,但是netty自身可以使用sync或者await(future超时机制)来实现类似同步调用!

因此,ES transport client可以同步调用也可以异步(不过底层的socket必然是异步实现)。

发送端例子

对于java client的数据发送(这里以bulk为例),写过的人都知道,其实是很简单的,因为大部分事情都已经被client做掉了,那么我们先给出例子感知一下:

client初始化

Settings settings = Settings.settingsBuilder() .put("cluster.name", "myClusterName") .put("client.transport.sniff", true).build(); client=new TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress("host1",9300)) .addTransportAddress(new InetSocketTransportAddress("host2",9300)); 

bulk数据发送

对于数据的发送ES提供了两种方式:

第一种bulk api:

import static org.elasticsearch.common.xcontent.XContentFactory.*; BulkRequestBuilder bulkRequest = client.prepareBulk(); // either use client#prepare, or use Requests# to directly build index/delete requests bulkRequest.add(client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) ); bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "another post") .endObject() ) ); BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item } 

可以看到这种方式是由client端自己添加数据,然后调用BulkResponse bulkResponse = bulkRequest.get();来完成数据的发送。

第二种叫做Bulk Processor:

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; BulkProcessor bulkProcessor = BulkProcessor.builder( client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { ... } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... } }) .setBulkActions(10000) .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .setConcurrentRequests(1) .build(); 

初始化bulk processor之后,客户端只需要往bulkProcessor添加数据即可bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));,你可以先配置好bulk的size、interval等,其他的事情就交给processor自己去做吧。

两种方式各有利弊,第一种要自己控制bulk size和interval,但是有利于对发送失败的处理;而第二种简单易用,用户只管add数据就好,但是对于使用回调函数来处理异常会不那么方便,如何选择就看使用场景的了。

 

部分内容摘自:http://www.opscoder.info/es_javaclient.html















本文转自张昺华-sky博客园博客,原文链接:http://www.cnblogs.com/bonelee/p/7889931.html,如需转载请自行联系原作者


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
成功解决: Client network socket disconnected before secure TLS connection was established
这篇文章记录了在使用Avue时遇到的"Client network socket disconnected before secure TLS connection was established"错误的解决方法,即通过修改为国内镜像(如淘宝npm镜像)来解决安装问题,并提供了具体的命令示例以及安装成功后的截图。
成功解决: Client network socket disconnected before secure TLS connection was established
|
6月前
|
监控 网络协议 Java
Java一分钟之-Netty:高性能异步网络库
【6月更文挑战第11天】Netty是Java的高性能异步网络框架,基于NIO,以其高吞吐量、低延迟、灵活性和安全性受到青睐。常见问题包括内存泄漏、ChannelHandler滥用和异常处理不当。要规避这些问题,需正确释放ByteBuf,精简ChannelPipeline,妥善处理异常,并深入理解Netty原理。通过代码审查、遵循最佳实践和监控日志,可提升代码质量和性能。掌握Netty,打造高效网络服务。
95 2
|
5月前
|
安全 NoSQL Java
网络安全-----Redis12的Java客户端----客户端对比12,Jedis介绍,使用简单安全性不足,lettuce(官方默认)是基于Netty,支持同步,异步和响应式,并且线程是安全的,支持R
网络安全-----Redis12的Java客户端----客户端对比12,Jedis介绍,使用简单安全性不足,lettuce(官方默认)是基于Netty,支持同步,异步和响应式,并且线程是安全的,支持R
|
6月前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到控制台报错:onnected to the target VM, address: '127.0.0.1:56821', transport: 'socket',是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
209 0
|
7月前
|
监控 Unix Linux
采用异步socket实现客户端和服务端的通信
采用异步socket实现客户端和服务端的通信
52 0
|
7月前
|
前端开发 网络协议
启动异步之旅:探索Netty中Bootstrap的神奇世界
启动异步之旅:探索Netty中Bootstrap的神奇世界
46 0
|
7月前
|
前端开发 网络协议 Java
Netty入门指南:从零开始的异步网络通信
Netty入门指南:从零开始的异步网络通信
191 0
|
7月前
|
前端开发 Java API
构建异步高并发服务器:Netty与Spring Boot的完美结合
构建异步高并发服务器:Netty与Spring Boot的完美结合
|
7月前
C++socket客户端select异步连接发送接收数据
C++socket客户端select异步连接发送接收数据
135 0
|
2月前
|
网络协议 测试技术 网络安全
Python编程-Socket网络编程
Python编程-Socket网络编程
31 0