开发者社区> taro_秋刀鱼> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

通过sts token 实现跨账户消费日志服务资源

简介: 阿里云账号可以通过创建并授权用户角色的方式赋予其他云账号一定的资源权限,其他云账号扮演该角色,并为其名下的RAM用户授予AssumeRole权限之后,其他云账号或其子账号可以通过访问STS接口获取临时AK和Token函数,调用日志服务API接口。
+关注继续查看

背景信息

出于业务隔离或项目外包等需求,云账号A希望将部分日志服务业务授权给云账号B,由云账号B操作维护这部分业务。基本需求及详细操作如下:

一、云账号B拥有向企业A的日志服务中写入数据和使用消费组的权限

1、企业A主账户ram控制台创建自定义角色

图片.png

图片.png

图片.png

2、登录日志服务控制台,到对应的project下使用权限助手生成策略脚本

图片.png

图片.png

3、ram控制台脚本方式添加自定义策略

图片.png

4、为角色添加生成的策略

图片.png

至此,企业账户A的操作全部结束。

二、云账号B的指定RAM用户也拥有日志服务的写入和消费组权限

1、登录企业主账户B,添加RAM子账户

图片.png

2、为创建的子账户添加调用STS服务AssumeRole接口的权限(AliyunSTSAssumeRoleAccess)

图片.png

3、为RAM子账户创建AccessKey&AccessSecret

图片.png

三、云账号B可获取STS临时凭证,访问日志服务API接口

1、pom.xml

        <!-- https://mvnrepository.com/artifact/com.aliyun.openservices/aliyun-log -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>aliyun-log</artifactId>
            <version>0.6.70</version>
        </dependency>


        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-sts</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.4.6</version>
        </dependency>


        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>loghub-client-lib</artifactId>
            <version>0.6.33</version>
        </dependency>

2、主程序


import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.sts.model.v20150401.AssumeRoleRequest;
import com.aliyuncs.sts.model.v20150401.AssumeRoleResponse;

public class StsSample {

    public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
        // 子账户的ak,sk信息
        String accessKeyId = "*********";
        String accessKeySecret = "*********";
        // 企业账户A创建角色的ARN
        String roleArn = "acs:ram::172144026*******:role/***";
        // 企业账户A创建角色的名称
        String roleSessionName = "*******";
        AssumeRoleResponse response = new AssumeRoleResponse();
        try {
            //构建一个阿里云客户端,用于发起请求。
            //构建阿里云客户端时需要设置AccessKey ID和AccessKey Secret。
            DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessKeySecret);
            IAcsClient client = new DefaultAcsClient(profile);

            //构造请求,设置参数。
            AssumeRoleRequest request = new AssumeRoleRequest();
            request.setRoleArn(roleArn);
            request.setRoleSessionName(roleSessionName);
            request.setDurationSeconds(3600L); //过期时间,单位为秒,过期时间最小值为900秒,最大值为MaxSessionDuration设置的时间。默认值为3600秒。

            response = client.getAcsResponse(request);
            System.out.println("Expiration: " + response.getCredentials().getExpiration());
            System.out.println("Access Key Id: " + response.getCredentials().getAccessKeyId());
            System.out.println("Access Key Secret: " + response.getCredentials().getAccessKeySecret());
            System.out.println("Security Token: " + response.getCredentials().getSecurityToken());
        } catch (ClientException e) {
            System.out.println("Failed to get a token.");
            System.out.println("Error code: " + e.getErrCode());
            System.out.println("Error message: " + e.getErrMsg());
        }

        // 消费日志程序
        String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        // means project region must be cn-hangzhou
        String sProject = "taros******";
        String sLogstore = "aiops_a*****";
        String sConsumerGroup = "token_group789";

        LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_2", sEndpoint, sProject, sLogstore, response.getCredentials().getAccessKeyId(), response.getCredentials().getAccessKeySecret(), LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
        config.setStsToken(response.getCredentials().getSecurityToken());
        ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        //Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。
        thread.start();
        Thread.sleep(60 * 60 * 1000);
        //调用Worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
        worker.shutdown();
        //ClientWorker运行过程中会生成多个异步的任务,Shutdown完成后请等待还在执行的任务安全退出,建议sleep配置为30秒。
        Thread.sleep(30 * 1000);
    }
}

3、SampleLogHubProcessor.java


import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;

import java.util.List;

public class SampleLogHubProcessor implements ILogHubProcessor {
    private int shardId;
    // 记录上次持久化Checkpoint的时间。
    private long mLastCheckTime = 0;

    public void initialize(int shardId) {
        this.shardId = shardId;
    }

    // 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。
    public String process(List<LogGroupData> logGroups,
                          ILogHubCheckPointTracker checkPointTracker) {
        // 打印已获取的数据。
        for (LogGroupData logGroup : logGroups) {
            FastLogGroup flg = logGroup.GetFastLogGroup();
            System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                    flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
            System.out.println("Tags");
            for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                FastLogTag logtag = flg.getLogTags(tagIdx);
                System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
            }
            for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                FastLog log = flg.getLogs(lIdx);
                System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                    FastLogContent content = log.getContents(cIdx);
                    System.out.println(content.getKey() + "\t:\t" + content.getValue());
                }
            }
        }
        long curTime = System.currentTimeMillis();
        // 每隔30秒,写一次Checkpoint到服务端。如果30秒内发生Worker异常终止,新启动的Worker会从上一个Checkpoint获取消费数据,可能存在少量的重复数据。
        if (curTime - mLastCheckTime > 30 * 1000) {
            try {
                //参数为true表示立即将Checkpoint更新到服务端;false表示将Checkpoint缓存在本地,默认间隔60秒会将Checkpoint更新到服务端。
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            mLastCheckTime = curTime;
        }
        return null;
    }

    // 当Worker退出时,会调用该函数,您可以在此处执行清理工作。
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
        //将Checkpoint立即保存到服务端。
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}

class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
    public ILogHubProcessor generatorProcessor() {
        // 生成一个消费实例。
        return new SampleLogHubProcessor();
    }
}

4、logstore控制台消费者组查看

图片.png

更多参考

RocketMQ阿里云跨账户授权访问示例
阿里云基于STS获取临时访问权限使用示例
通过STS实现跨账号访问日志服务资源

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
日志服务(SLS)数据模拟器初体验
日志服务SLS是阿里集团自研的一站式日志平台,用户无需开发就能能够开箱即用地使用它来提升运维、运营效率,建立 DT 时代海量日志处理能力。SLS数据模拟器是SLS提供的一个用于接入模拟数据的数据接入方式,支持丰富的数据模拟场景,包含各类阿里云云产品日志、自建开源/商业软件日志以及Metric日志等,助力用户一键式导入模拟数据。
579 0
使用模拟日志轻松上手日志服务——数据实验室实践
数据实验室是日志服务(Log Service,简称 SLS)最新推出的工具,为用户提供各种场景的模拟日志数据,以及各种公共数据集数据。同时也根据场景建立对应的报表模板,方便用户从这些数据和报表模版入手以熟悉日志服务的查询,报表编辑等各种操作。
1398 0
日志服务 - 数据加工- Nginx日志解析实践
以nginx日志为例,简单介绍日志服务的数据加工功能
1099 0
导入MaxCompute数据到日志服务实战
日志服务支持将MaxCompute 中的数据导入到日志服务,利用日志服务的查询和可视化功能,对数据进行分析和可视化展示,使用数据加工对数据进一步处理,充分发掘数据的价值
1148 0
基于日志服务数据加工与RDS MySQL做数据富化以及数据分析
准备基于sls日志服务对共享单车租赁信息进行加工分析。sls日志服务上记录2019年8月上海地区某共享单车的数据,已脱敏处理,供研究之用。因RDS数据库里保存的是每辆自行车的编号、品牌以及投放批次。因此需要使用日志服务数据加工将单车实时动态记录日志与保存在RDS上的静态数据做富化和数据分析处理。
955 0
使用dataworks投递日志服务数据到MaxComputer
1. 为什么使用dataworks投递数据 日志服务提供了多种投递数据的方式,如: 在控制台直接配置投递任务,通过消费组获取数据然后再投递,以及使用dataworks配置投递任务等方式。控制台直接投递配置方式与dataworks类似,不过配置项更少,更容易操作。
3866 0
[日志服务][数据加工]e_anchor函数与e_regex函数的使用总结
简单介绍日志服务数据加工e_anchor函数与e_regex使用场景总结
774 0
使用Kafka Connect 同步Kafka数据到日志服务
使用Kafka Connect 同步Kafka数据到日志服务 简介 Kafka作为最流行的消息队列,在业界有这非常广泛的使用。不少用户把日志投递到Kafka之后,再使用其他的软件如ElasticSearch进行分析。
5702 0
日志服务数据加工最佳实践: 多子键为数组的复杂JSON加工
程序构建的日志经常会以一种统计性质的JSON格式写入, 通常其包含一个基础信息, 以及多个子健为数组的形式. 本篇如何使用日志服务数据加工处理多子键为数组的复杂JSON.
951 0
日志服务数据加工最佳实践: 加工多层数组对象嵌套的复杂JSON
许多程序的数据结构是一个复杂的包括多层数组嵌套的对象, 本篇介绍使用日志服务数据加工处理多层数组对象嵌套的复杂JSON.
1167 0
+关注
taro_秋刀鱼
博客园主页:https://home.cnblogs.com/u/taro/
文章
问答
来源圈子
更多
文章排行榜
最热
最新
相关电子书
更多
Python 系列直播——深入Python与日志服务,玩转大规模数据分析处理实战第二讲
立即下载
Python第四讲——使用IPython/Jupyter Notebook与日志服务玩转超大规模数据分析与可视化
立即下载
yqdh_58c13478654...1510469921.pdf
立即下载