Serverless Kubernetes 开发实践:异构资源,按需使用

简介: Kubernetes 作为当今云原生业界标准,具备良好的生态以及跨云厂商能力。Kubernetes 很好的抽象了 IaaS 资源交付标准,使得云资源交付变的越来越简单,与此同时用户期望更多的聚焦于业务自身,做到面向应用交付,Serverless 理念也因此而生。 那么如何通过原生 k8s 提供Serverless 能力?如何实现GPU等异构资源按需使用?这里给大家介绍一下我们在Serverless Kubernetes 开发实践:异构资源,按需使用。

Kubernetes 作为当今云原生业界标准,具备良好的生态以及跨云厂商能力。Kubernetes 很好的抽象了 IaaS 资源交付标准,使得云资源交付变的越来越简单,与此同时用户期望更多的聚焦于业务自身,做到面向应用交付,Serverless 理念也因此而生。 那么如何通过原生 k8s 提供Serverless 能力?如何实现GPU等异构资源按需使用?这里给大家介绍一下我们在Serverless Kubernetes 开发实践:异构资源,按需使用。

1. 方案介绍

使用 Knative + ASK 作为 Serverless 架构。数据采集之后,发送到消息队列(Kafka), Kafka事件源接收到事件之后,通过服务网关访问数据处理服务,数据处理服务根据请求量按需自动扩缩容

2. 目标读者

  • Kubernetes 容器领域开发者。

  • Serverless 领域开发者。

3. 适应场景

  • AI 音视频编/解码场景。

  • 大数据及AI 智能识别。

4. 相关概念

  • Serverless Kubernetes(ASK):ACK Serverless集群是阿里云推出的无服务器Kubernetes容器服务。

  • Knative:Knative 是一款基于Kubernetes的Serverless框架。其目标是制定云原生、跨平台的Serverless编排标准。Knative通过整合容器构建(或者函数)、工作负载管理(动态扩缩)以及事件模型这三者来实现的这一Serverless标准。

5. 方案架构

5.1 方案架构图

image

用户将数据采集之后,将数据发送到消息队列Kafka中,Knative 中的Kafka事件源接收到消息,并发送到 AI 推理服务,推理服务对事件进行意图识别处理。

5.2 方案优势

对于希望通过 Serverless 技术实现按需使用资源,节省资源使用成本,简化运维部署 ,另外还有 GPU 等异构资源业务诉求的用户。使用 ASK + Knative 方案 ,可以满足 GPU 等异构资源使用诉求,同时简化应用运维部署(尽可能少的操作 k8s deployment/svc/ingress/hpa等资源),IaaS资源免运维。

6. 方案实施

6.1 前提条件

6.2 操作步骤

第一步:部署推理服务

该服务用于接收事件驱动发送的事件,并根据请求数进行自动扩缩容,进行数据处理。部署之前,我们先确认当前无工作负载,以便观察部署之后的结果。

接着通过 Knative 把推理处理部署到 GPU 类型工作负载。这里我们通过YAML的方式进行部署,YAML内容如下:

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: test-ai-process
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/maxScale: "100"
        autoscaling.knative.dev/minScale: "1"
        k8s.aliyun.com/eci-use-specs: ecs.gn6i-c4g1.xlarge  #指定支持的ECI GPU规格。
    spec:
      containerConcurrency: 2
      containers:
        - image: registry.cn-hangzhou.aliyuncs.com/ai-samples/bert-intent-detection:1.0.1
          name: user-container
          ports:
            - containerPort: 80
              name: http1
          resources:
            limits:
              nvidia.com/gpu: '1'    #容器所需的GPU个数,必须指定该值,否则Pod启动后将会报错。

主要参数说明:

  • minScale和maxScale:表示服务配置的最小和最大Pod数量。

  • containerConcurrency:表示配置的Pod最大请求并发数。

  • k8s.aliyun.com/eci-use-specs:表示配置的 eci 规格。

那么接下来我们部署该服务。

  1. 在集群管理页左侧导航栏中,选择应用 > Knative

  2. 服务管理页签右上角,单击【使用模板创建】。选择default 命名空间,将上面的 YAML 内容粘贴到模板,点击创建。

image

推理服务部署完成,我们通过如下方式进行服务访问。

$ curl -H "host: test-ai-process.default.8829e4ebd8fe9967.app.alicontainer.com" "http://120.26.143.xx/predict?query=music" -v

结果如下:

*   Trying 120.26.143.xx...
* TCP_NODELAY set
* Connected to 120.26.143.xx (120.26.143.xx) port 80 (#0)
> GET /predict?query=music HTTP/1.1
> Host: test-ai-process.knative-serving.8829e4ebd8fe9967.app.alicontainer.com
> User-Agent: curl/7.64.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Date: Thu, 17 Feb 2022 09:10:06 GMT
< Content-Type: text/html; charset=utf-8
< Content-Length: 9
< Connection: keep-alive
<
* Connection #0 to host 120.26.143.xx left intact
PlayMusic* #意图识别结果

第二步:部署事件驱动

事件驱动用于接收事件并进行事件流过滤、流转。这里我们使用 Kafka 事件源作为事件驱动,用于从 Kafka 接收事件,然后把事件到消息处理。我们通过YAML的方式进行部署, YAML内容如下:

apiVersion: sources.knative.dev/v1alpha1
kind: KafkaSource
metadata:
  annotations:
    k8s.aliyun.com/domain: test-ai-process.default.svc.cluster.local
    k8s.aliyun.com/req-timeout: "60"
    k8s.aliyun.com/retry-count: "5"
    k8s.aliyun.com/retry-interval: "2"
  name: test-ai-source
  namespace: default
spec:
  bootstrapServers: alikafka-pre-cn-xxx-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-7pp2kmoc7002-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-xx-3-vpc.alikafka.aliyuncs.com:9092
  consumerGroup: ai-info-consumer
  sink:
    uri: http://120.26.143.xx/predict?query=music
  topics: ai-info

主要参数说明:

  • kafka配置包括:kafka服务地址 ,弹幕消息 topics 以及消费组consumerGroup。

  • 转发路由:

    • 目标服务:k8s.aliyun.com/domain: test-ai-process.default.svc.cluster.local

    • 目标路由:http://120.26.143.xx/predict?query=music

那么接下来我们部署该服务。

  1. 在集群管理页左侧导航栏中,选择应用 > Knative

  2. 服务管理页签右上角,单击【使用模板创建】。选择default 命名空间,将上面的 YAML 内容粘贴到模板,点击创建。

image

7. 方案验证

接下来我们模拟并发发送消息,进行自动弹性验证。golang 客户端向Kafka发送消息代码如下:

package main
import (
 "crypto/tls"
 "crypto/x509"
 "fmt"
 "io/ioutil"
 "strconv"
 "time"
 "github.com/Shopify/sarama"
)
var producer sarama.SyncProducer
func init() {
  // 初始化 kafka 客户端
  // 设置用户名、密码、访问证书
 kafkaConfig := sarama.NewConfig()
 kafkaConfig.Version = sarama.V0_10_2_0
 kafkaConfig.Producer.Return.Successes = true
 kafkaConfig.Net.SASL.Enable = true
 kafkaConfig.Net.SASL.User = "alikafka_xxx"
 kafkaConfig.Net.SASL.Password = "RpfoLE41hdX60ybxxxx"
 kafkaConfig.Net.SASL.Handshake = true
 certBytes, err := ioutil.ReadFile("/Users/richard/common/demo/ca-cert")
 clientCertPool := x509.NewCertPool()
 ok := clientCertPool.AppendCertsFromPEM(certBytes)
 if !ok {
  panic("kafka producer failed to parse root certificate")
 }

 kafkaConfig.Net.TLS.Config = &tls.Config{
  //Certificates:       []tls.Certificate{},
  RootCAs:            clientCertPool,
  InsecureSkipVerify: true,
 }
 kafkaConfig.Net.TLS.Enable = true

 if err = kafkaConfig.Validate(); err != nil {
  msg := fmt.Sprintf("Kafka producer config invalidate. err: %v", err)
  fmt.Println(msg)
  panic(msg)
 }
  // 设置 Kafka 实例访问地址
 producer, err = sarama.NewSyncProducer([]string{"alikafka-pre-cn-xxx-1.alikafka.aliyuncs.com:9093","alikafka-pre-cn-xxx-2.alikafka.aliyuncs.com:9093","alikafka-pre-cn-xxx-3.alikafka.aliyuncs.com:9093"}, kafkaConfig)
 if err != nil {
  msg := fmt.Sprintf("Kafak producer create fail. err: %v", err)
  fmt.Println(msg)
  panic(msg)
 }
}
func main() {
  // 模拟并发50,持续100s, 发送消息
 for i := 1; i <= 100; i++ {
  for j := 0; j < 50; j++ {
   key := strconv.FormatInt(time.Now().UTC().UnixNano()+int64(j), 10)
   value := `{"id":12182,"template_name":"template_media/videos/templates/8f3a5f293149095aa62958029e208501_1606889936.zip","width":360,"height":640}`
   go produce("demo", key, value)
  }
    time.Sleep(time.Second)
 }
}
// Kafka 发送消息函数
func produce(topic string, key string, content string) error {
 msg := &sarama.ProducerMessage{
  Topic:     topic,
  Key:       sarama.StringEncoder(key),
  Value:     sarama.StringEncoder(content),
  Timestamp: time.Now(),
 }
 _, _, err := producer.SendMessage(msg)
 if err != nil {
  msg := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content)
  fmt.Println(msg)
  return err
 }
 fmt.Printf("Send OK topic:%s key:%s value:%s\n", topic, key, content)
 return nil
}

我们可以看到自动弹性扩容如下:

image

8. 小结

Serverless Kubernetes 基于 Kubernetes 之上,提供按需使用、节点免运维、异构资源以及事件驱动等 Serverless 能力,让开发者真正实现通过 Kubernetes 标准化 API 进行 Serverless 应用编程。

9. 常见问题

  • 部署推理服务(test-ai-process)没有Pod创建失败。

    可以查看Pod的event事件异常信息:

    • 若指定ECI GPU规格没有库存,这时候可以调整其它GPU 规格。

    • 若拉镜像失败,需要开通NAT访问公网。

  • 模拟发送消息,Pod实例没有触发自动扩所容。

    Kafka创建的实例需要与ACK Serverless集群属于同一个VPC,并且kafka实例白名单需要设置:0.0.0.0/0

作者介绍
目录