k8s与日志--采用golang实 现Fluent Bit的output插件

简介: s" "time" "unsafe" "github.com/Shopify/sarama" "github.com/fluent/fluent-bit-go/output" "github.com/ugorji/go/codec" ) var ( brokers []string producer sarama.
s" "time" "unsafe" "github.com/Shopify/sarama" "github.com/fluent/fluent-bit-go/output" "github.com/ugorji/go/codec"
)

var (
 brokers []string
 producer sarama.SyncProducer
 timeout = 0 * time.Minute
 topic string
 module string
 messageKey string
)

//export FLBPluginRegister func FLBPluginRegister(ctx unsafe.Pointer) int {
 return output.FLBPluginRegister(ctx, "out_kafka", "Kafka Output Plugin.!")
}

//export FLBPluginInit // ctx (context) pointer to fluentbit context (state/ c code) func FLBPluginInit(ctx unsafe.Pointer) int {

 if bs := output.FLBPluginConfigKey(ctx, "brokers"); bs != "" {
 brokers = strings.Split(bs, ",")
 } else {
 log.Printf("you must set brokers")
 return output.FLB_ERROR
 }

 if tp := output.FLBPluginConfigKey(ctx, "topics"); tp != "" {
 topic = tp
 } else {
 log.Printf("you must set topics")
 return output.FLB_ERROR
 }

 if mo := output.FLBPluginConfigKey(ctx, "module"); mo != "" {
 module = mo
 } else {
 log.Printf("you must set module")
 return output.FLB_ERROR
 }

 if key := output.FLBPluginConfigKey(ctx, "message_key"); key != "" {
 messageKey = key
 } else {
 log.Printf("you must set message_key")
 return output.FLB_ERROR
 }

 config := sarama.NewConfig()
 config.Producer.Return.Successes = true if required_acks := output.FLBPluginConfigKey(ctx, "required_acks"); required_acks != "" {
 if acks, err := strconv.Atoi(required_acks); err == nil {
 config.Producer.RequiredAcks = sarama.RequiredAcks(acks)
 }
 }

 if compression_codec := output.FLBPluginConfigKey(ctx, "compression_codec"); compression_codec != "" {
 if codec, err := strconv.Atoi(compression_codec); err == nil {
 config.Producer.Compression = sarama.CompressionCodec(codec)
 }
 }

 if max_retry := output.FLBPluginConfigKey(ctx, "max_retry"); max_retry != "" {
 if max_retry, err := strconv.Atoi(max_retry); err == nil {
 config.Producer.Retry.Max = max_retry
 }
 }

 if timeout == 0 {
 timeout = 5 * time.Minute
 }
 // If Kafka is not running on init, wait to connect
 deadline := time.Now().Add(timeout)
 for tries := 0; time.Now().Before(deadline); tries++ {
 var err error
 if producer == nil {
 producer, err = sarama.NewSyncProducer(brokers, config)
 }
 if err == nil {
 return output.FLB_OK
 }
 log.Printf("Cannot connect to Kafka: (%s) retrying...", err)
 time.Sleep(time.Second * 30)
 }

 log.Printf("Kafka failed to respond after %s", timeout)
 return output.FLB_ERROR

}

//export FLBPluginFlush // FLBPluginFlush is called from fluent-bit when data need to be sent. is called from fluent-bit when data need to be sent. func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
 var h codec.MsgpackHandle

 var b []byte var m interface{}
 var err error

 b = C.GoBytes(data, length)
 dec := codec.NewDecoderBytes(b, &h)

 // Iterate the original MessagePack array var msgs []*sarama.ProducerMessage
 for {
 // decode the msgpack data
 err = dec.Decode(&m)
 if err != nil {
 if err == io.EOF {
 break
 }
 log.Printf("Failed to decode msgpack data: %v\n", err)
 return output.FLB_ERROR
 }

 // Get a slice and their two entries: timestamp and map
 slice := reflect.ValueOf(m)
 data := slice.Index(1)

 // Convert slice data to a real map and iterate
 mapData := data.Interface().(map[interface{}]interface{})
 flattenData, err := Flatten(mapData, "", UnderscoreStyle)
 if err != nil {
 break
 }

 message := ""
 host := "" for k, v := range flattenData {
 value := "" switch t := v.(type) {
 case string:
 value = t
 case []byte:
 value = string(t)
 default:
 value = fmt.Sprintf("%v", v)
 }

 if k == "pod_name" {
 host = value
 }

 if k == messageKey {
 message = value
 }

 }
 if message == "" || host == "" {
 break
 }

 m := &sarama.ProducerMessage{
 Topic: topic,
 Key: sarama.StringEncoder(fmt.Sprintf("host=%s|module=%s", host, module)),
 Value: sarama.ByteEncoder(message),
 }
 msgs = append(msgs, m)

 }

 err = producer.SendMessages(msgs)
 if err != nil {
 log.Printf("FAILED to send kafka message: %s\n", err)
 return output.FLB_ERROR
 }
 return output.FLB_OK
}

//export FLBPluginExit func FLBPluginExit() int {
 producer.Close()
 return output.FLB_OK
}

func main() {
}
  • FLBPluginExit 插件退出的时候需要执行的一些方法,比如关闭连接。
  • FLBPluginRegister 注册插件
  • FLBPluginInit 插件初始化
  • FLBPluginFlush flush到数据到output
  • FLBPluginConfigKey 获取配置文件中参数

PS
当然除了FLBPluginConfigKey之外,也可以通过获取环境变量来获得设置参数。
ctx相当于一个上下文,负责之间的数据的传递。

编译和执行

编译的时候

go build -buildmode=c-shared -o out_kafka.so .

生成out_kafka.so

执行的时候

/fluent-bit/bin/fluent-bit" -c /fluent-bit/etc/fluent-bit.conf -e /fluent-bit/out_kafka.so

总结

采用类似的编写结构,就可以定制化自己的输出插件了。

本文转自SegmentFault-k8s与日志--采用golang实现Fluent Bit的output插件

相关文章
|
运维 监控 Cloud Native
一行代码都不改,Golang 应用链路指标日志全知道
本文将通过阿里云开源的 Golang Agent,帮助用户实现“一行代码都不改”就能获取到应用产生的各种观测数据,同时提升运维团队和研发团队的幸福感。
678 134
|
Kubernetes Ubuntu Windows
【Azure K8S | AKS】分享从AKS集群的Node中查看日志的方法(/var/log)
【Azure K8S | AKS】分享从AKS集群的Node中查看日志的方法(/var/log)
368 3
|
Prometheus Cloud Native Go
Golang语言之Prometheus的日志模块使用案例
这篇文章是关于如何在Golang语言项目中使用Prometheus的日志模块的案例,包括源代码编写、编译和测试步骤。
291 4
Golang语言之Prometheus的日志模块使用案例
|
Kubernetes 容器 Perl
Kubernetes网络插件体系及flannel基础
文章主要介绍了Kubernetes网络插件体系,特别是flannel网络模型的工作原理、配置和测试方法。
388 3
Kubernetes网络插件体系及flannel基础
|
Kubernetes API Docker
跟着iLogtail学习容器运行时与K8s下日志采集方案
iLogtail 作为开源可观测数据采集器,对 Kubernetes 环境下日志采集有着非常好的支持,本文跟随 iLogtail 的脚步,了解容器运行时与 K8s 下日志数据采集原理。
1015 8
|
消息中间件 Kubernetes API
在K8S中,如何收集k8s集群日志?
在K8S中,如何收集k8s集群日志?
|
运维 Kubernetes 监控
Loki+Promtail+Grafana监控K8s日志
综上,Loki+Promtail+Grafana 监控组合对于在 K8s 环境中优化日志管理至关重要,它不仅提供了强大且易于扩展的日志收集与汇总工具,还有可视化这些日志的能力。通过有效地使用这套工具,可以显著地提高对应用的运维监控能力和故障诊断效率。
1899 0
|
消息中间件 Kubernetes Kafka
微服务从代码到k8s部署应有尽有系列(十一、日志收集)
微服务从代码到k8s部署应有尽有系列(十一、日志收集)
|
Kubernetes Shell 网络安全
【Azure K8S】记录AKS VMSS实例日志收集方式
【Azure K8S】记录AKS VMSS实例日志收集方式
203 0
|
3月前
|
人工智能 算法 调度
阿里云ACK托管集群Pro版共享GPU调度操作指南
本文介绍在阿里云ACK托管集群Pro版中,如何通过共享GPU调度实现显存与算力的精细化分配,涵盖前提条件、使用限制、节点池配置及任务部署全流程,提升GPU资源利用率,适用于AI训练与推理场景。
359 2

推荐镜像

更多