日志服务(原SLS)新功能发布(3)--多实例协同消费库(loghub client library)

简介: loghub client library是对LogHub消费者提供的高级模式,解决多个消费者同时消费logstore时自动分配shard问题。 例如在storm、spark场景中多个消费者情况下,自动处理shard的负载均衡,消费者failover等逻辑。用户只需专注在自己业务逻辑上,而无需关心

使用场景

loghub client library是对LogHub消费者提供的高级模式,解决多个消费者同时消费logstore时自动分配shard问题。
例如在stormspark streaming场景中多个消费者情况下,自动处理shard的负载均衡,消费者failover等逻辑。用户只需专注在自己业务逻辑上,而无需关心shard分配、CheckPoint、Failover等事宜。

举一个例子而言,用户需要通过storm进行流计算,启动了A、B、C 3个消费实例。在有10个shard情况下,系统会自动为A、B、C分配3、3、4个Shard进行消费。

  • 当消费实例A宕机情况下,系统会把A未消费的3个Shard中数据自动均衡B、C上,当A恢复后,会重新均衡。
  • 当添加实例D、E情况下,系统会自动进行均衡,每个实例消费2个Shard。
  • 当Shard有Merge/Split等情况下,会根据最新的Shard信息,重新均衡。
  • 当read only状态的shard消费完之后,剩余的shard会重新做负载均衡。

以上整个过程不会产生数据丢失、以及重复,用户只需在代码中做三件事情:

  1. 创建Consumer Group。
  2. 将实例名注册为Instance,并连接到Consumer Group中。
  3. 写处理日志的代码。

我们强烈建议使用loghub client library进行数据消费,这样您只需要关心怎么处理数据,而不需要关注复杂的负载均衡、消费断点保存、按序消费、消费异常处理等问题

术语简介

loghub client library中主要有4个概念,分别是consumer group、consumer、heartbeat和checkpoint,它们之间的关系如下:

screenshot

  • consumer group

    是logstore的子资源,拥有相同consumer group 名字的消费者共同消费同一个logstore的所有数据,这些消费者之间不会重复消费数据,一个logstore下面可以最多创建5个consumer group,不可以重名,同一个logstore下面的consumer group之间消费数据不会互相影响。consumer group有两个很重要的属性:
{
    "order":boolean,
    "timeout": integer
}

order属性表示是否按照写入时间顺序消费key相同的数据,timeout表示consumer group中消费者的超时时间,单位是秒,当一个消费者汇报心跳的时间间隔超过timeout,会被认为已经超时,服务端认为这个consumer此时已经下线了。

  • consumer

    消费者,每个consumer上会被分配若干个shard,consumer的职责就是要消费这些shard上的数据,同一个consumer group中的consumer必须不重名。
    
  • heartbeat

    消费者心跳,consumer需要定期向服务端汇报一个心跳包,用于表明自己还处于存活状态。
  • checkpoint

    消费者定期将分配给自己的shard消费到的位置保存到服务端,这样当这个shard被分配给其它消费者时,从服务端可以获取shard的消费断点,接着从断点继续消费数据。
    

实现原理

通信协议

定义如下消费者和服务端的通信接口:

/*
    创建consumr group,inOrder表示是否希望key相同的数据能够按照写入的时间顺序被消费,timeoutInSec表示consumer的心跳超时时间,超过timeoutInSec没有汇报心跳的consumer会被认为已经下线了,建议取值20s左右。
*/
boolean CreateConsumerGroup(
            String project, 
            String logstore, 
            String consumerGroupName, 
            boolean inOrder,
            int timeoutInSec);
/*
    删除consumer group
*/
boolean DeleteConsumerGroup(
            String project, 
            String logStore, 
            String consumerGroup);
/*
    列出logstore下所有的consumer group,包括每个consumer group的order、timeout属性
*/
List<ConsumerGroup> ListConsumerGroup(
            String project, 
            String logStore);
/*
    更新consumer group的属性,consumer group的名字不可以更新,如果将inOrder由true更新为false,那么当前所有未开始消费的shard将会被分配至各个consumer,如果inOrder由false更新为true,对于当前正在消费的shard不会生效,也就是说即使shard之间有父子关系,由于它们都已经被消费了,所以顺序对他们而言没有意义,但是inOrder属性更新之后,shard分裂和合并产生的新的shard将会被顺序消费。
*/
boolean UpdateConsumerGroup(
            String project,
            String logStore,
            String consumerGroup,
            boolean inOrder,
            int timeoutInSec
            );
/*
    更新shard消费到的位置,只有当参数中的shard当前由consumer持有的情况下才能更新成功
*/
boolean UpdateCheckPoint(
            String project, 
            String logStore, 
            String consumerGroup, 
            String consumer,
            int shard,
            String checkpoint);
/*
    更新shard消费到的位置,无论如何都可以更新成功
*/
boolean UpdateCheckPoint(
            String project, 
            String logStore, 
            String consumerGroup, 
            int shard,
            String checkpoint);
/*
    获取consumer group中shard的消费断点
*/
String GetCheckPoint(
            String project,
            String logStore,
            String consumerGroup,
            int shard);
/*
    获取consumer group中所有shard的消费断点,返回结果是一个断点的List,具体请参考sdk
*/
List<ShardCheckPoint> GetCheckPoint(
            String project,
            String logStore,
            String consumerGroup);
/*
    将当前consumer持有的shard汇报给服务端,服务端返回确认包,里面包含若干shard,假设持有的shard集合为A,返回的确认shard集合为B,A和B的差集(A-B)表示consumer应该放弃消费的shard,consumer应该尽快将(A-B)中的shard的消费断点保存到服务端,并放弃消费(A-B)中的shard,(B-A)表示consumer可以增持的shard,consumer从服务端获取(B-A)中shard的checkpoint。理想状态下下一次心跳汇报给服务端的shard集合就是B,但是如果consumer不想(没来得及)放弃A-B中的某些shard,需要将这些shard一并汇报给服务端。Heartbeat除了上面有消费负载均衡的功能以外,还用于告知服务端自己处于存活状态,不要将自己从consumer group中删除。一旦heartbeat超时,服务端就会将consumer从consumer group中删除,并将分配给其的shard重新分配给别的consumer。
*/
List<Integer> HeartBeat(
            String project,
            String logStore,
            String consumerGroup,
            String consumer,
            ArrayList<Integer> shards
            )

有限状态自动机

服务端对consumer group中每个shard都会维护一个有限状态自动机,共有五种状态,分别是already_alloc、not_alloc、wait、transfer、over,每种状态的含义如下:

already_alloc:该shard已经被某个consumer持有并消费。

not_alloc:该shard可以消费,但是尚未被任何consumer持有。

wait:该shard当前不可以消费,需要等待其祖先shard消费完。

transfer:将该shard转交给另一个消费者消费的过渡状态。只有当当前消费者在Heartbeat中放弃消费该shard,才能将该shard转交出去,因此需要这个过渡状态。

over:该shard的数据已经消费完。

wait状态需要重点说明下,假设某个时刻数据仓库所有shard的关系如下图:

screenshot

初始时有3个shard 0、1、2,每个shard下面的区间表示关联的hash值的集合,这里为了简单用数值型表示hash key,此时hash值为7的数据会被写入shard 0,随后0 split成3和4,shard 0变成read_only状态,shard 3和4变成read_write状态,这个时候hash值是7的数据不会再被写入shard 0,而是写入shard 4,再接着shard 4和5 merge成6之后,hash值为7的数据只会写入shard 6。

如果要顺序消费hash值为7的数据,必须保证在shard 0被消费完之前shard 4不应该被任何消费者消费,同理,shard 6不应该在shard 4中数据消费完之前被消费,我们把shard 0、4认为是shard 6的祖先,shard 6称为shard 0和4的后代,某个shard可以被消费当且仅当其祖先shard中的数据被消费完。基于这个原因,引入wait状态表示该shard当前不可以被消费。

状态转移图如下:

screenshot

图中每个状态用首字母缩写表示,每条边对应含义如下:

1 表示shard的起始状态只能是wait或者not_alloc。

2 表示该shard的祖先shard的数据已经被消费完,可以开始消费当前shard的数据了。

3 表示该shard被分配给了某一个消费者。

4 表示消费该shard的consumer心跳超时了,回收其持有的shard。

5 表示持有该shard的消费者持有的shard总数太多,不满足任意两个消费者持有shard数量之差的绝对值小于等于1,所以将该shard转移给别的需要的消费者消费,这时会将等待消费的消费者(next consumer)和这个shard关联起来。

6 表示收到持有该shard的consumer放弃消费的Heartbeat,并将该shard转移给关联的next consumer持有。发生该转移还有一种可能是该shard的next consumer超时,继续由持有该shard的消费者持有该shard。

7 表示当强制更新over状态的shard的消费断点到某个非数据结束位置时,该shard恢复可消费状态。执行这种更新操作要特别当心,因为该shard的后代shard可能已经被消费了,很可能导致数据无法按照hash key的顺序消费。

8 表示read_only状态的shard数据被消费完了。

9 表示持有该shard的消费者Heartbeat超时,回收该shard到not_alloc状态。

这里要注意以下几点:

  • shard 处于transfer状态时,服务端收到持有该shard的消费者的Heartbeat时,返回的确认shard集合中不会包含该shard,确认shard集合中只会包含持有者是该consumer并且shard状态是already_alloc的shard。
  • 消费者调用UpdateCheckpoint更新消费断点,如果是read_only状态的shard要检查该checkpoint是否是shard的结尾,如果是就需要将shard状态转移成over。
  • 5 标识的转移是保证任意两个消费者持有shard数量之差的绝对值小于等于1的基础,当发现有consumer持有的shard数量不满足该条件时,从shard持有数量多的消费者那里剥夺一些shard,分配给持有数量少的consumer,这个过程称为消费负载均衡。这些将要易手的shard需要设置成transfer状态,以等待持有者heartbeat中确认放弃,这主要是为了让持有者收到放弃消息时将消费断点保存到服务端,从而易手之后,新的consumer可以从服务端获取该shard的消费断点。
  • 新的shard加入时,只能由转移1进入,其状态为wait当该shard有祖先shard,并且其祖先没有消费完,否则状态为not_alloc。
  • 消费的负载均衡只会考虑not_alloc、already_alloc、transfer状态的shard,wait和over状态的shard由于不满足消费条件,所以不会被分配给任何consumer。

如何使用loghub client library

  • 实现loghub client library中的两个接口类:

    • ILogHubProcessor // 每个shard对应一个实例,每个实例只消费特定shard的数据。
    • ILogHubProcessorFactory // 负责生产实现ILogHubProcessor接口实例。
  • 填写参数配置。
  • 启动一个或多个client worker实例。

使用sample

main函数

    public static void main(String args[]) 
    {
        LogHubConfig config = new LogHubConfig(...);
                
        ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
        
        Thread thread = new Thread(worker);
        //thread运行之后,client worker会自动运行,ClientWorker扩展了Runnable接口。
        thread.start();
                Thread.sleep(60 * 60 * 1000);
        //调用worker的shutdown函数,退出消费实例,关联的线程也会自动停止。
        worker.shutdown();
        //ClientWorker运行过程中会生成多个异步的Task,shutdown之后最好等待还在执行的Task安全退出,建议30s。
        Thread.sleep(30 * 1000);
    }

ILogHubProcessor、ILogHubProcessorFactory 实现sample

  • 各个shard对应的消费实例类,实际开发过程中用户主要需要关注数据消费逻辑,同一个ClientWorker实例是串行消费数据的,只会产生一个ILogHubProcessor实例,ClientWorker退出的时候会调用ILogHubProcessor的shutdown函数。
public class SampleLogHubProcessor implements ILogHubProcessor 
{
    private int mShardId;
    // 记录上次持久化check point的时间
    private long mLastCheckTime = 0; 
    
    public void initialize(int shardId) 
    {
        mShardId = shardId;
    }

    // 消费数据的主逻辑
    public String process(List<LogGroupData> logGroups,
            ILogHubCheckPointTracker checkPointTracker) 
    {
        for (LogGroupData group : logGroups) 
        {
            List<LogItem> items = group.GetAllLogs();
            for (LogItem item : items) 
            {
                // 打印loggroup中的数据
                System.out.println("shard_id:" + mShardId + " " + item.ToJsonString());
            }
        }
        long curTime = System.currentTimeMillis();
        // 每隔60秒,写一次check point到服务端,如果60秒内,worker crash,
        // 新启动的worker会从上一个checkpoint其消费数据,有可能有重复数据
        if (curTime - mLastCheckTime >  60 * 1000) 
        {
            try 
            {
                checkPointTracker.saveCheckPoint(true);
            } 
            catch (LogHubCheckPointException e) 
            {
                e.printStackTrace();
            }
            mLastCheckTime = curTime;
        } 
        else 
        {
            try 
            {
                checkPointTracker.saveCheckPoint(false);
            } 
            catch (LogHubCheckPointException e) 
            {
                e.printStackTrace();
            }
        }
        // 返回空表示正常处理数据, 如果需要回滚到上个check point的点进行重试的话,可以return checkPointTracker.getCheckpoint()
        return null;  
    }
    // 当worker退出的时候,会调用该函数,用户可以在此处做些清理工作。
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) 
    {
        //将消费断点保存到服务端。
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}
  • 生成 ILogHubProcessor的工厂类 :
public class SampleLogHubProcessorFactory implements ILogHubProcessorFactory 
{
    public ILogHubProcessor generatorProcessor()
    {   
        // 生成一个消费实例
        return new SampleLogHubProcessor();
    }
}

配置说明:

public class LogHubConfig 
{
    //worker默认的拉取数据的时间间隔
    public static final long DEFAULT_DATA_FETCH_INTERVAL_MS = 200;
    //consumer group的名字
    private String mConsumerGroupName;
    //consumer的名字,必须确保同一个consumer group下面的各个consumer不重名
    private String mWorkerInstanceName;
    //loghub数据接口地址
    private String mLogHubEndPoint;
    //项目名称
    private String mProject;
    //日志库名称
    private String mLogStore;
    //云账号的access key id
    private String mAccessId;
    //云账号的access key
    private String mAccessKey;
    //用于指出在服务端没有记录shard的checkpoint的情况下应该从什么位置消费shard,取值可以是[BEGIN_CURSOR, END_CURSOR, SPECIAL_TIMER_CURSOR]中的一个
    private LogHubCursorPosition mCursorPosition;
    //当mCursorPosition取值为SPECIAL_TIMER_CURSOR时,指定消费时间,单位是秒。
    private int  mLoghubCursorStartTime = 0;
    // 轮询获取loghub数据的时间间隔,间隔越小,抓取越快,单位是毫秒,默认是DEFAULT_DATA_FETCH_INTERVAL_MS,建议时间间隔200ms以上。
    private long mDataFetchIntervalMillis;
    // worker向服务端汇报心跳的时间间隔,单位是毫秒,建议取值10000ms。
    private long mHeartBeatIntervalMillis;
    //是否按序消费
    private boolean mConsumeInOrder; 
}

Maven配置

<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>loghub-client-lib</artifactId>
  <version>0.6.4</version>
</dependency>

Storm, Spark Streaming Library

  • storm
  • spark streaming

常见问题&注意事项

  • LogHubConfig 中 consumerGroupName表一个消费组,consumerGroupName相同的consumer分摊消费logstore中的shard,同一个consumerGroupName中的consumer,通过workerInstance name进行区分。
假设logstore中有shard 0 ~ shard 34shard。
3个worker,其consumerGroupName和workerinstance name分别是 : 
<consumer_group_name_1 , worker_A>,
<consumer_group_name_1 , worker_B>,
<consumer_group_name_2 , worker_C>
则,这些worker和shard的分配关系是:
<consumer_group_name_1 , worker_A>: shard_0, shard_1
<consumer_group_name_1 , worker_B>: shard_2, shard_3
<consumer_group_name_2 , worker_C>: shard_0, shard_1, shard_2, shard_3  # group name不同的worker相互不影响
  • 确保实现的ILogHubProcessor process()接口每次都能顺利执行,并退出,这点很重要。
  • ILogHubCheckPointTracker的saveCheckPoint()接口,无论传递的参数是true,还是false,都表示当前处理的数据已经完成,参数为true,则立刻持久化至服务端,false则每隔60秒同步一次到服务端。
  • 如果LogHubConfig中配置的是子用户的accessKeyId、accessKey,需要在RAM中进行以下授权,详细内容请参考API文档

    |Action|Resource|
log:GetCursorOrData acs:log:{regionName}:{projectOwnerAliUid}:project/{projectName}/logstore/{logstoreName}
log:CreateConsumerGroup acs:log:{regionName}:{projectOwnerAliUid}:project/{projectName}/logstore/{logstoreName}/consumergroup/*
log:ListConsumerGroup acs:log:{regionName}:{projectOwnerAliUid}:project/{projectName}/logstore/{logstoreName}/consumergroup/*
log:ConsumerGroupUpdateCheckPoint acs:log:{regionName}:{projectOwnerAliUid}:project/{projectName}/logstore/{logstoreName}/consumergroup/{consumerGroupName}
log:ConsumerGroupHeartBeat acs:log:{regionName}:{projectOwnerAliUid}:project/{projectName}/logstore/{logstoreName}/consumergroup/{consumerGroupName}
log:GetConsumerGroupCheckPoint acs:log:{regionName}:{projectOwnerAliUid}:project/{projectName}/logstore/{logstoreName}/consume
相关实践学习
通过日志服务实现云资源OSS的安全审计
本实验介绍如何通过日志服务实现云资源OSS的安全审计。
相关文章
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
4162 31
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
8月前
|
监控 容灾 算法
阿里云 SLS 多云日志接入最佳实践:链路、成本与高可用性优化
本文探讨了如何高效、经济且可靠地将海外应用与基础设施日志统一采集至阿里云日志服务(SLS),解决全球化业务扩展中的关键挑战。重点介绍了高性能日志采集Agent(iLogtail/LoongCollector)在海外场景的应用,推荐使用LoongCollector以获得更优的稳定性和网络容错能力。同时分析了多种网络接入方案,包括公网直连、全球加速优化、阿里云内网及专线/CEN/VPN接入等,并提供了成本优化策略和多目标发送配置指导,帮助企业构建稳定、低成本、高可用的全球日志系统。
920 54
|
监控 安全 Apache
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。
420 9
|
11月前
|
存储 SQL 关系型数据库
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log、原理、写入过程;binlog与redolog区别、update语句的执行流程、两阶段提交、主从复制、三种日志的使用场景;查询日志、慢查询日志、错误日志等其他几类日志
901 35
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
|
11月前
|
存储 缓存 关系型数据库
图解MySQL【日志】——Redo Log
Redo Log(重做日志)是数据库中用于记录数据页修改的物理日志,确保事务的持久性和一致性。其主要作用包括崩溃恢复、提高性能和保证事务一致性。Redo Log 通过先写日志的方式,在内存中缓存修改操作,并在适当时候刷入磁盘,减少随机写入带来的性能损耗。WAL(Write-Ahead Logging)技术的核心思想是先将修改操作记录到日志文件中,再择机写入磁盘,从而实现高效且安全的数据持久化。Redo Log 的持久化过程涉及 Redo Log Buffer 和不同刷盘时机的控制参数(如 `innodb_flush_log_at_trx_commit`),以平衡性能与数据安全性。
603 5
图解MySQL【日志】——Redo Log
|
10月前
|
监控 Java 应用服务中间件
Tomcat log日志解析
理解和解析Tomcat日志文件对于诊断和解决Web应用中的问题至关重要。通过分析 `catalina.out`、`localhost.log`、`localhost_access_log.*.txt`、`manager.log`和 `host-manager.log`等日志文件,可以快速定位和解决问题,确保Tomcat服务器的稳定运行。掌握这些日志解析技巧,可以显著提高运维和开发效率。
1175 13
|
10月前
|
缓存 Java 编译器
|
SQL 关系型数据库 MySQL
MySQL事务日志-Undo Log工作原理分析
事务的持久性是交由Redo Log来保证,原子性则是交由Undo Log来保证。如果事务中的SQL执行到一半出现错误,需要把前面已经执行过的SQL撤销以达到原子性的目的,这个过程也叫做"回滚",所以Undo Log也叫回滚日志。
693 7
MySQL事务日志-Undo Log工作原理分析
|
10月前
|
SQL 存储 关系型数据库
简单聊聊MySQL的三大日志(Redo Log、Binlog和Undo Log)各有什么区别
在MySQL数据库管理中,理解Redo Log(重做日志)、Binlog(二进制日志)和Undo Log(回滚日志)至关重要。Redo Log确保数据持久性和崩溃恢复;Binlog用于主从复制和数据恢复,记录逻辑操作;Undo Log支持事务的原子性和隔离性,实现回滚与MVCC。三者协同工作,保障事务ACID特性。文章还详细解析了日志写入流程及可能的异常情况,帮助深入理解数据库日志机制。
1265 0