【SLS开源兼容系列】使用ES SDK 访问SLS

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
可观测可视化 Grafana 版,10个用户账号 1个月
简介: 本文介绍如何用es sdk访问sls

场景

小丁最近把公司的elk搬迁到了sls上,使用sls的es兼容功能,配置好了Kibana和Grafana,大家用得都很开心。

不过有一个问题比较困扰,原来有一个应用使用了es的接口进行数据查询。现在因为切换sls,面临改造的问题。

咨询了阿里云sls工程师后,发现原来可以用es sdk访问sls 兼容接口,这真是极大地方便了迁移呀。

以上场景,或许会在es迁移的过程中出现。如果有系统依赖es接口,想继续使用,可以连sls es兼容接口,少量配置修改即可对接起来。

SLS和ES概念对齐

ES概念

Host

格式为https://${project}.${slsEndpoint}/es/

Username

AccessKeyId

Password

AccessKeySecret

Index

${project}.${logstore}

上面的${project}、${logstore} 和${slsEndpoint}请根据实际情况替换

示例

下面以 project 为etl-dev、logstore为accesslog、slsEndpoint为cn-huhehaote.log.aliyuncs.com 为例,演示如何用es 的sdk访问

使用curl访问sls的es兼容接口

curl -u ${AccessKeyId}:${AccessKeySecret} \
 "https://etl-dev.cn-huhehaote.log.aliyuncs.com/es/etl-dev.accesslog/_search?q=status:200"

这里的AccessKeyId 和AccessKeySecret 需要替换为真实值

q=可以填写具体请求的query,支持Lucene Query的格式

使用Python es sdk访问sls的es兼容接口

安装依赖

pip install elasticsearch==7.10.0

样例

#!/bin/env python3
import os
import json
import time
from elasticsearch import Elasticsearch, helpers
slsProject = "etl-dev"
slsEndpoint = "cn-huhehaote.log.aliyuncs.com"
slsLogstore = "accesslog"
esHost = "https://%s.%s/es/" % (slsProject, slsEndpoint)
esIndex = "%s.%s" % (slsProject, slsLogstore)
# 从环境变量中获取到ak信息
accessKeyId = os.environ['ALIYUN_ACCESS_KEY_ID']
accessKeySecret = os.environ['ALIYUN_ACCESS_KEY_SECRET']
esClient = Elasticsearch(hosts=esHost,
                http_auth=(accessKeyId, accessKeySecret),
                   verify_certs=True, timeout=300)
endTime = int(time.time()*1000)
startTime = endTime - 3600*1000
r = esClient.search(
    index=esIndex,
    body=   {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": startTime,
                                "lte": endTime,
                                "format": "epoch_millis"
                            }
                        }
                    }
                ]
            }
        }
     }
)
print(json.dumps(r, indent=4))

当然也可以使用python高阶封装的接口, 避免手工组装dsl,安装

pip install elasticsearch-dsl==7.4.1

使用 elasticsearch-dsl方式访问样例

#!/bin/env python3
import os
import json
import time
from elasticsearch import Elasticsearch, helpers
from elasticsearch_dsl import Search, Q
slsProject = "etl-dev"
slsEndpoint = "cn-huhehaote.log.aliyuncs.com"
slsLogstore = "accesslog"
esHost = "https://%s.%s/es/" % (slsProject, slsEndpoint)
esIndex = "%s.%s" % (slsProject, slsLogstore)
# 从环境变量中获取到ak信息
accessKeyId = os.environ['ALIYUN_ACCESS_KEY_ID']
accessKeySecret = os.environ['ALIYUN_ACCESS_KEY_SECRET']
esClient = Elasticsearch(hosts=esHost,
                http_auth=(accessKeyId, accessKeySecret),
                   verify_certs=True, timeout=300)
endTime = int(time.time()*1000)
startTime = endTime - 3600*1000
s = Search(using=esClient, index=esIndex) \
        .filter(Q("range", **{"@timestamp": {"gte": startTime, "lt": endTime, "format": "epoch_millis"}}))  \
        .query("match", request_method="GET") \
response = s.execute()
for hit in response:
    # request_method, host, client_ip 是sls日志中的字段
    print(hit.request_method, hit.host, hit.client_ip)

使用Golang es sdk 访问sls的es兼容接口

样例

package main
import (
  "context"
  "fmt"
  "os"
  "time"
  "github.com/olivere/elastic/v7"
)
func main() {
  // 下面是一个es sdk访问sls es 兼容接口的样例
  slsProject := "etl-dev"
  slsLogstore := "accesslog"
  slsEndpoint := "cn-huhehaote.log.aliyuncs.com"
  accessKeyID := os.Getenv("ALIYUN_ACCESS_KEY_ID")
  accessKeySecret := os.Getenv("ALIYUN_ACCESS_KEY_SECRET")
  esHost := fmt.Sprintf("https://%s.%s:443/es", slsProject, slsEndpoint)
  esIndex := fmt.Sprintf("%s.%s", slsProject, slsLogstore)
  esClient, err := elastic.NewClient(
    elastic.SetURL(esHost),
    elastic.SetSniff(false),
    elastic.SetBasicAuth(accessKeyID, accessKeySecret), // 设置基本认证的用户名和密码
    elastic.SetHealthcheck(false),                      // 关闭健康检查
  )
  if err != nil {
    panic(err)
  }
  termQuery := elastic.NewTermQuery("request_method", "GET")
  endTime := time.Now().Unix()
  startTime := endTime - 3600
  timeRangeQuery := elastic.NewRangeQuery("@timestamp").Gte(startTime).Lte(endTime)
  boolQuery := elastic.NewBoolQuery()
  boolQuery = boolQuery.Must(timeRangeQuery, termQuery)
  searchResult, err := esClient.Search().
    Index(esIndex).
    Query(boolQuery).
    From(0).Size(10).
    Pretty(true).
    Do(context.Background())
  if err != nil {
    panic(err)
  }
  // 输出结果
  for _, hit := range searchResult.Hits.Hits {
    fmt.Println(string(hit.Source))
  }
}

使用Java es sdk访问sls的es兼容接口

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>estest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.10.2</version>
        </dependency>
    </dependencies>
</project>

样例

package org.example;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
public class Main {
    public static void main(String[] args) throws IOException {
        String slsProject = "etl-dev";
        String slsLogstore = "accesslog";
        String slsEndpoint = "cn-huhehaote.log.aliyuncs.com";
        String schema = "https";
        String esHost = slsProject + "." +  slsEndpoint; // ${project}.${endpoint}
        int port = 443;
        String esIndex = slsProject + "." + slsLogstore; // ${project}.${logstore}
        String esPrefix = "/es/";
        String accessKeyId = System.getenv("ALIYUN_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIYUN_ACCESS_KEY_SECRET");
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(accessKeyId, accessKeySecret));
        RestClientBuilder builder = RestClient.builder(new HttpHost(esHost, port, schema)).setHttpClientConfigCallback(
                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        // Set /es/ prefix
        builder.setPathPrefix(esPrefix);
        RestHighLevelClient client = new RestHighLevelClient(builder);
        // Query
        BoolQueryBuilder boolExpr= new BoolQueryBuilder();
        long endTime = System.currentTimeMillis();
        long startTime = endTime - 3600 * 1000;
        boolExpr.filter().add(new MatchQueryBuilder("request_method", "GET"));
        boolExpr.filter().add(new RangeQueryBuilder("@timestamp").gte(startTime).lte(endTime).format("epoch_millis"));
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolExpr);
        SearchRequest searchRequest = new SearchRequest(esIndex);
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(searchResponse.toString());
        client.close();
    }
}

使用PHP es sdk访问sls的es兼容接口

使用 composer 安装  https://getcomposer.org/download/

composer require elasticsearch/elasticsearch

样例

<?php
require 'vendor/autoload.php';
use Elasticsearch\ClientBuilder;
$slsProject = 'etl-dev';
$slsLogstore = 'accesslog';
$slsEndpoint = 'cn-huhehaote.log.aliyuncs.com';
$esHost = $slsProject . '.' . $slsEndpoint;
$esIndex = $slsProject . '.' . $slsLogstore;
$accessKeyId = getenv('ALIYUN_ACCESS_KEY_ID');
$accessKeySecret = getenv('ALIYUN_ACCESS_KEY_SECRET');
$hosts = [
    [
        'host' => $esHost,
        'port' => '443',
        'scheme' => 'https',
        'path' => '/es',
        'user' => $accessKeyId,
        'pass' => $accessKeySecret,
    ]
];
$client = ClientBuilder::create()
    ->setHosts($hosts)
    ->build();
$endTime = round(microtime(true) * 1000); // 毫秒
$startTime = $endTime - (3600 * 1000);
$params = [
    'index' => $esIndex,
    'body'  => [
        'query' => [
            'bool' => [
                'must' => [
                    'match' => [
                        'request_method' => 'GET'
                    ]
                ],
                'filter' => [
                    'range' => [
                        '@timestamp' => [
                            'gte' => $startTime,
                            'lte' => $endTime
                        ]
                    ]
                ]
            ]
        ]
    ]
];
$response = $client->search($params);
print_r($response);

小结

通过使用es标准的sdk,设置好正确的esHost、esIndex,可以很方便地访问sls的数据。期待您的使用~

参考

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
3月前
|
存储
优秀开源日志平台GrayLog5.0一键安装脚本
优秀开源日志平台GrayLog5.0一键安装脚本
82 0
|
3月前
|
Linux C# 开发工具
C#开源的一款友好的.NET SDK管理器
C#开源的一款友好的.NET SDK管理器
|
2月前
|
Prometheus Cloud Native 数据库
Grafana 系列文章(九):开源云原生日志解决方案 Loki 简介
Grafana 系列文章(九):开源云原生日志解决方案 Loki 简介
|
12天前
|
API 开发工具 C语言
【嵌入式开源库】EasyLogger的使用, 一款轻量级且高性能的日志库
【嵌入式开源库】EasyLogger的使用, 一款轻量级且高性能的日志库
|
1月前
|
存储 运维 网络协议
【开源物联网平台】物联网设备上云提供开箱即用接入SDK
IOTDeviceSDK是物联网平台提供的设备端软件开发工具包,可简化开发过程,实现设备快速接入各大物联网平台。设备厂商获取SDK后,根据需要选择相应功能进行移植,即可快速集成IOTDeviceSDK,实现设备的接入。
115 0
|
3月前
|
数据可视化 API 开发工具
Baumer工业相机堡盟工业相机如何通过NEOAPI SDK使用相机日志跟踪功能(C++)
Baumer工业相机堡盟工业相机如何通过NEOAPI SDK使用相机日志跟踪功能(C++)
33 0
|
3月前
|
Linux Shell
开源日志平台GrayLog5.1.10 CentOS7一键安装脚本
开源日志平台GrayLog5.1.10 CentOS7一键安装脚本
93 0
|
3月前
|
Linux
开源日志平台GrayLog5.1.7 CentOS7一键安装脚本
开源日志平台GrayLog5.1.7 CentOS7一键安装脚本
81 1
|
3月前
开源日志平台GrayLog5.1.2一键安装脚本
开源日志平台GrayLog5.1.2一键安装脚本
43 0
|
3月前
|
消息中间件 Kafka C++
【SLS开源兼容系列】从ES平滑迁移到SLS
当我们考虑把elk的数据链路迁移到sls时,往往希望做到平滑的迁移,减少迁移的代价。本文介绍几种迁移方案,供大家在做elk迁移时参考。
132 4