【SLS开源兼容系列】使用ES SDK 访问SLS

本文涉及的产品
应用实时监控服务-应用监控,每月50GB免费额度
可观测监控 Prometheus 版,每月50GB免费额度
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
简介: 本文介绍如何用es sdk访问sls

场景

小丁最近把公司的elk搬迁到了sls上,使用sls的es兼容功能,配置好了Kibana和Grafana,大家用得都很开心。

不过有一个问题比较困扰,原来有一个应用使用了es的接口进行数据查询。现在因为切换sls,面临改造的问题。

咨询了阿里云sls工程师后,发现原来可以用es sdk访问sls 兼容接口,这真是极大地方便了迁移呀。

以上场景,或许会在es迁移的过程中出现。如果有系统依赖es接口,想继续使用,可以连sls es兼容接口,少量配置修改即可对接起来。

SLS和ES概念对齐

ES概念

Host

格式为https://${project}.${slsEndpoint}/es/

Username

AccessKeyId

Password

AccessKeySecret

Index

${project}.${logstore}

上面的${project}、${logstore} 和${slsEndpoint}请根据实际情况替换

示例

下面以 project 为etl-dev、logstore为accesslog、slsEndpoint为cn-huhehaote.log.aliyuncs.com 为例,演示如何用es 的sdk访问

使用curl访问sls的es兼容接口

curl -u ${AccessKeyId}:${AccessKeySecret} \
 "https://etl-dev.cn-huhehaote.log.aliyuncs.com/es/etl-dev.accesslog/_search?q=status:200"

这里的AccessKeyId 和AccessKeySecret 需要替换为真实值

q=可以填写具体请求的query,支持Lucene Query的格式

使用Python es sdk访问sls的es兼容接口

安装依赖

pip install elasticsearch==7.10.0

样例

#!/bin/env python3
import os
import json
import time
from elasticsearch import Elasticsearch, helpers
slsProject = "etl-dev"
slsEndpoint = "cn-huhehaote.log.aliyuncs.com"
slsLogstore = "accesslog"
esHost = "https://%s.%s/es/" % (slsProject, slsEndpoint)
esIndex = "%s.%s" % (slsProject, slsLogstore)
# 从环境变量中获取到ak信息
accessKeyId = os.environ['ALIYUN_ACCESS_KEY_ID']
accessKeySecret = os.environ['ALIYUN_ACCESS_KEY_SECRET']
esClient = Elasticsearch(hosts=esHost,
                http_auth=(accessKeyId, accessKeySecret),
                   verify_certs=True, timeout=300)
endTime = int(time.time()*1000)
startTime = endTime - 3600*1000
r = esClient.search(
    index=esIndex,
    body=   {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": startTime,
                                "lte": endTime,
                                "format": "epoch_millis"
                            }
                        }
                    }
                ]
            }
        }
     }
)
print(json.dumps(r, indent=4))

当然也可以使用python高阶封装的接口, 避免手工组装dsl,安装

pip install elasticsearch-dsl==7.4.1

使用 elasticsearch-dsl方式访问样例

#!/bin/env python3
import os
import json
import time
from elasticsearch import Elasticsearch, helpers
from elasticsearch_dsl import Search, Q
slsProject = "etl-dev"
slsEndpoint = "cn-huhehaote.log.aliyuncs.com"
slsLogstore = "accesslog"
esHost = "https://%s.%s/es/" % (slsProject, slsEndpoint)
esIndex = "%s.%s" % (slsProject, slsLogstore)
# 从环境变量中获取到ak信息
accessKeyId = os.environ['ALIYUN_ACCESS_KEY_ID']
accessKeySecret = os.environ['ALIYUN_ACCESS_KEY_SECRET']
esClient = Elasticsearch(hosts=esHost,
                http_auth=(accessKeyId, accessKeySecret),
                   verify_certs=True, timeout=300)
endTime = int(time.time()*1000)
startTime = endTime - 3600*1000
s = Search(using=esClient, index=esIndex) \
        .filter(Q("range", **{"@timestamp": {"gte": startTime, "lt": endTime, "format": "epoch_millis"}}))  \
        .query("match", request_method="GET") \
response = s.execute()
for hit in response:
    # request_method, host, client_ip 是sls日志中的字段
    print(hit.request_method, hit.host, hit.client_ip)

使用Golang es sdk 访问sls的es兼容接口

样例

package main
import (
  "context"
  "fmt"
  "os"
  "time"
  "github.com/olivere/elastic/v7"
)
func main() {
  // 下面是一个es sdk访问sls es 兼容接口的样例
  slsProject := "etl-dev"
  slsLogstore := "accesslog"
  slsEndpoint := "cn-huhehaote.log.aliyuncs.com"
  accessKeyID := os.Getenv("ALIYUN_ACCESS_KEY_ID")
  accessKeySecret := os.Getenv("ALIYUN_ACCESS_KEY_SECRET")
  esHost := fmt.Sprintf("https://%s.%s:443/es", slsProject, slsEndpoint)
  esIndex := fmt.Sprintf("%s.%s", slsProject, slsLogstore)
  esClient, err := elastic.NewClient(
    elastic.SetURL(esHost),
    elastic.SetSniff(false),
    elastic.SetBasicAuth(accessKeyID, accessKeySecret), // 设置基本认证的用户名和密码
    elastic.SetHealthcheck(false),                      // 关闭健康检查
  )
  if err != nil {
    panic(err)
  }
  termQuery := elastic.NewTermQuery("request_method", "GET")
  endTime := time.Now().Unix()
  startTime := endTime - 3600
  timeRangeQuery := elastic.NewRangeQuery("@timestamp").Gte(startTime).Lte(endTime)
  boolQuery := elastic.NewBoolQuery()
  boolQuery = boolQuery.Must(timeRangeQuery, termQuery)
  searchResult, err := esClient.Search().
    Index(esIndex).
    Query(boolQuery).
    From(0).Size(10).
    Pretty(true).
    Do(context.Background())
  if err != nil {
    panic(err)
  }
  // 输出结果
  for _, hit := range searchResult.Hits.Hits {
    fmt.Println(string(hit.Source))
  }
}

使用Java es sdk访问sls的es兼容接口

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>estest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.10.2</version>
        </dependency>
    </dependencies>
</project>

样例

package org.example;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
public class Main {
    public static void main(String[] args) throws IOException {
        String slsProject = "etl-dev";
        String slsLogstore = "accesslog";
        String slsEndpoint = "cn-huhehaote.log.aliyuncs.com";
        String schema = "https";
        String esHost = slsProject + "." +  slsEndpoint; // ${project}.${endpoint}
        int port = 443;
        String esIndex = slsProject + "." + slsLogstore; // ${project}.${logstore}
        String esPrefix = "/es/";
        String accessKeyId = System.getenv("ALIYUN_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIYUN_ACCESS_KEY_SECRET");
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(accessKeyId, accessKeySecret));
        RestClientBuilder builder = RestClient.builder(new HttpHost(esHost, port, schema)).setHttpClientConfigCallback(
                    httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        // Set /es/ prefix
        builder.setPathPrefix(esPrefix);
        RestHighLevelClient client = new RestHighLevelClient(builder);
        // Query
        BoolQueryBuilder boolExpr= new BoolQueryBuilder();
        long endTime = System.currentTimeMillis();
        long startTime = endTime - 3600 * 1000;
        boolExpr.filter().add(new MatchQueryBuilder("request_method", "GET"));
        boolExpr.filter().add(new RangeQueryBuilder("@timestamp").gte(startTime).lte(endTime).format("epoch_millis"));
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolExpr);
        SearchRequest searchRequest = new SearchRequest(esIndex);
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(searchResponse.toString());
        client.close();
    }
}

使用PHP es sdk访问sls的es兼容接口

使用 composer 安装  https://getcomposer.org/download/

composer require elasticsearch/elasticsearch

样例

<?php
require 'vendor/autoload.php';
use Elasticsearch\ClientBuilder;
$slsProject = 'etl-dev';
$slsLogstore = 'accesslog';
$slsEndpoint = 'cn-huhehaote.log.aliyuncs.com';
$esHost = $slsProject . '.' . $slsEndpoint;
$esIndex = $slsProject . '.' . $slsLogstore;
$accessKeyId = getenv('ALIYUN_ACCESS_KEY_ID');
$accessKeySecret = getenv('ALIYUN_ACCESS_KEY_SECRET');
$hosts = [
    [
        'host' => $esHost,
        'port' => '443',
        'scheme' => 'https',
        'path' => '/es',
        'user' => $accessKeyId,
        'pass' => $accessKeySecret,
    ]
];
$client = ClientBuilder::create()
    ->setHosts($hosts)
    ->build();
$endTime = round(microtime(true) * 1000); // 毫秒
$startTime = $endTime - (3600 * 1000);
$params = [
    'index' => $esIndex,
    'body'  => [
        'query' => [
            'bool' => [
                'must' => [
                    'match' => [
                        'request_method' => 'GET'
                    ]
                ],
                'filter' => [
                    'range' => [
                        '@timestamp' => [
                            'gte' => $startTime,
                            'lte' => $endTime
                        ]
                    ]
                ]
            ]
        ]
    ]
];
$response = $client->search($params);
print_r($response);

小结

通过使用es标准的sdk,设置好正确的esHost、esIndex,可以很方便地访问sls的数据。期待您的使用~

参考

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
22天前
|
存储 运维 监控
开源日志Graylog
【10月更文挑战第21天】
81 8
|
22天前
|
存储 数据采集 监控
开源日志Fluentd
【10月更文挑战第21天】
34 7
|
22天前
|
存储 监控 安全
|
21天前
|
存储 数据采集 监控
开源日志分析Elasticsearch
【10月更文挑战第22天】
45 5
|
21天前
|
机器学习/深度学习 运维 监控
开源日志分析Kibana
【10月更文挑战第22天】
29 3
|
21天前
|
存储 JSON 监控
开源日志分析Logstash
【10月更文挑战第22天】
35 1
|
23天前
|
存储 运维 监控
开源日志分析工具
【10月更文挑战第20天】
49 3
|
19天前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
156 30
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
1月前
|
XML JSON Java
Logback 与 log4j2 性能对比:谁才是日志框架的性能王者?
【10月更文挑战第5天】在Java开发中,日志框架是不可或缺的工具,它们帮助我们记录系统运行时的信息、警告和错误,对于开发人员来说至关重要。在众多日志框架中,Logback和log4j2以其卓越的性能和丰富的功能脱颖而出,成为开发者们的首选。本文将深入探讨Logback与log4j2在性能方面的对比,通过详细的分析和实例,帮助大家理解两者之间的性能差异,以便在实际项目中做出更明智的选择。
244 3
|
1月前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1645 14
下一篇
无影云桌面