Java Client

简介: 本教程介绍如何使用Elasticsearch 7.17.x的新版Java Client配置客户端、创建索引、映射分析及增删改查文档。通过商城搜索场景,演示索引映射设计、Java模型类构建、批量导入数据等操作,并解决LocalDateTime序列化等问题,提升开发效率。

4.1. 配置Java client

前边我们都是在Kibana中使用DSL语法直接请求Elasticsearch的HTTP 接口进行测试,DSL是Elasticsearch的领域特定语言(Domain Specific Language, DSL)是一种基于JSON的查询语言。

在项目开发中为了提高开发效率ES官方提供了各种不同语言的客户端,这些客户端的本质就是组装DSL语句,通过http请求发送给ES。官方文档地址

由于ES目前最新版本是8.x,提供了全新版本的客户端Java Client,老版本的客户端Java REST Client已经被标记为过时。

我们使用的是7.17.x版本,新版本和老版本都支持,将来的版本会全面抛弃老版本的Java REST Client ,所以本教程使用新版本的Java Client

Java Client要求:

  • Java 8 或更高版本。
  • JSON 对象映射库,可将您的应用程序类与 Elasticsearch API 无缝集成。Java 客户端支持JacksonJSON-B库(如 Eclipse Yasson)

如何集成Java Client,可参考文档:地址链接

hmall-parent中添加依赖管理:

<properties>
  <es.version>7.17.7</es.version>
  <jackson.version>2.13.0</jackson.version>
  <jakarta.json-ai.version>2.0.1</jakarta.json-ai.version>
</properties>
<!-- 对依赖包进行管理 -->
<dependencyManagement>
  <dependencies>
    <!--es-->
    <dependency>
      <groupId>co.elastic.clients</groupId>
      <artifactId>elasticsearch-java</artifactId>
      <version>${es.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>jakarta.json</groupId>
      <artifactId>jakarta.json-api</artifactId>
      <version>${jakarta.json-ai.version}</version>
    </dependency>
  </dependencies>
</dependencyManagement>

在hmall-item添加如下依赖:

<dependency>
  <groupId>co.elastic.clients</groupId>
  <artifactId>elasticsearch-java</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
  <groupId>jakarta.json</groupId>
  <artifactId>jakarta.json-api</artifactId>
</dependency>

这里为了单元测试方便,我们创建一个测试类IndexTest,然后将初始化的代码编写在@BeforeEach方法中:参考:链接

package com.hmall.item.es;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
public class IndexTest {
    private ElasticsearchClient esClient;
    private RestClient restClient;
    @BeforeEach
    void setUp() {
        // Create the low-level client
        this.restClient = RestClient.builder(
            new HttpHost("192.168.101.68", 9200)).build();
        // Create the transport with a Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(
            restClient, new JacksonJsonpMapper());
        // And create the API client
        this.esClient = new ElasticsearchClient(transport);
    }
    @AfterEach
    void tearDown() throws IOException {
        this.restClient.close();
    }
}

4.2. 创建索引

下边我们以商城项目为例,使用Java Client维护索引数据。搜索页面的效果如图所示:

最终我们使用Elaticsearch实现搜索接口。

4.2.1 分析索引的映射

以下分析过程非常重要,强烈建议:自己根据上面的页面完成独立分析、落地工作

首先我们需要创建索引,配置映射,首先针对上图去分析映射结构,包括哪些字段以及字段的类型等属性。

实现搜索功能需要的字段包括三大部分:

  • 搜索关键字字段:
  • 商品名称
  • 过滤字段
  • 分类
  • 品牌
  • 价格
  • 排序字段
  • 默认:按照更新时间降序排序
  • 销量
  • 价格
  • 展示字段
  • 商品id:用于点击后跳转
  • 图片地址
  • 是否是广告推广商品
  • 名称
  • 价格
  • 评价数量
  • 销量

对应的商品表结构如下,索引库无关字段已经划掉:

结合数据库表结构,以上字段对应的mapping映射属性如下:

字段名

字段类型

类型说明

是否

参与搜索

是否

参与分词

分词器

id

long

长整数

——

name

text

字符串,参与分词搜索

IK

price

integer

以分为单位,所以是整数

——

stock

integer

字符串,但是不分词

——

image

keyword

字符串,但是不分词

——

category

keyword

字符串,但是不分词

——

brand

keyword

字符串,但是不分词

——

sold

integer

销量,整数

——

commentCount

integer

评价,整数

——

isAD

boolean

布尔类型

——

updateTime

Date

更新时间

——

4.2.2 创建索引

根据分析我们使用下边的语句创建items索引:

PUT /items
{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "name":{
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "price":{
        "type": "integer"
      },
      "stock":{
        "type": "integer"
      },
      "image":{
        "type": "keyword",
        "index": false
      },
      "category":{
        "type": "keyword"
      },
      "brand":{
        "type": "keyword"
      },
      "sold":{
        "type": "integer"
      },
      "commentCount":{
        "type": "integer",
        "index": false
      },
      "isAD":{
        "type": "boolean"
      },
      "updateTime":{
        "type": "date"
      }
    }
  }
}

我们为什么不用Java Client去创建索引呢?

这就好比在MySQL中创建表,通常我们使用DDL语句通过MySQL客户端执行,而对于数据的CRUD及复杂的SQL语句我们会通过jdbc 去访问mysql数据库一样。

所以,使用Elasticsearch通常我们使用Kibana通过DSL语句去创建索引,而不会使用Java Client去创建索引,使用Java Client主要是为了向索引中添加文档、从索引中搜索文档。

4.3. 新增文档

接下来我们使用Java Client向索引中添加文档。

Java Client的使用方法可以参考 文档 学习

4.3.1 创建模型类

就和使用MyBatis一样操作数据库需要一个模型对象,使用Elasticsearch向索引执行CRUD操作也需要模型类。

依据索引映射创建模型类:

小技巧:创建模型类可以提供索引映射由AI去生成模型类

package com.hmall.item.domain.po;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@ApiModel(description = "索引库实体")
public class ItemDoc {
    @ApiModelProperty("商品id")
    private String id;
    @ApiModelProperty("商品名称")
    private String name;
    @ApiModelProperty("价格(分)")
    private Integer price;
    @ApiModelProperty("库存")
    private Integer stock;
    @ApiModelProperty("商品图片")
    private String image;
    @ApiModelProperty("类目名称")
    private String category;
    @ApiModelProperty("品牌名称")
    private String brand;
    @ApiModelProperty("销量")
    private Integer sold;
    @ApiModelProperty("评论数")
    private Integer commentCount;
    @ApiModelProperty("是否是推广广告,true/false")
    private Boolean isAD;
    @ApiModelProperty("更新时间")
    private LocalDateTime updateTime;
}

4.3.2 编写客户端代码

下边参考ES文档编写客户端代码。

@SpringBootTest
@Slf4j
public class IndexTest {
    ...
    @Autowired
    private IItemService itemService;
    
    @Test
    void testAddDocument() throws IOException {
        //商品id
        Long id = 100002644680L;
        // 1.根据id查询商品数据
        Item item = itemService.getById(id);
        // 2.转换为文档类型
        ItemDoc itemDoc = BeanUtil.copyProperties(item, ItemDoc.class);
        IndexResponse response = esClient.index(i -> i
                .index("items")//指定索引名称
                .id(itemDoc.getId())//指定主键
                .document(itemDoc)//指定文档对象
        );
        //结果
        String s = response.result().jsonValue();
        log.info("result:"+s);
    
    }
}

4.3.3 测试

下边进行运行测试方法。报错:

Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Java 8 date/time type `java.time.LocalDateTime` not supported by default: add Module "com.fasterxml.jackson.datatype:jackson-datatype-jsr310" to enable handling (through reference chain: com.hmall.item.domain.po.ItemDoc["updateTime"])

根据提示猜测是数据绑定出问题,现在是要把Java 对象的信息映射为ES的索引文档,jackson-datatype-jsr310对于LocalDateTime不支持。

使用AI解决:

elasticsearch使用的是7.17.7,使用co.elastic.clients.elasticsearch.ElasticsearchClient 向索引新增文档报错如下:
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Java 8 date/time type `java.time.LocalDateTime` not supported by default: add Module "com.fasterxml.jackson.datatype:jackson-datatype-jsr310" to enable handling (through reference chain: com.hmall.item.domain.po.ItemDoc["updateTime"])

根据AI提示修改如下:

添加 JavaTimeModule 以支持 LocalDateTime 类型。

这个类引包别整错了:package com.fasterxml.jackson.databind;

完整代码如下:

package com.hmall.item.es;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.json.JSONUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.AcknowledgedResponse;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.InfoRequest;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.PutMappingResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.hmall.item.domain.po.Item;
import com.hmall.item.domain.po.ItemDoc;
import com.hmall.item.service.IItemService;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@SpringBootTest
@Slf4j
public class IndexTest {
    private ElasticsearchClient esClient;
    private RestClient restClient;
    @Autowired
    private IItemService itemService;
    @BeforeEach
    void setUp() {
        // Create the low-level client
        this.restClient = RestClient.builder(
            new HttpHost("192.168.101.68", 9200)).build();
        // 创建 ObjectMapper 实例
        ObjectMapper objectMapper = new ObjectMapper();
        // 添加 JavaTimeModule 以支持 LocalDateTime 类型
        objectMapper.registerModule(new JavaTimeModule());
        // Create the transport with a Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(
            restClient, new JacksonJsonpMapper(objectMapper));
        // And create the API client
        this.esClient = new ElasticsearchClient(transport);
    }
    //创建文档
    @Test
    void testAddDocument() throws IOException {
        // 1.根据id查询商品数据
        Item item = itemService.getById(100002644680L);
        // 2.转换为文档类型
        ItemDoc itemDoc = BeanUtil.copyProperties(item, ItemDoc.class);
        IndexResponse response = esClient.index(i -> i
                                                .index("items")//指定索引名称
                                                .id(itemDoc.getId())//指定主键
                .document(itemDoc)//指定文档对象
        );
        //结果
        String s = response.result().jsonValue();
        log.info("result:"+s);
    }
    @AfterEach
    void tearDown() throws IOException {
        this.restClient.close();
    }
}

执行成功进行验证

我们使用DSL查询:

GET /items/_search

结果:

确定ES的索引中存在刚才添加的文档。

4.3.4 错误修复

这里我运行单测,提示seata相关错误,如下

解决方案:

  • 注释pom文件中seata依赖,重启测试即可

4.4 查询文档

参考文档:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/reading.html

通过阅读文档可知,这里实现的是根据ID查询文档,编写代码:

@Test
void testGetDocumentById() throws IOException {
GetResponse<ItemDoc> response = esClient.get(g -> g
                                             .index("items")
                                             .id("100002644680"),
                                             ItemDoc.class
                                            );
if (response.found()) {
    ItemDoc itemDoc = response.source();
    log.info("itemDoc: " + itemDoc);
} else {
    log.info ("itemDoc not found");
}
}

4.5 删除文档

通过两个API方法的学习:

  • 新增文档:esClient.index()方法
  • 查询文档:esClient.get()方法

对于删除文档的方式可以根据代码提示自行编写,如下:

@Test
void testDeleteDocumentById() throws IOException {
    DeleteResponse response = esClient.delete(d -> d
                                              .index("items")
                                              .id("100002644680")
                                             );
    String s = response.result().jsonValue();
    log.info("result:"+s);
}

4.6 修改文档

  1. 局部修改

如果要更新的文档不存在会报错。

@Test
void testUpdateDocumentById() throws IOException {
    //更新对象
    ItemDoc itemDoc = new ItemDoc();
    itemDoc.setName("更新名称");
    UpdateResponse<ItemDoc> response = esClient.update(u -> u
                                                       .index("items")
                                                       .id("100002644680")
                                                       .doc(itemDoc), ItemDoc.class);
    String s = response.result().jsonValue();
    log.info("result:"+s);
}
  1. 有则更新,没有则添加。
@Test
void testUpdateDocumentById2() throws IOException {
    //更新对象
    ItemDoc itemDoc = new ItemDoc();
    itemDoc.setName("更新名称");
    UpdateResponse<ItemDoc> response = esClient.update(u -> u
                                                       .index("items")
                                                       .id("100002644680aa")
                                                       .doc(itemDoc)
                                                       .docAsUpsert(true), ItemDoc.class);
    String s = response.result().jsonValue();
    log.info("result:"+s);
}

通过docAsUpsert(true)控制,如果没有该文档则添加新文档。

4.7. 批量导入

文档地址:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/indexing-bulk.html

测试代码如下:

引包注意

@Test
void testBatchAddDocment() throws Exception {
    //取第一页10条数据
    Page<Item> page = Page.of(0, 10);
    //查询所有商品信息
    Page<Item> itemPage = itemService.page(page, new LambdaQueryWrapper<Item>());
    //获取商品集合
    List<Item> items = itemPage.getRecords();
    //拷贝属性
    List<ItemDoc> itemDocs = BeanUtils.copyList(items, ItemDoc.class);
    //批量添加请求
    BulkRequest.Builder br = new BulkRequest.Builder();
    itemDocs.forEach(itemDoc ->
                     br.operations(op -> op
                                   .index(i -> i
                                          .index("items")
                                          .id(itemDoc.getId().toString())
                                          .document(itemDoc))));
    //构建请求
    BulkRequest build = br.build();
    //批量添加
    BulkResponse bulkResponse = esClient.bulk(build);
    //遍历结果
    bulkResponse.items().forEach(item -> log.info("添加结果:{}",item.result().toString()));
    //如果有错误
    if(bulkResponse.errors()){
        log.error("批量添加失败");
        //遍历错误
        bulkResponse.items().forEach(item -> log.error("添加失败:{}",item.error().reason()));
    }
}

此时去Kibana查询,发现多了刚才新增的10条,一共12条


相关文章
|
JSON 自然语言处理 Java
Java原生操作Elasticsearch
Java原生操作Elasticsearch
379 0
|
存储 消息中间件 负载均衡
深入理解RocketMQ广播消费
这篇文章我们聊聊广播消费,因为广播消费在某些场景下真的有奇效。笔者会从基础概念、实现机制、实战案例、注意事项四个方面一一展开,希望能帮助到大家。
深入理解RocketMQ广播消费
|
Kubernetes Linux Docker
Docker容器生产实践1——永远设置容器内存限制
背景 在默认情况下,docker容器并不会对容器内部进程使用的内存大小进行任何限制。对于PaaS系统而言,或者对于直接使用docker的用户而言,这非常危险。
3964 0
|
弹性计算 运维 网络协议
揭秘云网络大会“网红”:阿里云自研高性能网关XGW
XGW是洛神云网络平台的硬件转发层核心,提供了高性能的网络转发能力,负责公网,专线和跨Region流量的汇聚和分发,满足用户大带宽、大单流、稳定性、低延时/低抖动等需求。
8500 0
揭秘云网络大会“网红”:阿里云自研高性能网关XGW
|
8月前
|
机器学习/深度学习 运维 监控
SSE 为何引发热议?实时数据背后的关键技术指南
SSE(Server-Sent Events)是一种基于HTTP的单向实时通信技术,允许服务器主动向客户端推送数据,广泛应用于新闻通知、股票行情、赛事直播等实时场景。相比轮询和WebSocket,SSE 更节省资源、易于实现,适合无需双向交互的实时数据传输需求。
|
4月前
|
人工智能 文字识别 自然语言处理
数智化改造ERP的真实实操记录:从传统到智能的落地过程
本文分享了基于JBoltAI框架对传统ERP系统进行低侵入式数智化改造的实战经验。针对数据录入繁琐、流程协同不畅、决策缺乏支撑等痛点,通过集成AI能力,实现采购合同解析、库存智能预警、财务自动报销、生产智能助手等场景自动化。依托JBoltAI的文档处理、RAG知识库、Agent智能体与Function调用等特性,在不重构原有SpringBoot架构的前提下,完成AI赋能,显著提升效率与决策水平,为传统ERP转型提供可复用的技术路径。
380 0
|
10月前
|
存储 缓存 自然语言处理
Elasticsearch 查询性能优化:从 3 秒到 300ms 的 6 个核心参数调优指南
本文分享某电商平台 Elasticsearch 性能调优实战,通过调整分片数、刷新间隔、缓存配置等 6 个核心参数,将商品搜索从 3 秒优化至 300 毫秒,显著提升查询性能与系统吞吐量。内容涵盖性能诊断、参数调优逻辑、实操方案及避坑指南,助力高频查询场景下的 ES 优化。
|
11月前
|
SQL 人工智能 Java
阿里云百炼开源面向 Java 开发者的 NL2SQL 智能体框架
Spring-ai-alibaba-nl2sql 是析言 GBI 产品在数据问答领域的一次重要开源尝试,专注于 NL2SQL 场景下的核心能力开放。
2946 48
|
传感器 数据采集 编解码
LabVIEW代码性能优化
LabVIEW代码性能优化
342 1
|
安全 Linux 数据安全/隐私保护
【Linux】深入理解linux权限
本文深入解析Linux权限管理机制,涵盖权限概念、用户角色、文件属性及操作方法。文章分为前言、权限介绍、用户与角色、文件属性、权限修改及常见问题六大板块。详细说明了权限类型(r/w/x)、角色优先级、chmod/chown指令用法,以及目录权限、umask掩码、粘滞位等重点内容。掌握这些知识,可有效提升Linux系统安全性和灵活性,是管理员必备技能。喜欢的话别忘了点赞支持哦! ❤❤❤
807 6