背景信息
出于业务隔离或项目外包等需求,云账号A希望将部分日志服务业务授权给云账号B,由云账号B操作维护这部分业务。基本需求及详细操作如下:
一、云账号B拥有向企业A的日志服务中写入数据和使用消费组的权限
1、企业A主账户ram控制台创建自定义角色
2、登录日志服务控制台,到对应的project下使用权限助手生成策略脚本
3、ram控制台脚本方式添加自定义策略
4、为角色添加生成的策略
至此,企业账户A的操作全部结束。
二、云账号B的指定RAM用户也拥有日志服务的写入和消费组权限
1、登录企业主账户B,添加RAM子账户
2、为创建的子账户添加调用STS服务AssumeRole接口的权限(AliyunSTSAssumeRoleAccess)
3、为RAM子账户创建AccessKey&AccessSecret
三、云账号B可获取STS临时凭证,访问日志服务API接口
1、pom.xml
<!-- https://mvnrepository.com/artifact/com.aliyun.openservices/aliyun-log -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.70</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-sts</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.4.6</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.33</version>
</dependency>
2、主程序
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.sts.model.v20150401.AssumeRoleRequest;
import com.aliyuncs.sts.model.v20150401.AssumeRoleResponse;
public class StsSample {
public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
// 子账户的ak,sk信息
String accessKeyId = "*********";
String accessKeySecret = "*********";
// 企业账户A创建角色的ARN
String roleArn = "acs:ram::172144026*******:role/***";
// 企业账户A创建角色的名称
String roleSessionName = "*******";
AssumeRoleResponse response = new AssumeRoleResponse();
try {
//构建一个阿里云客户端,用于发起请求。
//构建阿里云客户端时需要设置AccessKey ID和AccessKey Secret。
DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessKeySecret);
IAcsClient client = new DefaultAcsClient(profile);
//构造请求,设置参数。
AssumeRoleRequest request = new AssumeRoleRequest();
request.setRoleArn(roleArn);
request.setRoleSessionName(roleSessionName);
request.setDurationSeconds(3600L); //过期时间,单位为秒,过期时间最小值为900秒,最大值为MaxSessionDuration设置的时间。默认值为3600秒。
response = client.getAcsResponse(request);
System.out.println("Expiration: " + response.getCredentials().getExpiration());
System.out.println("Access Key Id: " + response.getCredentials().getAccessKeyId());
System.out.println("Access Key Secret: " + response.getCredentials().getAccessKeySecret());
System.out.println("Security Token: " + response.getCredentials().getSecurityToken());
} catch (ClientException e) {
System.out.println("Failed to get a token.");
System.out.println("Error code: " + e.getErrCode());
System.out.println("Error message: " + e.getErrMsg());
}
// 消费日志程序
String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
// means project region must be cn-hangzhou
String sProject = "taros******";
String sLogstore = "aiops_a*****";
String sConsumerGroup = "token_group789";
LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_2", sEndpoint, sProject, sLogstore, response.getCredentials().getAccessKeyId(), response.getCredentials().getAccessKeySecret(), LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
config.setStsToken(response.getCredentials().getSecurityToken());
ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
//Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。
thread.start();
Thread.sleep(60 * 60 * 1000);
//调用Worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
worker.shutdown();
//ClientWorker运行过程中会生成多个异步的任务,Shutdown完成后请等待还在执行的任务安全退出,建议sleep配置为30秒。
Thread.sleep(30 * 1000);
}
}
3、SampleLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
import java.util.List;
public class SampleLogHubProcessor implements ILogHubProcessor {
private int shardId;
// 记录上次持久化Checkpoint的时间。
private long mLastCheckTime = 0;
public void initialize(int shardId) {
this.shardId = shardId;
}
// 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。
public String process(List<LogGroupData> logGroups,
ILogHubCheckPointTracker checkPointTracker) {
// 打印已获取的数据。
for (LogGroupData logGroup : logGroups) {
FastLogGroup flg = logGroup.GetFastLogGroup();
System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
System.out.println("Tags");
for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
FastLogTag logtag = flg.getLogTags(tagIdx);
System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
}
for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
FastLog log = flg.getLogs(lIdx);
System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
FastLogContent content = log.getContents(cIdx);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
long curTime = System.currentTimeMillis();
// 每隔30秒,写一次Checkpoint到服务端。如果30秒内发生Worker异常终止,新启动的Worker会从上一个Checkpoint获取消费数据,可能存在少量的重复数据。
if (curTime - mLastCheckTime > 30 * 1000) {
try {
//参数为true表示立即将Checkpoint更新到服务端;false表示将Checkpoint缓存在本地,默认间隔60秒会将Checkpoint更新到服务端。
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
mLastCheckTime = curTime;
}
return null;
}
// 当Worker退出时,会调用该函数,您可以在此处执行清理工作。
public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
//将Checkpoint立即保存到服务端。
try {
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
}
}
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
public ILogHubProcessor generatorProcessor() {
// 生成一个消费实例。
return new SampleLogHubProcessor();
}
}
4、logstore控制台消费者组查看