有关RocketMQ ACL的使用请查看上一篇RocketMQ ACL使用指南,本文从源码的角度,分析一下RocketMQ ACL的实现原理。
备注:RocketMQ在4.4.0时引入了ACL机制,本文代码基于RocketMQ4.5.0版本。
根据RocketMQ ACL使用手册,我们应该首先看一下Broker服务器在开启ACL机制时如何加载配置文件,并如何工作的。
Broker端ACL的入口代码为:BrokerController#initialAcl
1private void initialAcl() { 2 if (!this.brokerConfig.isAclEnable()) { // @1 3 log.info("The broker dose not enable acl"); 4 return; 5 } 6 7 List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); // @2 8 if (accessValidators == null || accessValidators.isEmpty()) { 9 log.info("The broker dose not load the AccessValidator"); 10 return; 11 } 12 13 for (AccessValidator accessValidator: accessValidators) { // @3 14 final AccessValidator validator = accessValidator; 15 this.registerServerRPCHook(new RPCHook() { 16 17 @Override 18 public void doBeforeRequest(String remoteAddr, RemotingCommand request) { 19 //Do not catch the exception 20 validator.validate(validator.parse(request, remoteAddr)); // @4 21 } 22 23 @Override 24 public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { 25 } 26 }); 27 } 28}
本方法的实现共4个关键点。
代码@1:首先判断Broker是否开启了acl,通过配置参数aclEnable指定,默认为false。
代码@2:使用类似SPI机制,加载配置的AccessValidator,该方法返回一个列表,其实现逻辑时读取META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中配置的访问验证器,默认配置内容如下:
代码@3:遍历配置的访问验证器(AccessValidator),并向Broker处理服务器注册钩子函数,RPCHook的doBeforeRequest方法会在服务端接收到请求,将其请求解码后,执行处理请求之前被调用;RPCHook的doAfterResponse方法会在处理完请求后,将结果返回之前被调用,其调用如图所示:
代码@4:在RPCHook#doBeforeRequest方法中调用AccessValidator#validate, 在真实处理命令之前,先执行ACL的验证逻辑,如果拥有该操作的执行权限,则放行,否则抛出AclException。
接下来,我们将重点放到Broker默认实现的访问验证器:PlainAccessValidator。
2.1 类图
- AccessValidator
访问验证器接口,主要定义两个接口。
1)AccessResource parse(RemotingCommand request, String remoteAddr)
从请求头中解析本次请求对应的访问资源,即本次请求需要的访问权限。
2)void validate(AccessResource accessResource)
根据本次需要访问的权限,与请求用户拥有的权限进行对比验证,判断是拥有权限,如果没有访问该操作的权限,则抛出异常,否则放行。 - PlainAccessValidator
RocketMQ默认提供的基于yml配置格式的访问验证器。
接下来我们重点看一下PlainAccessValidator的parse方法与validate方法的实现细节。在讲解该方法之前,我们首先认识一下RocketMQ封装访问资源的PlainAccessResource。
2.1.2 PlainAccessResource类图
我们对其属性一一做个介绍:
- private String accessKey
访问Key,用户名。 - private String secretKey
用户密码。 - private String whiteRemoteAddress
远程IP地址白名单。 - private boolean admin
是否是管理员角色。 - private byte defaultTopicPerm = 1
默认topic访问权限,即如果没有配置topic的权限,则Topic默认的访问权限为1,表示为DENY。 - private byte defaultGroupPerm = 1
默认的消费组访问权限,默认为DENY。 - private Map resourcePermMap
资源需要的访问权限映射表。 - private RemoteAddressStrategy remoteAddressStrategy
远程IP地址验证策略。 - private int requestCode
当前请求的requestCode。 - private byte[] content
请求头与请求体的内容。 - private String signature
签名字符串,这是通常的套路,在客户端时,首先将请求参数排序,然后使用secretKey生成签名字符串,服务端重复这个步骤,然后对比签名字符串,如果相同,则认为登录成功,否则失败。 - private String secretToken
密钥token。 - private String recognition
目前作用未知,代码中目前未被使用。
2.2 构造方法
1public PlainAccessValidator() { 2 aclPlugEngine = new PlainPermissionLoader(); 3}
构造函数,直接创建PlainPermissionLoader对象,从命名上来看,应该是触发acl规则的加载,即解析plain_acl.yml,接下来会重点探讨,即acl启动流程之配置文件的解析。
2.3 parse方法
该方法的作用就是从请求命令中解析出本次访问所需要的访问权限,最终构建AccessResource对象,为后续的校验权限做准备。
1PlainAccessResource accessResource = new PlainAccessResource(); 2if (remoteAddr != null && remoteAddr.contains(":")) { 3 accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]); 4} else { 5 accessResource.setWhiteRemoteAddress(remoteAddr); 6}
Step1:首先创建PlainAccessResource,从远程地址中提取出远程访问IP地址。
1if (request.getExtFields() == null) { 2 throw new AclException("request's extFields value is null"); 3} 4accessResource.setRequestCode(request.getCode()); 5accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY)); 6accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE)); 7accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
Step2:如果请求头中的扩展字段为空,则抛出异常,如果不为空,则从请求头中读取requestCode、accessKey(请求用户名)、签名字符串(signature)、secretToken。
1try { 2 switch (request.getCode()) { 3 case RequestCode.SEND_MESSAGE: 4 accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB); 5 break; 6 case RequestCode.SEND_MESSAGE_V2: 7 accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB); 8 break; 9 case RequestCode.CONSUMER_SEND_MSG_BACK: 10 accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB); 11 accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB); 12 break; 13 case RequestCode.PULL_MESSAGE: 14 accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); 15 accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB); 16 break; 17 case RequestCode.QUERY_MESSAGE: 18 accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); 19 break; 20 case RequestCode.HEART_BEAT: 21 HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); 22 for (ConsumerData data : heartbeatData.getConsumerDataSet()) { 23 accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB); 24 for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) { 25 accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB); 26 } 27 } 28 break; 29 case RequestCode.UNREGISTER_CLIENT: 30 final UnregisterClientRequestHeader unregisterClientRequestHeader = 31 (UnregisterClientRequestHeader) request 32 .decodeCommandCustomHeader(UnregisterClientRequestHeader.class); 33 accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB); 34 break; 35 case RequestCode.GET_CONSUMER_LIST_BY_GROUP: 36 final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader = 37 (GetConsumerListByGroupRequestHeader) request 38 .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); 39 accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB); 40 break; 41 case RequestCode.UPDATE_CONSUMER_OFFSET: 42 final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = 43 (UpdateConsumerOffsetRequestHeader) request 44 .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); 45 accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB); 46 accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB); 47 break; 48 default: 49 break; 50 51 } 52 } catch (Throwable t) { 53 throw new AclException(t.getMessage(), t); 54 }
Step3:根据请求命令,设置本次请求需要拥有的权限,上述代码比较简单,就是从请求中得出本次操作的Topic、消息组名称,为了方便区分topic与消费组,消费组使用消费者对应的重试主题,当成资源的Key,从这里也可以看出,当前版本需要进行ACL权限验证的请求命令如下:
- SEND_MESSAGE
- SEND_MESSAGE_V2
- CONSUMER_SEND_MSG_BACK
- PULL_MESSAGE
- QUERY_MESSAGE
- HEART_BEAT
- UNREGISTER_CLIENT
- GET_CONSUMER_LIST_BY_GROUP
- UPDATE_CONSUMER_OFFSET
1// Content 2SortedMap<String, String> map = new TreeMap<String, String>(); 3for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) { 4 if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) { 5 map.put(entry.getKey(), entry.getValue()); 6 } 7} 8accessResource.setContent(AclUtils.combineRequestContent(request, map)); 9return accessResource;
Step4:对扩展字段进行排序,便于生成签名字符串,然后将扩展字段与请求体(body)写入content字段。完成从请求头中解析出本次请求需要验证的权限。
2.4 validate 方法
1public void validate(AccessResource accessResource) { 2 aclPlugEngine.validate((PlainAccessResource) accessResource); 3}
验证权限,即根据本次请求需要的权限与当前用户所拥有的权限进行对比,如果符合,则正常执行;否则抛出AclException。
为了揭开配置文件的解析与验证,我们将目光投入到PlainPermissionLoader。
该类的主要职责:加载权限,即解析acl主要配置文件plain_acl.yml。
3.1 类图
下面对其核心属性与核心方法一一介绍:
- DEFAULT_PLAIN_ACL_FILE
默认acl配置文件名称,默认值为conf/plain_acl.yml。 - String fileName
acl配置文件名称,默认为DEFAULT_PLAIN_ACL_FILE ,可以通过系统参数-Drocketmq.acl.plain.file=fileName指定。 - Map plainAccessResourceMap
解析出来的权限配置映射表,以用户名为键。 - RemoteAddressStrategyFactory remoteAddressStrategyFactory
远程IP解析策略工厂,用于解析白名单IP地址。 - boolean isWatchStart
是否开启了文件监听,即自动监听plain_acl.yml文件,一旦该文件改变,可在不重启服务器的情况下自动生效。 - public PlainPermissionLoader()
构造方法。 - public void load()
加载配置文件。 - public void validate(PlainAccessResource plainAccessResource)
验证是否有权限访问待访问资源。
3.2 PlainPermissionLoader构造方法
1public PlainPermissionLoader() { 2 load(); 3 watch(); 4}
在构造方法中调用load与watch方法。
3.3 load
1Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>(); 2List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>(); 3String path = fileHome + File.separator + fileName; 4JSONObject plainAclConfData = AclUtils.getYamlDataObject(path,JSONObject.class);
Step1:初始化plainAccessResourceMap(用户配置的访问资源,即权限容器)、globalWhiteRemoteAddressStrategy:全局IP白名单访问策略。配置文件,默认为${ROCKETMQ_HOME}/conf/plain_acl.yml。
1JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses"); 2if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) { 3 for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) { 4 globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory. 5 getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i))); 6 } 7}
Step2:globalWhiteRemoteAddresses:全局白名单,类型为数组。根据配置的规则,使用remoteAddressStrategyFactory获取一个访问策略,下文会重点介绍其配置规则。
1JSONArray accounts = plainAclConfData.getJSONArray("accounts"); 2if (accounts != null && !accounts.isEmpty()) { 3 List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class); 4 for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) { 5 PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig); 6 plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource); 7 } 8} 9this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy; 10this.plainAccessResourceMap = plainAccessResourceMap;
Step3:解析plain_acl.yml文件中的另外一个根元素accounts,用户定义的权限信息。从PlainAccessConfig的定义来看,accounts标签下支持如下标签:
- accessKey
- secretKey
- whiteRemoteAddress
- admin
- defaultTopicPerm
- defaultGroupPerm
- topicPerms
- groupPerms
上述标签的说明,请参考:RocketMQ ACL使用指南 。具体的解析过程比较容易,就不再细说。
load方法主要完成acl配置文件的解析,将用户定义的权限加载到内存中。
3.4 watch
1private void watch() { 2 try { 3 String watchFilePath = fileHome + fileName; 4 FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() { 5 @Override 6 public void onChanged(String path) { 7 log.info("The plain acl yml changed, reload the context"); 8 load(); 9 } 10 }); 11 fileWatchService.start(); 12 log.info("Succeed to start AclWatcherService"); 13 this.isWatchStart = true; 14 } catch (Exception e) { 15 log.error("Failed to start AclWatcherService", e); 16 } 17}
监听器,默认以500ms的频率判断文件的内容是否变化。在文件内容发生变化后调用load()方法,重新加载配置文件。那FileWatchService是如何判断两个文件的内容发生了变化呢?
1FileWatchService#hash 2private String hash(String filePath) throws IOException, NoSuchAlgorithmException { 3 Path path = Paths.get(filePath); 4 md.update(Files.readAllBytes(path)); 5 byte[] hash = md.digest(); 6 return UtilAll.bytes2string(hash); 7}
获取文件md5签名来做对比,这里为什么不在启动时先记录上一次文件的修改时间,然后先判断其修改时间是否变化,再判断其内容是否真正发生变化。
3.5 validate
1// Check the global white remote addr 2for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) { 3 if (remoteAddressStrategy.match(plainAccessResource)) { 4 return; 5 } 6}
Step1:首先使用全局白名单对资源进行验证,只要一个规则匹配,则返回,表示认证成功。
1if (plainAccessResource.getAccessKey() == null) { 2 throw new AclException(String.format("No accessKey is configured")); 3} 4if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) { 5 throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey())); 6} 7Step2:如果请求信息中,没有设置用户名,则抛出未配置AccessKey异常;如果Broker中并为配置该用户的配置信息,则抛出AclException。 8 9// Check the white addr for accesskey 10PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey()); 11if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) { 12 return; 13}
Step3:如果用户配置的白名单与待访问资源规则匹配的话,则直接发认证通过。
1// Check the signature 2String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey()); 3if (!signature.equals(plainAccessResource.getSignature())) { 4 throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey())); 5}
Step4:验证签名。
1checkPerm(plainAccessResource, ownedAccess);
Step5:调用checkPerm方法,验证需要的权限与拥有的权限是否匹配。
3.5.1 checkPerm
1if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) { 2 throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey())); 3}
Step6:如果当前的请求命令属于必须是Admin用户才能访问的权限,并且当前用户并不是管理员角色,则抛出异常,如下命令需要admin角色才能进行的操作:
1Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap(); 2Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap(); 3if (needCheckedPermMap == null) { 4 // If the needCheckedPermMap is null,then return 5 return; 6} 7if (ownedPermMap == null && ownedAccess.isAdmin()) { 8 // If the ownedPermMap is null and it is an admin user, then return 9 return; 10}
Step7:如果该请求不需要进行权限验证,则通过认证,如果当前用户的角色是管理员,并且没有配置用户权限,则认证通过,返回。
1for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) { 2 String resource = needCheckedEntry.getKey(); 3 Byte neededPerm = needCheckedEntry.getValue(); 4 boolean isGroup = PlainAccessResource.isRetryTopic(resource); 5 6 if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) { 7 // Check the default perm 8 byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : ownedAccess.getDefaultTopicPerm(); 9 if (!Permission.checkPermission(neededPerm, ownedPerm)) { 10 throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); 11 } 12 continue; 13 } 14 if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) { 15 throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); 16 } 17}
Step8:遍历需要权限与拥有的权限进行对比,如果配置对应的权限,则判断是否匹配;如果未配置权限,则判断默认权限时是否允许,不允许,则抛出AclException。
验证逻辑就介绍到这里了,下面给出其匹配流程图:
上述阐述了从Broker服务器启动、加载acl配置文件流程、动态监听配置文件、服务端权限验证流程,接下来我们看一下客户端关于ACL需要处理的事情。
回顾一下,我们引入ACL机制后,客户端的代码示例如下:
其在创建DefaultMQProducer时,注册AclClientRPCHook钩子,会在向服务端发送远程命令前后执行其钩子函数,接下来我们重点分析一下AclClientRPCHook。
4.1 doBeforeRequest
1public void doBeforeRequest(String remoteAddr, RemotingCommand request) { 2 byte[] total = AclUtils.combineRequestContent(request, 3 parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken())); // @1 4 String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey()); // @2 5 request.addExtField(SIGNATURE, signature); // @3 6 request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey()); 7 // The SecurityToken value is unneccessary,user can choose this one. 8 if (sessionCredentials.getSecurityToken() != null) { 9 request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken()); 10 } 11}
代码@1:将Request请求参数进行排序,并加入accessKey。
代码@2:对排好序的请参数,使用用户配置的密码生成签名,并最近到扩展字段Signature,然后服务端也会按照相同的算法生成Signature,如果相同,则表示签名验证成功(类似于实现登录的效果)。
代码@3:将Signature、AccessKey等加入到请求头的扩展字段中,服务端拿到这些元数据,结合请求头中的信息,根据配置的权限,进行权限校验。
关于ACL客户端生成签名是一种通用套路,就不在细讲了。
源码分析ACL的实现就介绍到这里了,下文将介绍RocketMQ 消息轨迹的使用与实现原理分析。