通过sts token 实现跨账户消费日志服务资源

简介: 阿里云账号可以通过创建并授权用户角色的方式赋予其他云账号一定的资源权限,其他云账号扮演该角色,并为其名下的RAM用户授予AssumeRole权限之后,其他云账号或其子账号可以通过访问STS接口获取临时AK和Token函数,调用日志服务API接口。

背景信息

出于业务隔离或项目外包等需求,云账号A希望将部分日志服务业务授权给云账号B,由云账号B操作维护这部分业务。基本需求及详细操作如下:

一、云账号B拥有向企业A的日志服务中写入数据和使用消费组的权限

1、企业A主账户ram控制台创建自定义角色

图片.png

图片.png

图片.png

2、登录日志服务控制台,到对应的project下使用权限助手生成策略脚本

图片.png

图片.png

3、ram控制台脚本方式添加自定义策略

图片.png

4、为角色添加生成的策略

图片.png

至此,企业账户A的操作全部结束。

二、云账号B的指定RAM用户也拥有日志服务的写入和消费组权限

1、登录企业主账户B,添加RAM子账户

图片.png

2、为创建的子账户添加调用STS服务AssumeRole接口的权限(AliyunSTSAssumeRoleAccess)

图片.png

3、为RAM子账户创建AccessKey&AccessSecret

图片.png

三、云账号B可获取STS临时凭证,访问日志服务API接口

1、pom.xml

        <!-- https://mvnrepository.com/artifact/com.aliyun.openservices/aliyun-log -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>aliyun-log</artifactId>
            <version>0.6.70</version>
        </dependency>


        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-sts</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.4.6</version>
        </dependency>


        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>loghub-client-lib</artifactId>
            <version>0.6.33</version>
        </dependency>

2、主程序


import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.sts.model.v20150401.AssumeRoleRequest;
import com.aliyuncs.sts.model.v20150401.AssumeRoleResponse;

public class StsSample {

    public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
        // 子账户的ak,sk信息
        String accessKeyId = "*********";
        String accessKeySecret = "*********";
        // 企业账户A创建角色的ARN
        String roleArn = "acs:ram::172144026*******:role/***";
        // 企业账户A创建角色的名称
        String roleSessionName = "*******";
        AssumeRoleResponse response = new AssumeRoleResponse();
        try {
            //构建一个阿里云客户端,用于发起请求。
            //构建阿里云客户端时需要设置AccessKey ID和AccessKey Secret。
            DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessKeySecret);
            IAcsClient client = new DefaultAcsClient(profile);

            //构造请求,设置参数。
            AssumeRoleRequest request = new AssumeRoleRequest();
            request.setRoleArn(roleArn);
            request.setRoleSessionName(roleSessionName);
            request.setDurationSeconds(3600L); //过期时间,单位为秒,过期时间最小值为900秒,最大值为MaxSessionDuration设置的时间。默认值为3600秒。

            response = client.getAcsResponse(request);
            System.out.println("Expiration: " + response.getCredentials().getExpiration());
            System.out.println("Access Key Id: " + response.getCredentials().getAccessKeyId());
            System.out.println("Access Key Secret: " + response.getCredentials().getAccessKeySecret());
            System.out.println("Security Token: " + response.getCredentials().getSecurityToken());
        } catch (ClientException e) {
            System.out.println("Failed to get a token.");
            System.out.println("Error code: " + e.getErrCode());
            System.out.println("Error message: " + e.getErrMsg());
        }

        // 消费日志程序
        String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        // means project region must be cn-hangzhou
        String sProject = "taros******";
        String sLogstore = "aiops_a*****";
        String sConsumerGroup = "token_group789";

        LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_2", sEndpoint, sProject, sLogstore, response.getCredentials().getAccessKeyId(), response.getCredentials().getAccessKeySecret(), LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
        config.setStsToken(response.getCredentials().getSecurityToken());
        ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        //Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。
        thread.start();
        Thread.sleep(60 * 60 * 1000);
        //调用Worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
        worker.shutdown();
        //ClientWorker运行过程中会生成多个异步的任务,Shutdown完成后请等待还在执行的任务安全退出,建议sleep配置为30秒。
        Thread.sleep(30 * 1000);
    }
}

3、SampleLogHubProcessor.java


import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;

import java.util.List;

public class SampleLogHubProcessor implements ILogHubProcessor {
    private int shardId;
    // 记录上次持久化Checkpoint的时间。
    private long mLastCheckTime = 0;

    public void initialize(int shardId) {
        this.shardId = shardId;
    }

    // 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。
    public String process(List<LogGroupData> logGroups,
                          ILogHubCheckPointTracker checkPointTracker) {
        // 打印已获取的数据。
        for (LogGroupData logGroup : logGroups) {
            FastLogGroup flg = logGroup.GetFastLogGroup();
            System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                    flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
            System.out.println("Tags");
            for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                FastLogTag logtag = flg.getLogTags(tagIdx);
                System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
            }
            for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                FastLog log = flg.getLogs(lIdx);
                System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                    FastLogContent content = log.getContents(cIdx);
                    System.out.println(content.getKey() + "\t:\t" + content.getValue());
                }
            }
        }
        long curTime = System.currentTimeMillis();
        // 每隔30秒,写一次Checkpoint到服务端。如果30秒内发生Worker异常终止,新启动的Worker会从上一个Checkpoint获取消费数据,可能存在少量的重复数据。
        if (curTime - mLastCheckTime > 30 * 1000) {
            try {
                //参数为true表示立即将Checkpoint更新到服务端;false表示将Checkpoint缓存在本地,默认间隔60秒会将Checkpoint更新到服务端。
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            mLastCheckTime = curTime;
        }
        return null;
    }

    // 当Worker退出时,会调用该函数,您可以在此处执行清理工作。
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
        //将Checkpoint立即保存到服务端。
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}

class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
    public ILogHubProcessor generatorProcessor() {
        // 生成一个消费实例。
        return new SampleLogHubProcessor();
    }
}

4、logstore控制台消费者组查看

图片.png

更多参考

RocketMQ阿里云跨账户授权访问示例
阿里云基于STS获取临时访问权限使用示例
通过STS实现跨账号访问日志服务资源

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
5天前
|
存储 监控 Apache
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
网易的灵犀办公和云信利用 Apache Doris 改进了大规模日志和时序数据处理,取代了 Elasticsearch 和 InfluxDB。Doris 实现了更低的服务器资源消耗和更高的查询性能,相比 Elasticsearch,查询速度提升至少 11 倍,存储资源节省达 70%。Doris 的列式存储、高压缩比和倒排索引等功能,优化了日志和时序数据的存储与分析,降低了存储成本并提高了查询效率。在灵犀办公和云信的实际应用中,Doris 显示出显著的性能优势,成功应对了数据增长带来的挑战。
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
|
11天前
|
SQL DataWorks 安全
DataWorks产品使用合集之DataWorks资源里python运行时候,查看中途打印日志如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
26 0
|
监控 Kubernetes 测试技术
使用基于访问日志分析自动推荐的Sidecar资源
在默认情况下,由于不能确定网格内服务之间的调用依赖关系,Sidecar的配置中保存了数据平面内所有服务的信息;同时,一次针对控制平面或数据平面的修改都会引起控制平面向数据平面所有Sidecar的一次配置推送。您可以使用Sidecar资源配置Sidecar、来调优与应用实例的出口和入口通信。您可以参考管理Sidecar资源,来自行创建并管理Sidecar资源。除此之外,服务网格ASM可以通过分析数据
|
Kubernetes 容器
DataStreams+logstash+ILM进行日志定时删除,节省硬盘资源
目前所有的K8S上的容器日志都被收集到了我们的ELK上,随着时间的推移,ELK上的日志所占的存储空间越来越多,我们需要一个定时清理的策略,以节约硬盘资源。 我们主要配置以下ELK里的这几个地方 1.通过kibana新增一个lifecycle policies 2.通过kibana新增一个index template,注意配置DataStreams 3.logstash 的logstashPipeline 4.filebeat的filebeat.yml文件
791 0
DataStreams+logstash+ILM进行日志定时删除,节省硬盘资源
|
4天前
|
C++
JNI Log 日志输出
JNI Log 日志输出
12 1
|
4天前
|
存储 运维 大数据
聊聊日志硬扫描,阿里 Log Scan 的设计与实践
泛日志(Log/Trace/Metric)是大数据的重要组成,伴随着每一年业务峰值的新脉冲,日志数据量在快速增长。同时,业务数字化运营、软件可观测性等浪潮又在对日志的存储、计算提出更高的要求。
|
10天前
|
XML Java Maven
Springboot整合与使用log4j2日志框架【详解版】
该文介绍了如何在Spring Boot中切换默认的LogBack日志系统至Log4j2。首先,需要在Maven依赖中排除`spring-boot-starter-logging`并引入`spring-boot-starter-log4j2`。其次,创建`log4j2-spring.xml`配置文件放在`src/main/resources`下,配置包括控制台和文件的日志输出、日志格式和文件切分策略。此外,可通过在不同环境的`application.yml`中指定不同的log4j2配置文件。最后,文章提到通过示例代码解释了日志格式中的各种占位符含义。
|
11天前
|
运维 监控 Go
Golang深入浅出之-Go语言中的日志记录:log与logrus库
【4月更文挑战第27天】本文比较了Go语言中标准库`log`与第三方库`logrus`的日志功能。`log`简单但不支持日志级别配置和多样化格式,而`logrus`提供更丰富的功能,如日志级别控制、自定义格式和钩子。文章指出了使用`logrus`时可能遇到的问题,如全局logger滥用、日志级别设置不当和过度依赖字段,并给出了避免错误的建议,强调理解日志级别、合理利用结构化日志、模块化日志管理和定期审查日志配置的重要性。通过这些实践,开发者能提高应用监控和故障排查能力。
87 1