创建ES专栏很久了,但是写的文章非常少,实在是项目比较忙。2018年最后一天了,也该总结一下对es封装的东西了。这篇博客我们以java对es批量插入为主流程,来介绍一下java对接es的全部过程;
需求处理流程:
先从mysql中读取手机号前7位的所有组合,然后通过程序补全后四位所有的可能,组成一个庞大的手机号码库,然后再将手机号加密,为其他的应用提供手机号明密文转换服务;
1、引入jar
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.0.1</version> </dependency>
2、根据配置创建索引
2.1、配置文件内容:
es.cluster.name=elasticsearch es.cluster.nodes=39.**.45.**:9201 #集群ip端口逗号分隔 es.client.useragent=ACS/Search 1.0 es.index.casetel={"settings":{"number_of_shards":2,"number_of_replicas":1,"max_result_window":10000000},"mappings":{"casetel":{"properties":{"telId":{"type":"long"},"tel":{"type":"keyword"},"encryptTel":{"type":"keyword"}}}}}
2.2、 配置文件对应 操作类
package com.zqf.search.es.model;

import com.zqf.common.utils.JsonUtils;

/**
 * Mutable holder for the Elasticsearch client settings.
 *
 * <p>Each property is a plain string with a getter/setter pair so that the
 * instance can be populated through setter injection.
 *
 * @author zhenghao
 */
public class ElasticSearchConfig {

    /** Name of the Elasticsearch cluster. */
    private String clusterName;

    /** Cluster node addresses as a single string. */
    private String nodes;

    /** Value sent as the client's User-Agent header. */
    private String userAgent;

    /** Settings/mappings definition for the "casetel" index. */
    private String caseTel;

    public String getClusterName() {
        return this.clusterName;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    public String getNodes() {
        return this.nodes;
    }

    public void setNodes(String nodes) {
        this.nodes = nodes;
    }

    public String getUserAgent() {
        return this.userAgent;
    }

    public void setUserAgent(String userAgent) {
        this.userAgent = userAgent;
    }

    public String getCaseTel() {
        return this.caseTel;
    }

    public void setCaseTel(String caseTel) {
        this.caseTel = caseTel;
    }

    /**
     * Delegates to {@code JsonUtils.writeValue(this)}.
     * NOTE(review): presumably a JSON rendering of this object - confirm against JsonUtils.
     */
    @Override
    public String toString() {
        return JsonUtils.writeValue(this);
    }
}
通过 spring管理配置实体类:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-lazy-init="false"> <bean id="esNodeConfig" class="com.zqf.search.es.model.ElasticSearchConfig"> <property name="clusterName" value="${es.cluster.name}" /> <property name="nodes" value="${es.cluster.nodes}" /> <property name="userAgent" value="${es.client.useragent}" /> <property name="caseTel" value="${es.index.casetel}" /> </bean> </beans>
2.3、系统启动检查是否有索引,如果没有根据配置创建相关索引
package com.zqf.search.es.service; import java.util.Collections; import java.util.Map; import com.sun.org.apache.bcel.internal.generic.NEW; import com.zqf.search.es.model.ElasticSearchConfig; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.client.config.RequestConfig; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.message.BasicHeader; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.main.MainResponse; import org.elasticsearch.action.search.*; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Service; /** * @description * @param * @date 10:18 2018/12/9 * @return * @author zhenghao */ @Service public class ElasticSearchService implements ApplicationListener<ContextRefreshedEvent> { private final Logger log = LoggerFactory.getLogger(ElasticSearchService.class); public static final int CONNECT_TIMEOUT_MILLIS = 60 * 1000; public static final int SOCKET_TIMEOUT_MILLIS = 5 * 60 * 1000; public static final int MAX_RETRY_TIMEOUT_MILLIS = SOCKET_TIMEOUT_MILLIS; public static final int CONNECTION_REQUEST_TIMEOUT_MILLIS = SOCKET_TIMEOUT_MILLIS; private RestClient 
restClient; private RestHighLevelClient restHighLevelClient; private BasicHeader[] basicHeaders; @Autowired private ElasticSearchConfig elasticSearchConfig; @Autowired private IndexService indexService; public BulkResponse bulk(BulkRequest bulkRequest) { try { return restHighLevelClient.bulk(bulkRequest, basicHeaders); } catch (Exception e) { log.error("bulk exception", e); } return null; } public void bulkAsync(BulkRequest bulkRequest, ActionListener<BulkResponse> listener) { restHighLevelClient.bulkAsync(bulkRequest, listener, basicHeaders); } public boolean ping() { try { return restHighLevelClient.ping(basicHeaders); } catch (Exception e) { log.error("ping exception", e); } return false; } public MainResponse info() { try { return restHighLevelClient.info(basicHeaders); } catch (Exception e) { log.error("ping exception", e); } return null; } public GetResponse get(GetRequest getRequest) { try { return restHighLevelClient.get(getRequest, basicHeaders); } catch (Exception e) { log.error("get exception", e); } return null; } public void getAsync(GetRequest getRequest, ActionListener<GetResponse> listener) { restHighLevelClient.getAsync(getRequest, listener, basicHeaders); } public boolean exists(GetRequest getRequest) { try { return restHighLevelClient.exists(getRequest, basicHeaders); } catch (Exception e) { log.error("exists exception", e); } return false; } public void existsAsync(GetRequest getRequest, ActionListener<Boolean> listener) { restHighLevelClient.existsAsync(getRequest, listener, basicHeaders); } public IndexResponse index(IndexRequest indexRequest) { try { return restHighLevelClient.index(indexRequest, basicHeaders); } catch (Exception e) { log.error("index exception", e); } return null; } public void indexAsync(IndexRequest indexRequest, ActionListener<IndexResponse> listener) { restHighLevelClient.indexAsync(indexRequest, listener, basicHeaders); } public UpdateResponse update(UpdateRequest updateRequest) { try { return 
restHighLevelClient.update(updateRequest, basicHeaders); } catch (Exception e) { log.error("update exception", e); } return null; } public void updateAsync(UpdateRequest updateRequest, ActionListener<UpdateResponse> listener) { restHighLevelClient.updateAsync(updateRequest, listener, basicHeaders); } public DeleteResponse delete(DeleteRequest deleteRequest) { try { return restHighLevelClient.delete(deleteRequest, basicHeaders); } catch (Exception e) { log.error("delete exception", e); } return null; } public void deleteAsync(DeleteRequest deleteRequest, ActionListener<DeleteResponse> listener) { restHighLevelClient.deleteAsync(deleteRequest, listener, basicHeaders); } public SearchResponse search(SearchRequest searchRequest) { try { return restHighLevelClient.search(searchRequest, basicHeaders); } catch (Exception e) { log.error("search exception", e); } return null; } public void searchAsync(SearchRequest searchRequest, ActionListener<SearchResponse> listener) { restHighLevelClient.searchAsync(searchRequest, listener, basicHeaders); } public SearchResponse searchScroll(SearchScrollRequest searchScrollRequest) { try { return restHighLevelClient.searchScroll(searchScrollRequest, basicHeaders); } catch (Exception e) { log.error("searchScroll exception", e); } return null; } public void searchScrollAsync(SearchScrollRequest searchScrollRequest, ActionListener<SearchResponse> listener) { restHighLevelClient.searchScrollAsync(searchScrollRequest, listener, basicHeaders); } public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest) { try { return restHighLevelClient.clearScroll(clearScrollRequest, basicHeaders); } catch (Exception e) { log.error("clearScroll exception", e); } return null; } public void clearScrollAsync(ClearScrollRequest clearScrollRequest, ActionListener<ClearScrollResponse> listener) { restHighLevelClient.clearScrollAsync(clearScrollRequest, listener, basicHeaders); } public Response performRequest(String method, String endpoint) { 
try { return restClient.performRequest(method, endpoint, Collections.emptyMap(), basicHeaders); } catch (Exception e) { log.error("performRequest exception", e); } return null; } public Response performRequest(String method, String endpoint, Map<String, String> params) { try { return restClient.performRequest(method, endpoint, params, basicHeaders); } catch (Exception e) { log.error("performRequest exception", e); } return null; } public Response performRequest(String method, String endpoint, Map<String, String> params, String jsonBody) { try { HttpEntity entity = new StringEntity(jsonBody, ContentType.APPLICATION_JSON); return restClient.performRequest(method, endpoint, params, entity, basicHeaders); } catch (Exception e) { log.error("performRequest exception", e); } return null; } public Response performRequest(String method, String endpoint, Map<String, String> params, String jsonBody, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) { try { HttpEntity entity = new StringEntity(jsonBody, ContentType.APPLICATION_JSON); return restClient.performRequest(method, endpoint, params, entity, httpAsyncResponseConsumerFactory, basicHeaders); } catch (Exception e) { log.error("performRequest exception", e); } return null; } public void performRequestAsync(String method, String endpoint, ResponseListener responseListener) { restClient.performRequestAsync(method, endpoint, Collections.emptyMap(), responseListener, basicHeaders); } public void performRequestAsync(String method, String endpoint, Map<String, String> params, ResponseListener responseListener) { restClient.performRequestAsync(method, endpoint, params, responseListener, basicHeaders); } public void performRequestAsync(String method, String endpoint, Map<String, String> params, String jsonBody, ResponseListener responseListener) { HttpEntity entity = new StringEntity(jsonBody, ContentType.APPLICATION_JSON); restClient.performRequestAsync(method, endpoint, params, entity, 
HttpAsyncResponseConsumerFactory.DEFAULT, responseListener, basicHeaders); } public void performRequestAsync(String method, String endpoint, Map<String, String> params, String jsonBody, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory, ResponseListener responseListener) { HttpEntity entity = new StringEntity(jsonBody, ContentType.APPLICATION_JSON); restClient.performRequestAsync(method, endpoint, params, entity, httpAsyncResponseConsumerFactory, responseListener, basicHeaders); } /** * spring容器初始化完成后执行,确保可以正确初始化搜索客户端 * @param contextRefreshedEvent */ @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { initClient(); } private void initClient() { if (restHighLevelClient == null) { synchronized (this) { if (restHighLevelClient == null) { String[] nodes = elasticSearchConfig.getNodes().split(","); HttpHost[] hosts = new HttpHost[nodes.length]; for (int i = 0; i < nodes.length; i++) { hosts[i] = HttpHost.create(nodes[i]); } basicHeaders = new BasicHeader[] { new BasicHeader("Accept", "application/json; charset=UTF-8"), //new BasicHeader("Accept-Encoding", "gzip, deflate"), new BasicHeader("User-Agent", elasticSearchConfig.getUserAgent()) }; RestClientBuilder restClientBuilder = RestClient.builder(hosts); restClientBuilder.setDefaultHeaders(basicHeaders).setMaxRetryTimeoutMillis(MAX_RETRY_TIMEOUT_MILLIS).setRequestConfigCallback((RequestConfig.Builder requestConfigBuilder) -> { requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS); requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS); requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS); return requestConfigBuilder; }); // for the RestHighLevelClient restClient = restClientBuilder.build(); restHighLevelClient = new RestHighLevelClient(restClientBuilder); // 检查索引是否存在 indexService.checkIndexIsExist(); } } } } }
3、批量向索引 中插入数据service
package com.zqf.search.es.service; import com.zqf.common.utils.SHA; import com.zqf.common.utils.service.ThreadPoolService; import com.zqf.db.urgerobot.enhance.dao.NativeSqlMapper; import com.zqf.search.es.model.ElasticSearchConfig; import org.apache.commons.collections.map.HashedMap; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Response; import org.elasticsearch.common.collect.HppcMaps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.servlet.http.HttpServletResponse; import java.util.*; /** * @author zhenghao * @description * @date 2018/12/8 13:12 */ @Service public class IndexService { private static Logger log = LoggerFactory.getLogger(IndexService.class); @Autowired private ElasticSearchService elasticSearchService; @Autowired private NativeSqlMapper nativeSqlMapper; @Autowired public ElasticSearchConfig elasticSearchConfig; @Autowired private ThreadPoolService threadPoolService; /** * 检查索引是否存在,不存在则创建 */ public void checkIndexIsExist() { // 检查通话记录索引是否存在 checkAndCreateIndex("casetel", elasticSearchConfig.getCaseTel()); } public void dealTel() { threadPoolService.execute(() -> { //多线程 multiDealTel(); }); } public void multiDealTel() { StringBuilder sb = new StringBuilder(); sb.append("select number tel from sys_number where state = 1"); List<Map<String, Object>> caseCallInfoList = nativeSqlMapper.execReadSql(sb.toString()); if (caseCallInfoList == null || caseCallInfoList.size() <= 0) { return; } //启动20个线程处理 int threadCount = threadPoolService.getSuggestThreadNum(); log.info("容量线程数" + threadCount); int size = caseCallInfoList.size(); if (size < threadCount) { threadCount = caseCallInfoList.size(); } int pageCount = size / threadCount; int index = 0; 
List<String> telList = new ArrayList<>(); for (Map<String, Object> map : caseCallInfoList) { index++; String tel = (String) map.get("tel"); if (tel == null) { continue; } telList.add(tel); if (index > pageCount) { final List<String> finalCaseIds = new ArrayList<>(); finalCaseIds.addAll(telList); threadPoolService.execute(() -> { threadDealTel(finalCaseIds); }); telList.clear(); index = 0; } } if (telList.size() > 0) { final List<String> finalCaseIds1 = telList; threadPoolService.execute(() -> { threadDealTel(finalCaseIds1); }); } } /** * @param * @return * @description 多线程跑 * @date 10:11 2018/12/9 * @author zhenghao */ public void threadDealTel(List<String> caseCallInfoList) { if ((caseCallInfoList == null) || (caseCallInfoList.size() <= 0)) { return; } List<Map<String, Object>> telList = new ArrayList<>(); for (String tel : caseCallInfoList) { for (int i = 0; i <= 9; i++) { for (int l = 0; l <= 9; l++) { for (int m = 0; m <= 9; m++) { for (int n = 0; n <= 9; n++) { String str = tel + i + l + m + n; Map<String, Object> strMap = new HashedMap(); strMap.put("tel", str); String ret = SHA.getSHA(str); strMap.put("encryptTel", ret); telList.add(strMap); if (telList.size() == 1000) { this.insertEs(telList); log.info("size:" + telList.size()); telList.clear(); } } } } } if (telList.size() > 0 ) { this.insertEs(telList); telList.clear(); } StringBuilder sb = new StringBuilder(); sb.append("update sys_number set state =2 where number = ").append(tel); nativeSqlMapper.execWriteSql(sb.toString()); } } public void insertEs(List<Map<String, Object>> mapList) { BulkRequest bulkRequest = new BulkRequest(); mapList.forEach(caseCallInfo -> { IndexRequest indexRequest = new IndexRequest(); indexRequest.index("casetel") .type("casetel") .source(caseCallInfo); bulkRequest.add(indexRequest); }); elasticSearchService.bulkAsync(bulkRequest, new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkItemResponses) { System.out.println("使用时间" + 
bulkItemResponses.getTook().getMillis()); } @Override public void onFailure(Exception e) { System.out.println("发生错误原因" + e.getMessage()); } }); } /** * 检查索引是否存在,不存在则进行创建 * * @param indexName 索引名字 * @param indexConfig 索引配置信息 */ private void checkAndCreateIndex(String indexName, String indexConfig) { Response response = elasticSearchService.performRequest("HEAD", String.format("/%s", indexName), Collections.emptyMap()); // 检查索引是否存在,不存在则创建 if ((response == null) || (response.getStatusLine().getStatusCode() != HttpServletResponse.SC_OK)) { elasticSearchService.performRequest("PUT", String.format("/%s/", indexName), Collections.emptyMap(), indexConfig); } } }
4、查询service
package com.zqf.search.es.service; import org.apache.commons.lang.StringUtils; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.metrics.sum.Sum; import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author zhenghao * @description * @date 2018/12/9 10:26 */ @Service public class QueryService { @Autowired private ElasticSearchService elasticSearchService; /** * @description 查询电话号码是否存在 * @param * @date 10:34 2018/12/9 * @return * @author zhenghao */ public Long getTelCount(String tel) { if (tel == null) { return null; } /** * 构造查询条件 */ BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); String temTel = (tel == null) ? 
null : tel.replace("*", "").trim(); boolQueryBuilder.filter(QueryBuilders.wildcardQuery("tel", String.format("*%s*", temTel))); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(boolQueryBuilder); SearchRequest searchRequest = new SearchRequest(); searchRequest.indices("casetel").types("casetel").source(searchSourceBuilder); SearchResponse searchResponse = elasticSearchService.search(searchRequest); SearchHits searchHits = searchResponse.getHits(); long total = searchHits.getTotalHits(); return total; } }
6、从mysql查询数据
package com.zqf.search.es.service; import java.util.*; import java.util.function.IntFunction; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import com.zqf.db.plugin.Page; import com.zqf.db.urgerobot.enhance.dao.NativeSqlMapper; import com.zqf.db.urgerobot.user.dao.RobotUserMapper; import com.zqf.db.urgerobot.user.model.RobotUser; import com.zqf.db.urgerobot.user.model.RobotUserExample; /** * @author zhenghao * @description * @date 2018/5/30 18:29 */ @Service public class UserService { @Autowired private RobotUserMapper userMapper; @Autowired private NativeSqlMapper nativeSqlMapper; /** * @param * @return * @Description: 根据id查询员工的详细信息 * @date 19:20 2017/9/28 * @author zhenghao */ public void getUserInfoById(Long userId) { StringBuilder sb = new StringBuilder(); sb.append("select id telId, number tel,area_code encryptTel from management_sys_tel_ownership"); List<Map<String, Object>> caseCallInfoList = nativeSqlMapper.execReadSql(sb.toString()); caseCallInfoList.forEach(caseInfo->{ Long telId = (Long) caseInfo.get("telId"); String tel = (String) caseInfo.get("tel"); if (tel.length()!=7){ StringBuilder sb1 = new StringBuilder(); sb1.append("delete from management_sys_tel_ownership where id = ").append(telId); nativeSqlMapper.execReadSql(sb1.toString()); } }); } }
7、方法调用
/**
 * Handles {@code /dealData}: starts the phone-number processing via
 * {@code indexService.dealTel()} and then writes the response through
 * {@code ServletUtils.toJson}.
 *
 * @param req the incoming HTTP request
 * @param res the HTTP response
 * @author zhenghao
 */
@RequestMapping("/dealData")
public void login(HttpServletRequest req, HttpServletResponse res) {
    indexService.dealTel();
    ServletUtils.toJson(req, res);
}
总结:
通过上面的几个步骤,我们就可以快速地搭建起一个search项目,所有操作es的服务都由这个项目提供,这样其余的项目直接调用接口即可。从创建索引到插入数据、查询数据,基本都是模板代码,通过本篇博客可以快速构建!