通过sts token 实现跨账户消费日志服务资源

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 阿里云账号可以通过创建并授权用户角色的方式赋予其他云账号一定的资源权限,其他云账号扮演该角色,并为其名下的RAM用户授予AssumeRole权限之后,其他云账号或其子账号可以通过访问STS接口获取临时AK和Token函数,调用日志服务API接口。

背景信息

出于业务隔离或项目外包等需求,云账号A希望将部分日志服务业务授权给云账号B,由云账号B操作维护这部分业务。基本需求及详细操作如下:

一、云账号B拥有向企业A的日志服务中写入数据和使用消费组的权限

1、企业A主账户ram控制台创建自定义角色

图片.png

图片.png

图片.png

2、登录日志服务控制台,到对应的project下使用权限助手生成策略脚本

图片.png

图片.png

3、ram控制台脚本方式添加自定义策略

图片.png

4、为角色添加生成的策略

图片.png

至此,企业账户A的操作全部结束。

二、云账号B的指定RAM用户也拥有日志服务的写入和消费组权限

1、登录企业主账户B,添加RAM子账户

图片.png

2、为创建的子账户添加调用STS服务AssumeRole接口的权限(AliyunSTSAssumeRoleAccess)

图片.png

3、为RAM子账户创建AccessKey&AccessSecret

图片.png

三、云账号B可获取STS临时凭证,访问日志服务API接口

1、pom.xml

        <!-- https://mvnrepository.com/artifact/com.aliyun.openservices/aliyun-log -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>aliyun-log</artifactId>
            <version>0.6.70</version>
        </dependency>


        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-sts</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.4.6</version>
        </dependency>


        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>loghub-client-lib</artifactId>
            <version>0.6.33</version>
        </dependency>

2、主程序


import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.sts.model.v20150401.AssumeRoleRequest;
import com.aliyuncs.sts.model.v20150401.AssumeRoleResponse;

public class StsSample {

    public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
        // 子账户的ak,sk信息
        String accessKeyId = "*********";
        String accessKeySecret = "*********";
        // 企业账户A创建角色的ARN
        String roleArn = "acs:ram::172144026*******:role/***";
        // 企业账户A创建角色的名称
        String roleSessionName = "*******";
        AssumeRoleResponse response = new AssumeRoleResponse();
        try {
            //构建一个阿里云客户端,用于发起请求。
            //构建阿里云客户端时需要设置AccessKey ID和AccessKey Secret。
            DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessKeySecret);
            IAcsClient client = new DefaultAcsClient(profile);

            //构造请求,设置参数。
            AssumeRoleRequest request = new AssumeRoleRequest();
            request.setRoleArn(roleArn);
            request.setRoleSessionName(roleSessionName);
            request.setDurationSeconds(3600L); //过期时间,单位为秒,过期时间最小值为900秒,最大值为MaxSessionDuration设置的时间。默认值为3600秒。

            response = client.getAcsResponse(request);
            System.out.println("Expiration: " + response.getCredentials().getExpiration());
            System.out.println("Access Key Id: " + response.getCredentials().getAccessKeyId());
            System.out.println("Access Key Secret: " + response.getCredentials().getAccessKeySecret());
            System.out.println("Security Token: " + response.getCredentials().getSecurityToken());
        } catch (ClientException e) {
            System.out.println("Failed to get a token.");
            System.out.println("Error code: " + e.getErrCode());
            System.out.println("Error message: " + e.getErrMsg());
        }

        // 消费日志程序
        String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        // means project region must be cn-hangzhou
        String sProject = "taros******";
        String sLogstore = "aiops_a*****";
        String sConsumerGroup = "token_group789";

        LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_2", sEndpoint, sProject, sLogstore, response.getCredentials().getAccessKeyId(), response.getCredentials().getAccessKeySecret(), LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
        config.setStsToken(response.getCredentials().getSecurityToken());
        ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        //Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。
        thread.start();
        Thread.sleep(60 * 60 * 1000);
        //调用Worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
        worker.shutdown();
        //ClientWorker运行过程中会生成多个异步的任务,Shutdown完成后请等待还在执行的任务安全退出,建议sleep配置为30秒。
        Thread.sleep(30 * 1000);
    }
}

3、SampleLogHubProcessor.java


import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;

import java.util.List;

public class SampleLogHubProcessor implements ILogHubProcessor {
    private int shardId;
    // 记录上次持久化Checkpoint的时间。
    private long mLastCheckTime = 0;

    public void initialize(int shardId) {
        this.shardId = shardId;
    }

    // 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。
    public String process(List<LogGroupData> logGroups,
                          ILogHubCheckPointTracker checkPointTracker) {
        // 打印已获取的数据。
        for (LogGroupData logGroup : logGroups) {
            FastLogGroup flg = logGroup.GetFastLogGroup();
            System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                    flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
            System.out.println("Tags");
            for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                FastLogTag logtag = flg.getLogTags(tagIdx);
                System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
            }
            for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                FastLog log = flg.getLogs(lIdx);
                System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                    FastLogContent content = log.getContents(cIdx);
                    System.out.println(content.getKey() + "\t:\t" + content.getValue());
                }
            }
        }
        long curTime = System.currentTimeMillis();
        // 每隔30秒,写一次Checkpoint到服务端。如果30秒内发生Worker异常终止,新启动的Worker会从上一个Checkpoint获取消费数据,可能存在少量的重复数据。
        if (curTime - mLastCheckTime > 30 * 1000) {
            try {
                //参数为true表示立即将Checkpoint更新到服务端;false表示将Checkpoint缓存在本地,默认间隔60秒会将Checkpoint更新到服务端。
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            mLastCheckTime = curTime;
        }
        return null;
    }

    // 当Worker退出时,会调用该函数,您可以在此处执行清理工作。
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
        //将Checkpoint立即保存到服务端。
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}

class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
    public ILogHubProcessor generatorProcessor() {
        // 生成一个消费实例。
        return new SampleLogHubProcessor();
    }
}

4、logstore控制台消费者组查看

图片.png

更多参考

RocketMQ阿里云跨账户授权访问示例
阿里云基于STS获取临时访问权限使用示例
通过STS实现跨账号访问日志服务资源

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
3月前
|
运维 DataWorks 安全
DataWorks产品使用合集之任务日志中显示等待gateway调度资源,该如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
6月前
|
存储 监控 Apache
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
网易的灵犀办公和云信利用 Apache Doris 改进了大规模日志和时序数据处理,取代了 Elasticsearch 和 InfluxDB。Doris 实现了更低的服务器资源消耗和更高的查询性能,相比 Elasticsearch,查询速度提升至少 11 倍,存储资源节省达 70%。Doris 的列式存储、高压缩比和倒排索引等功能,优化了日志和时序数据的存储与分析,降低了存储成本并提高了查询效率。在灵犀办公和云信的实际应用中,Doris 显示出显著的性能优势,成功应对了数据增长带来的挑战。
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
|
6月前
|
SQL DataWorks 安全
DataWorks产品使用合集之DataWorks资源里python运行时候,查看中途打印日志如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
60 0
|
Kubernetes 容器
DataStreams+logstash+ILM进行日志定时删除,节省硬盘资源
目前所有的K8S上的容器日志都被收集到了我们的ELK上,随着时间的推移,ELK上的日志所占的存储空间越来越多,我们需要一个定时清理的策略,以节约硬盘资源。 我们主要配置以下ELK里的这几个地方 1.通过kibana新增一个lifecycle policies 2.通过kibana新增一个index template,注意配置DataStreams 3.logstash 的logstashPipeline 4.filebeat的filebeat.yml文件
934 0
DataStreams+logstash+ILM进行日志定时删除,节省硬盘资源
|
监控 Kubernetes 测试技术
使用基于访问日志分析自动推荐的Sidecar资源
在默认情况下,由于不能确定网格内服务之间的调用依赖关系,Sidecar的配置中保存了数据平面内所有服务的信息;同时,一次针对控制平面或数据平面的修改都会引起控制平面向数据平面所有Sidecar的一次配置推送。您可以使用Sidecar资源配置Sidecar、来调优与应用实例的出口和入口通信。您可以参考管理Sidecar资源,来自行创建并管理Sidecar资源。除此之外,服务网格ASM可以通过分析数据
|
2天前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
64 30
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
28天前
|
XML JSON Java
Logback 与 log4j2 性能对比:谁才是日志框架的性能王者?
【10月更文挑战第5天】在Java开发中,日志框架是不可或缺的工具,它们帮助我们记录系统运行时的信息、警告和错误,对于开发人员来说至关重要。在众多日志框架中,Logback和log4j2以其卓越的性能和丰富的功能脱颖而出,成为开发者们的首选。本文将深入探讨Logback与log4j2在性能方面的对比,通过详细的分析和实例,帮助大家理解两者之间的性能差异,以便在实际项目中做出更明智的选择。
176 3
|
29天前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1607 14