日志上云利器 - Aliyun LOG Java Producer

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
对象存储 OSS,恶意文件检测 1000次 1年
简介: 日志无处不在,它作为记录世间万物变化的载体,在运维、研发、运营、安全、BI、审计等领域有着广泛的应用场景。阿里云日志服务是日志类数据的一站式服务平台,其核心组件 LogHub 凭借着高吞吐、低延迟、可自动伸缩等特性,逐渐成为大数据处理领域特别是实时数据处理场景下的基础设施。

背景

日志无处不在,它作为记录世间万物变化的载体,在运维、研发、运营、安全、BI、审计等领域有着广泛的应用场景。

undefined

阿里云日志服务是日志类数据的一站式服务平台,其核心组件 LogHub 凭借着高吞吐、低延迟、可自动伸缩等特性,逐渐成为大数据处理领域特别是实时数据处理场景下的基础设施。那些运行在 FlinkSparkStorm 等大数据计算引擎中的任务往往会将数据处理结果或中间结果实时写入 LogHub,下游系统基于 LogHub 中的数据提供查询分析、监控告警、机器学习、迭代计算等能力。下图展示了面向 LogHub 的大数据处理系统架构图。

producer_overview

要让整个系统稳定地运行,提供便捷高效的数据写入手段是前提。直接使用 API 或 SDK 往往无法满足大数据场景下对数据写入能力的要求,在这样的背景下 Aliyun LOG Java Producer 应运而生。

功能特点

Aliyun LOG Java Producer 是一个易于使用且高度可配置的 Java 类库,它有如下功能特点:

  1. 线程安全 - producer 接口暴露的所有方法都是线程安全的。
  2. 异步发送 - 调用 producer 的发送接口通常能够立即返回。Producer 内部会缓存并合并发送数据,然后批量发送以提高吞吐量。
  3. 自动重试 - 对可重试的异常,producer 会根据用户配置的最大重试次数和重试退避时间进行重试。
  4. 行为追溯 - 用户通过 callback 或 future 不仅能获取当前数据是否发送成功的信息,还可以获得该数据每次被尝试发送的信息,有利于问题追溯和行为决策。
  5. 上下文还原 - 同一个 producer 实例产生的日志在同一上下文中,在服务端可以查看某条日志前后相关的日志。
  6. 优雅关闭 - 保证 close 方法退时,producer 缓存的所有数据都能被处理,用户也能得到相应的通知。

功能优势

使用 producer 相对于直接通过 API 或 SDK 向 LogHub 写数据会有如下优势。

高性能

在海量数据、资源有限的前提下,写入端要达到目标吞吐量需要实现复杂的控制逻辑,包括多线程、缓存策略、批量发送等,另外还要充分考虑失败重试的场景。Producer 实现了上述功能,在为您带来性能优势的同时简化了程序开发步骤。

异步非阻塞

在可用内存充足的前提下,producer 会对发往 LogHub 的数据进行缓存,因此用户调用 send 方法时能够立即返回,不会阻塞,达到计算与 I/O 逻辑分离的目的。稍后,用户可以通过返回的 future 对象或传入的 callback 获得数据发送的结果。

资源可控制

可以通过参数控制 producer 用于缓存待发送数据的内存大小,同时还可以配置用于执行数据发送任务的线程数量。这样做一方面避免了 producer 无限制地消耗资源,另一方面可以让您根据实际情况平衡资源消耗和写入吞吐量。

小结

综上所述,producer 在给您带来诸多优势的同时只暴露了简单的接口,为您屏蔽了复杂的底层细节;另外,您也无须担心它会影响到上层业务的正常运行,大大降低了数据接入门槛。

快速入门

Producer 的实现比较复杂,但使用起来却非常简单。想了解 producer 的正确打开方式请参考文章 Aliyun LOG Java Producer 快速入门

原理剖析

为了让您更好地理解 producer 的表现行为,本章将带您探究它的实现原理,包括数据写入逻辑、核心组件的实现方式以及如何优雅地关闭 producer 中的各个组件。Producer 的整体架构如下图所示。

producer_architecture

数据写入

Producer 的数据写入逻辑如下:

  1. 用户调用producer.send()方法发送数据,数据会被加入到 LogAccumulator 中的某个 ProducerBatch 里。通常情况下 send 方法会立即返回,但如果该 producer 实例没有足够空间容纳当前数据,此方法会被阻塞直到下列任意一个条件被满足。

    1. 之前缓存的数据被 BatchHandler 处理完成后,占用的内存被“释放”,producer 有足够空间容纳当前数据。
    2. 到达用户指定的最长阻塞时间,此时会抛出异常。
  2. 在调用 send 方法过程中,如果发现目标 ProducerBatch 包含的日志条数到达了 maxBatchCount 或该 ProducerBatch 剩余的空间无法容纳当前数据,则会首先将该 ProducerBatch 投递到 IOThreadPool 里,然后再新建一个 ProducerBatch 存放当前数据。为了不阻塞用户线程,IOThreadPool 选用无界阻塞队列,因为单个 Producer 实例能缓存的日志总大小是有限的,该队列长度不会无限增长。
  3. Mover 会遍历 LogAccumulator 中的每个 ProducerBatch,把超过了缓存时间的 batch 加入 expiredBatches 里。同时会记录未过期 batch 的最近超时时间,记为 t。
  4. 将从 LogAccumulator 中获取的 expiredBatches 投递到 IOThreadPool 里。
  5. 获取 RetryQueue 中所有满足发送条件的 ProducerBatch,如果当前没有 batch 满足发送条件则最多等待时间 t。
  6. 将从 RetryQueue 中获取的 expiredBatches 投递到 IOThreadPool 里。(Mover 完成步骤 6 后会再次进入步骤 3)。
  7. IOThreadPool 中的工作线程从阻塞队列里或取 ProducerBatch,然后发送给目标 logStore。
  8. 如果数据发送成功,会将该 ProducerBatch 写入成功队列。
  9. 如果数据发送失败,且满足下列任意一个条件,会将该 ProducerBatch 写入失败队列。

    1. 该错误无法重试。
    2. RetryQueue 被关闭。
    3. 达到了指定的重试次数且失败队列中的 batch 数不超过待发送 batch 总数的二分之一。
  10. 否则,计算当前 ProducerBatch 的下次计划发送时间然后将其放入 RetryQueue 中。
  11. 线程 SuccessBatchHandler 从成功队列里获取 batch,执行和该 batch 绑定的所有回调。
  12. 线程 FailureBatchHandler 从失败队列里获取 batch,执行和该 batch 绑定的所有回调。

核心组件

Producer 包含 LogAccumulatorRetryQueueMoverIOThreadPoolSendProducerBatchTaskBatchHandler 等核心组件。

LogAccumulator

为了提高吞吐量,一个常见的做法是将若干个小包合并成大包批量发送,本小节介绍的 LogAccumulator 的主要作用便是合并待发送的数据。由于服务端要求具有相同 project、logstore、topic、source、shardHash 的数据才能组装成一个大包,LogAccumulator 会根据数据的这些属性将其缓存到内部 map 的不同位置。这个 map 的 key 为上述五元组,value 为 ProducerBatch。为了保证线程安全同时支持高并发,这里选用 ConcurrentMap 作为 map 的实现。

LogAccumulator 的另一个作用是控制缓存数据的总大小,这里选用 Semaphore 实现控制逻辑。Semaphore 是基于 AQS 实现的高性能同步工具,它会首先尝试通过自旋的方式获取共享资源,减少线程上下文切换的开销。

RetryQueue

RetryQueue 用于存放发送失败待重试的 ProducerBatch,每个 batch 有一个字段用于标识下次计划发送时间。为了高效地获取超时 batch,内部选用 DelayQueue 存放这些 batch。DelayQueue 是一种按时间排序的优先队列,最先超时的 batch 会被优先取出,同时它也是线程安全的。

Mover

Mover 是一个独立的线程,它会循环地将 LogAccumulator 和 RetryQueue 中的超时 batch 投递到 IOThreadPool 里。为了避免空转占用宝贵的 CPU 资源,当 Mover 发现 LogAccumulator 和 RetryQueue 里没有满足发送条件的 batch 时,会在 RetryQueue 的 expiredBatches 方法上等待用户配置的数据最长缓存时间 lingerMs。

IOThreadPool

IOThreadPool 中的工作线程用于真正执行数据发送任务,该线程池的大小可通过参数 ioThreadCount 指定,默认为可用处理器个数乘以 2。

SendProducerBatchTask

SendProducerBatchTask 封装了 batch 发送逻辑。为了避免阻塞 IO 线程,不论当前 batch 发送成功与否都会将其投递到队列中交由独立线程去执行回调。另外,如果某个发送失败的 batch 满足重试条件,不会在当前 IO 线程中立即重试(立即重试通常也会失败),而是根据指数退避策略将其投递到 RetryQueue 中。

BatchHandler

Producer 会启动一个 SuccessBatchHandler 和一个 FailureBatchHandler 分别用来处理发送成功或失败的 batch。Handler 在执行完 batch 的 callback、设置好 batch 的 future 后便会“释放”该 batch 占用的内存空间,供新的数据使用。分开处理的原因是为了隔离发送成功和发送失败的 batch,保持 producer 整体的流动性。

优雅关闭

要实现优雅关闭,需要做到以下几点:

  1. Close 方法在期望时间内返回时,producer 中的所有线程都应停止,缓存的数据都应得到处理,用户注册的 callback 都应被执行,返回给用户的 future 都应被设置。
  2. 支持用户设定 close 方法的最长等待时间,超过这个时间不论线程是否停止,缓存的数据是否完全处理,该方法都应立即返回。
  3. Close 方法支持被调用多次,在多线程环境下也能按预期工作。
  4. 在 callback 里调用 close 方法是安全的,不会造成程序死锁。

为了达到上述目标,producer 的关闭逻辑设计如下:

  1. 关闭 LogAccumulator,这时继续往 LogAccumulator 中写数据会抛异常。
  2. 关闭 RetryQueue,这时继续往 RetryQueue 中投递 batch 会抛异常。
  3. 关闭 Mover 并等待其完全退出。Mover 检测到关闭信号后会把 LogAccumulator 和 RetryQueue 中剩余的 batch 全部取出并投递到 IOThreadPool 中,不论它们是否满足发送条件。为了防止数据丢失,Mover 会不断从 LogAccumulator 和 RetryQueue 中获取 batch 直到没有其他线程正在写入。
  4. 关闭 IOThreadPool 并等待已提交的任务全部执行完毕。这时由于 RetryQueue 已经关闭,发送失败的 batch 会被直接投递到失败队列中。
  5. 关闭 SuccessBatchHandler 并等待其完全退出(如果检测到在 callback 里调用 close 方法会跳过等待过程)。SuccessBatchHandler 检测到关闭信号后会把成功队列中的 batch 全部取出并依次处理。
  6. 关闭 FailureBatchHandler 并等待其完全退出(如果检测到在 callback 里调用 close 方法会跳过等待过程)。FailureBatchHandler 检测到关闭信号后会把失败队列中的 batch 全部取出并依次处理。

可以看到,这里按照数据流动方向依次关闭队列和线程来达到优雅关闭、安全退出的目的。

总结

Aliyun LOG Java Producer 是对老版 producer 的全面升级,解决了上一版存在的多个问题,包括网络异常情况下 CPU 占用率过高、关闭 producer 可能出现少量数据丢失等问题。另外,在容错方面也进行了加强,就算用户使用不合理,在资源、吞吐、隔离上都有较好的保证。

技术支持

日志服务-SLS

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
3月前
|
Java Apache 开发工具
【Azure 事件中心】 org.slf4j.Logger 收集 Event Hub SDK(Java) 输出日志并以文件形式保存
【Azure 事件中心】 org.slf4j.Logger 收集 Event Hub SDK(Java) 输出日志并以文件形式保存
|
23天前
|
人工智能 Oracle Java
解决 Java 打印日志吞异常堆栈的问题
前几天有同学找我查一个空指针问题,Java 打印日志时,异常堆栈信息被吞了,导致定位不到出问题的地方。
30 2
|
3月前
|
Java 应用服务中间件 HSF
Java应用结构规范问题之配置Logback以仅记录错误级别的日志到一个滚动文件中的问题如何解决
Java应用结构规范问题之配置Logback以仅记录错误级别的日志到一个滚动文件中的问题如何解决
|
3月前
|
Java 应用服务中间件 HSF
Java应用结构规范问题之配置Logback以在控制台输出日志的问题如何解决
Java应用结构规范问题之配置Logback以在控制台输出日志的问题如何解决
|
3月前
|
Java 应用服务中间件 HSF
Java应用结构规范问题之AllLoggers接口获取异常日志的Logger实例的问题如何解决
Java应用结构规范问题之AllLoggers接口获取异常日志的Logger实例的问题如何解决
|
3月前
|
存储 消息中间件 监控
Java日志详解:日志级别,优先级、配置文件、常见日志管理系统ELK、日志收集分析
Java日志详解:日志级别,优先级、配置文件、常见日志管理系统、日志收集分析。日志级别从小到大的关系(优先级从低到高): ALL < TRACE < DEBUG < INFO < WARN < ERROR < FATAL < OFF 低级别的会输出高级别的信息,高级别的不会输出低级别的信息
|
3月前
|
消息中间件 Prometheus 监控
Producer的监控与日志记录最佳实践
【8月更文第29天】在分布式系统中,消息队列作为关键组件之一,其稳定性和性能至关重要。生产者(Producer)负责生成并发送消息到消息队列中,因此确保生产者的健康运行是非常重要的。本文将探讨如何为生产者设置监控和日志记录,以跟踪其健康状况和性能指标。
62 0
|
13天前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
123 30
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
1月前
|
XML JSON Java
Logback 与 log4j2 性能对比:谁才是日志框架的性能王者?
【10月更文挑战第5天】在Java开发中,日志框架是不可或缺的工具,它们帮助我们记录系统运行时的信息、警告和错误,对于开发人员来说至关重要。在众多日志框架中,Logback和log4j2以其卓越的性能和丰富的功能脱颖而出,成为开发者们的首选。本文将深入探讨Logback与log4j2在性能方面的对比,通过详细的分析和实例,帮助大家理解两者之间的性能差异,以便在实际项目中做出更明智的选择。
226 3
|
1月前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1632 14

相关产品

  • 日志服务