【SLS开源兼容系列】使用ES SDK 访问SLS

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
注册配置 MSE Nacos/ZooKeeper,182元/月
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
简介: 本文介绍如何用es sdk访问sls

场景

小丁最近把公司的elk搬迁到了sls上,使用sls的es兼容功能,配置好了Kibana和Grafana,大家用得都很开心。

不过有一个问题比较困扰,原来有一个应用使用了es的接口进行数据查询。现在因为切换sls,面临改造的问题。

咨询了阿里云sls工程师后,发现原来可以用es sdk访问sls 兼容接口,这真是极大地方便了迁移呀。

以上场景,或许会在es迁移的过程中出现。如果有系统依赖es接口,想继续使用,可以连sls es兼容接口,少量配置修改即可对接起来。

SLS和ES概念对齐

ES概念

Host

格式为https://${project}.${slsEndpoint}/es/

Username

AccessKeyId

Password

AccessKeySecret

Index

${project}.${logstore}

上面的${project}、${logstore} 和${slsEndpoint}请根据实际情况替换

示例

下面以 project 为etl-dev、logstore为accesslog、slsEndpoint为cn-huhehaote.log.aliyuncs.com 为例,演示如何用es 的sdk访问

使用curl访问sls的es兼容接口

curl -u ${AccessKeyId}:${AccessKeySecret} \
 "https://etl-dev.cn-huhehaote.log.aliyuncs.com/es/etl-dev.accesslog/_search?q=status:200"

这里的AccessKeyId 和AccessKeySecret 需要替换为真实值

q=可以填写具体请求的query,支持Lucene Query的格式

使用Python es sdk访问sls的es兼容接口

安装依赖

pip install elasticsearch==7.10.0

样例

#!/bin/env python3
import os
import json
import time
from elasticsearch import Elasticsearch, helpers
slsProject = "etl-dev"
slsEndpoint = "cn-huhehaote.log.aliyuncs.com"
slsLogstore = "accesslog"
esHost = "https://%s.%s/es/" % (slsProject, slsEndpoint)
esIndex = "%s.%s" % (slsProject, slsLogstore)
# 从环境变量中获取到ak信息
accessKeyId = os.environ['ALIYUN_ACCESS_KEY_ID']
accessKeySecret = os.environ['ALIYUN_ACCESS_KEY_SECRET']
esClient = Elasticsearch(hosts=esHost,
                http_auth=(accessKeyId, accessKeySecret),
                   verify_certs=True, timeout=300)
endTime = int(time.time()*1000)
startTime = endTime - 3600*1000
r = esClient.search(
    index=esIndex,
    body=   {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": startTime,
                                "lte": endTime,
                                "format": "epoch_millis"
                            }
                        }
                    }
                ]
            }
        }
     }
)
print(json.dumps(r, indent=4))

当然也可以使用python高阶封装的接口, 避免手工组装dsl,安装

pip install elasticsearch-dsl==7.4.1

使用 elasticsearch-dsl方式访问样例

#!/bin/env python3
import os
import json
import time
from elasticsearch import Elasticsearch, helpers
from elasticsearch_dsl import Search, Q
slsProject = "etl-dev"
slsEndpoint = "cn-huhehaote.log.aliyuncs.com"
slsLogstore = "accesslog"
esHost = "https://%s.%s/es/" % (slsProject, slsEndpoint)
esIndex = "%s.%s" % (slsProject, slsLogstore)
# 从环境变量中获取到ak信息
accessKeyId = os.environ['ALIYUN_ACCESS_KEY_ID']
accessKeySecret = os.environ['ALIYUN_ACCESS_KEY_SECRET']
esClient = Elasticsearch(hosts=esHost,
                http_auth=(accessKeyId, accessKeySecret),
                   verify_certs=True, timeout=300)
endTime = int(time.time()*1000)
startTime = endTime - 3600*1000
s = Search(using=esClient, index=esIndex) \
        .filter(Q("range", **{"@timestamp": {"gte": startTime, "lt": endTime, "format": "epoch_millis"}}))  \
        .query("match", request_method="GET") \
response = s.execute()
for hit in response:
    # request_method, host, client_ip 是sls日志中的字段
    print(hit.request_method, hit.host, hit.client_ip)

使用Golang es sdk 访问sls的es兼容接口

样例

package main
import (
  "context"
  "fmt"
  "os"
  "time"
  "github.com/olivere/elastic/v7"
)
func main() {
  // 下面是一个es sdk访问sls es 兼容接口的样例
  slsProject := "etl-dev"
  slsLogstore := "accesslog"
  slsEndpoint := "cn-huhehaote.log.aliyuncs.com"
  accessKeyID := os.Getenv("ALIYUN_ACCESS_KEY_ID")
  accessKeySecret := os.Getenv("ALIYUN_ACCESS_KEY_SECRET")
  esHost := fmt.Sprintf("https://%s.%s:443/es", slsProject, slsEndpoint)
  esIndex := fmt.Sprintf("%s.%s", slsProject, slsLogstore)
  esClient, err := elastic.NewClient(
    elastic.SetURL(esHost),
    elastic.SetSniff(false),
    elastic.SetBasicAuth(accessKeyID, accessKeySecret), // 设置基本认证的用户名和密码
    elastic.SetHealthcheck(false),                      // 关闭健康检查
  )
  if err != nil {
    panic(err)
  }
  termQuery := elastic.NewTermQuery("request_method", "GET")
  endTime := time.Now().Unix()
  startTime := endTime - 3600
  timeRangeQuery := elastic.NewRangeQuery("@timestamp").Gte(startTime).Lte(endTime)
  boolQuery := elastic.NewBoolQuery()
  boolQuery = boolQuery.Must(timeRangeQuery, termQuery)
  searchResult, err := esClient.Search().
    Index(esIndex).
    Query(boolQuery).
    From(0).Size(10).
    Pretty(true).
    Do(context.Background())
  if err != nil {
    panic(err)
  }
  // 输出结果
  for _, hit := range searchResult.Hits.Hits {
    fmt.Println(string(hit.Source))
  }
}

使用Java es sdk访问sls的es兼容接口

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>estest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.10.2</version>
        </dependency>
    </dependencies>
</project>

样例

package org.example;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
public class Main {
    public static void main(String[] args) throws IOException {
        String slsProject = "etl-dev";
        String slsLogstore = "accesslog";
        String slsEndpoint = "cn-huhehaote.log.aliyuncs.com";
        String schema = "https";
        String esHost = slsProject + "." +  slsEndpoint; // ${project}.${endpoint}
        int port = 443;
        String esIndex = slsProject + "." + slsLogstore; // ${project}.${logstore}
        String esPrefix = "/es/";
        String accessKeyId = System.getenv("ALIYUN_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIYUN_ACCESS_KEY_SECRET");
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(accessKeyId, accessKeySecret));
        RestClientBuilder builder = RestClient.builder(new HttpHost(esHost, port, schema)).setHttpClientConfigCallback(
                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        // Set /es/ prefix
        builder.setPathPrefix(esPrefix);
        RestHighLevelClient client = new RestHighLevelClient(builder);
        // Query
        BoolQueryBuilder boolExpr= new BoolQueryBuilder();
        long endTime = System.currentTimeMillis();
        long startTime = endTime - 3600 * 1000;
        boolExpr.filter().add(new MatchQueryBuilder("request_method", "GET"));
        boolExpr.filter().add(new RangeQueryBuilder("@timestamp").gte(startTime).lte(endTime).format("epoch_millis"));
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolExpr);
        SearchRequest searchRequest = new SearchRequest(esIndex);
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(searchResponse.toString());
        client.close();
    }
}

使用PHP es sdk访问sls的es兼容接口

使用 composer 安装  https://getcomposer.org/download/

composer require elasticsearch/elasticsearch

样例

<?php
require 'vendor/autoload.php';
use Elasticsearch\ClientBuilder;
$slsProject = 'etl-dev';
$slsLogstore = 'accesslog';
$slsEndpoint = 'cn-huhehaote.log.aliyuncs.com';
$esHost = $slsProject . '.' . $slsEndpoint;
$esIndex = $slsProject . '.' . $slsLogstore;
$accessKeyId = getenv('ALIYUN_ACCESS_KEY_ID');
$accessKeySecret = getenv('ALIYUN_ACCESS_KEY_SECRET');
$hosts = [
    [
        'host' => $esHost,
        'port' => '443',
        'scheme' => 'https',
        'path' => '/es',
        'user' => $accessKeyId,
        'pass' => $accessKeySecret,
    ]
];
$client = ClientBuilder::create()
    ->setHosts($hosts)
    ->build();
$endTime = round(microtime(true) * 1000); // 毫秒
$startTime = $endTime - (3600 * 1000);
$params = [
    'index' => $esIndex,
    'body'  => [
        'query' => [
            'bool' => [
                'must' => [
                    'match' => [
                        'request_method' => 'GET'
                    ]
                ],
                'filter' => [
                    'range' => [
                        '@timestamp' => [
                            'gte' => $startTime,
                            'lte' => $endTime
                        ]
                    ]
                ]
            ]
        ]
    ]
];
$response = $client->search($params);
print_r($response);

小结

通过使用es标准的sdk,设置好正确的esHost、esIndex,可以很方便地访问sls的数据。期待您的使用~

参考

相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
目录
相关文章
|
9月前
|
人工智能 监控 算法
3D-Speaker:阿里通义开源的多模态说话人识别项目,支持说话人识别、语种识别、多模态识别、说话人重叠检测和日志记录
3D-Speaker是阿里巴巴通义实验室推出的多模态说话人识别开源项目,结合声学、语义和视觉信息,提供高精度的说话人识别和语种识别功能。项目包含工业级模型、训练和推理代码,以及大规模多设备、多距离、多方言的数据集,适用于多种应用场景。
1654 18
3D-Speaker:阿里通义开源的多模态说话人识别项目,支持说话人识别、语种识别、多模态识别、说话人重叠检测和日志记录
|
8月前
|
存储 人工智能 JSON
RAG Logger:专为检索增强生成(RAG)应用设计的开源日志工具,支持查询跟踪、性能监控
RAG Logger 是一款专为检索增强生成(RAG)应用设计的开源日志工具,支持查询跟踪、检索结果记录、LLM 交互记录和性能监控等功能。
328 7
RAG Logger:专为检索增强生成(RAG)应用设计的开源日志工具,支持查询跟踪、性能监控
|
9月前
|
人工智能 算法 数据挖掘
开源更新|通义3D-Speaker多说话人日志功能
开源更新|通义3D-Speaker多说话人日志功能
|
10月前
|
存储 数据采集 监控
开源日志Fluentd
【10月更文挑战第21天】
169 7
|
10月前
|
存储 数据采集 监控
开源日志分析Elasticsearch
【10月更文挑战第22天】
152 5
|
10月前
|
机器学习/深度学习 运维 监控
开源日志分析Kibana
【10月更文挑战第22天】
215 3
|
10月前
|
存储 JSON 监控
开源日志分析Logstash
【10月更文挑战第22天】
210 1
|
10月前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
2787 31
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
9月前
|
监控 安全 Apache
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。
243 9
|
7月前
|
存储 SQL 关系型数据库
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log、原理、写入过程;binlog与redolog区别、update语句的执行流程、两阶段提交、主从复制、三种日志的使用场景;查询日志、慢查询日志、错误日志等其他几类日志
569 35
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log