這篇文章將為大家詳細(xì)講解有關(guān)RocketMQ中怎么實現(xiàn)權(quán)限控制,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
創(chuàng)新互聯(lián)10多年企業(yè)網(wǎng)站制作服務(wù);為您提供網(wǎng)站建設(shè),網(wǎng)站制作,網(wǎng)頁設(shè)計及高端網(wǎng)站定制服務(wù),企業(yè)網(wǎng)站制作及推廣,對成都電動窗簾等多個方面擁有豐富的網(wǎng)站制作經(jīng)驗的網(wǎng)站建設(shè)公司。
1、簡單使用
ACL是access control list的簡稱,俗稱訪問控制列表。訪問控制,基本上會涉及到用戶、資源、權(quán)限、角色等概念,那在RocketMQ中上述會對應(yīng)哪些對象呢?
用戶:用戶是訪問控制的基礎(chǔ)要素,RocketMQ ACL必然也會引入用戶的概念,即支持用戶名、密碼。 資源:需要保護的對象,消息發(fā)送涉及的Topic、消息消費涉及的消費組,應(yīng)該進(jìn)行保護,故可以抽象成資源。 權(quán)限:針對資源,能進(jìn)行的操作。 角色:RocketMQ中,只定義兩種角色:是否是管理員。
acl默認(rèn)的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目錄下
需要使用acl必須在服務(wù)端開啟此功能,在Broker的配置文件中配置,aclEnable = true開啟此功能
配置plain_acl.yml文件
globalWhiteRemoteAddresses: - 10.10.15.* - 192.168.0.* accounts: - accessKey: RocketMQ secretKey: 12345678 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: SUB topicPerms: - topicA=DENY - topicB=PUB|SUB - topicC=SUB groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - accessKey: rocketmq2 secretKey: 12345678 whiteRemoteAddress: 192.168.1.* # if it is admin, it could access all resources admin: true
下面我們介紹一下plain_acl.yml文件中相關(guān)的參數(shù)含義及使用
字段 | 取值 | 含義 |
---|---|---|
globalWhiteRemoteAddresses | *;192.168.*.*;192.168.0.1 | 全局IP白名單 |
accessKey | 字符串 | Access Key 用戶名 |
secretKey | 字符串 | Secret Key 密碼 |
whiteRemoteAddress | *;192.168.*.*;192.168.0.1 | 用戶IP白名單 |
admin | true;false | 是否管理員賬戶 |
defaultTopicPerm | DENY;PUB;SUB;PUB|SUB | 默認(rèn)的Topic權(quán)限 |
defaultGroupPerm | DENY;PUB;SUB;PUB|SUB | 默認(rèn)的ConsumerGroup權(quán)限 |
topicPerms | topic=權(quán)限 | 各個Topic的權(quán)限 |
groupPerms | group=權(quán)限 | 各個ConsumerGroup的權(quán)限 |
權(quán)限標(biāo)識符的含義
權(quán)限 | 含義 |
---|---|
DENY | 拒絕 |
ANY | PUB 或者 SUB 權(quán)限 |
PUB | 發(fā)送權(quán)限 |
SUB | 訂閱權(quán)限 |
處理流程
特殊的請求例如 UPDATE_AND_CREATE_TOPIC 等,只能由 admin 賬戶進(jìn)行操作;
對于某個資源,如果有顯性配置權(quán)限,則采用配置的權(quán)限;如果沒有顯性配置權(quán)限,則采用默認(rèn)的權(quán)限
RocketMQ的權(quán)限控制存儲的默認(rèn)實現(xiàn)是基于yml配置文件。用戶可以動態(tài)修改權(quán)限控制定義的屬性,而不需重新啟動Broker服務(wù)節(jié)點
如果ACL與高可用部署(Master/Slave架構(gòu))同時啟用,那么需要在Broker Master節(jié)點的${ROCKETMQ_HOME}/store/conf/plain_acl.yml配置文件中 設(shè)置全局白名單信息,即為將Slave節(jié)點的ip地址設(shè)置至Master節(jié)點plain_acl.yml配置文件的全局白名單中
public class AclProducer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook()); producer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876"); producer.start(); for (int i = 0; i < 10; i++) { try { Message msg = new Message("topicA" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678")); } }
查看結(jié)果
報錯提示topicA沒有權(quán)限,我們在plain_acl.yml文件中配置的也確實是RocketMQ用戶拒絕,生產(chǎn)消費topicA主題信息,我們改變主題為topicB,則發(fā)現(xiàn)發(fā)送消息成功,topicB=PUB|SUB設(shè)置的權(quán)限是生產(chǎn)消費都可以。
查看結(jié)果
public class AclConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupA", getAclRPCHook(),new AllocateMessageQueueAveragely()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("topicB", "*"); consumer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678")); } }
查看結(jié)果:發(fā)現(xiàn)沒有任何消息被消費,也沒有報錯信息,對于RocketMQ用戶topicB設(shè)置的就是可以可以生產(chǎn)可以消費的,但是我們發(fā)現(xiàn)其groupA=DENY是拒絕的,說明消費組是groupA則拒絕消費任何消息,我們改成groupB或者groupC查看結(jié)果。
Broker端ACL原理圖
Broker服務(wù)啟動時創(chuàng)建BrokerController并初始化initialize()時調(diào)用acl相關(guān)的初始化方法initialAcl()
private void initialAcl() { //broker配置文件中是否開啟ACL功能,默認(rèn)關(guān)閉 if (!this.brokerConfig.isAclEnable()) { log.info("The broker dose not enable acl"); return; } //獲取權(quán)限訪問校驗器的列表,加載的META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中指向 //org.apache.rocketmq.acl.plain.PlainAccessValidator,默認(rèn)只有一個 List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); if (accessValidators == null || accessValidators.isEmpty()) { log.info("The broker dose not load the AccessValidator"); return; } for (AccessValidator accessValidator: accessValidators) { final AccessValidator validator = accessValidator; //注冊服務(wù)端就的“鉤子”對象,對權(quán)限進(jìn)行校驗 this.registerServerRPCHook(new RPCHook() { @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { //Do not catch the exception validator.validate(validator.parse(request, remoteAddr)); } @Override public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { } }); } }
源碼中有相關(guān)的注解,我們查看一下注冊registerServerRPCHook方法
public void registerServerRPCHook(RPCHook rpcHook) { //服務(wù)端的NettyRemotingServer服務(wù)注冊“鉤子”函數(shù) getRemotingServer().registerRPCHook(rpcHook); this.fastRemotingServer.registerRPCHook(rpcHook); }
關(guān)于NettyRemotingServer服務(wù)和NettyRemotingClient服務(wù)配合使用,后面章節(jié)RocketMQ Remoting會重點分析
PlainAccessValidator.parse(),根據(jù)客戶端不同的請求Code其需要的檢驗資源也不一樣
switch (request.getCode()) { //發(fā)送消息需要校驗當(dāng)前的賬戶的topic是否具有PUB權(quán)限 case RequestCode.SEND_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB); break; case RequestCode.SEND_MESSAGE_V2: accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB); break; case RequestCode.CONSUMER_SEND_MSG_BACK: accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB); break; //拉取消息時需要知道該consumer賬戶下拉取的topic是否具有SUB權(quán)限,并且還要知道訂閱組consumerGroup是否有sub權(quán)限 case RequestCode.PULL_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB); break; case RequestCode.QUERY_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); break; case RequestCode.HEART_BEAT: HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); for (ConsumerData data : heartbeatData.getConsumerDataSet()) { accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB); for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) { accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB); } } break; case RequestCode.UNREGISTER_CLIENT: final UnregisterClientRequestHeader unregisterClientRequestHeader = (UnregisterClientRequestHeader) request .decodeCommandCustomHeader(UnregisterClientRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.GET_CONSUMER_LIST_BY_GROUP: final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader = (GetConsumerListByGroupRequestHeader) request .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.UPDATE_CONSUMER_OFFSET: final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = (UpdateConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB); accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB); break; default: break; }
根據(jù)request.getCode()獲取當(dāng)前的操作需要的權(quán)限標(biāo)識集合,供后面與系統(tǒng)的權(quán)限配置文件plain_acl.yml中的權(quán)限標(biāo)識符校驗時使用
Broker初始化相關(guān)服務(wù)的時候創(chuàng)建了PlainAccessValidator,我們發(fā)現(xiàn)其默認(rèn)的構(gòu)造方法中調(diào)用了其權(quán)限資源加載器PlainPermissionLoader
public PlainAccessValidator() { aclPlugEngine = new PlainPermissionLoader(); }
創(chuàng)建PlainPermissionLoader對象
public PlainPermissionLoader() { //加載服務(wù)端的權(quán)限文件plain_acl.yml load(); //開啟線程每500ms檢測權(quán)限文件是否改變,若改變則執(zhí)行l(wèi)oad()從新加載權(quán)限文件 watch(); }
查看load方法流程
public void load() { Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>(); List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>(); JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, JSONObject.class); if (plainAclConfData == null || plainAclConfData.isEmpty()) { throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName)); } log.info("Broker plain acl conf data is : ", plainAclConfData.toString()); //獲取全局白名單IP集合 JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses"); if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) { for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) { globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory. getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i))); } } //獲取賬戶權(quán)限集合 JSONArray accounts = plainAclConfData.getJSONArray("accounts"); if (accounts != null && !accounts.isEmpty()) { List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class); for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) { //構(gòu)建每個賬戶的權(quán)限資源 PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig); //放入Map中AccessKey作為key,該賬戶的權(quán)限資源作為value plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource); } } this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy; this.plainAccessResourceMap = plainAccessResourceMap; }
加載資源文件,解析其中的權(quán)限標(biāo)識,等待權(quán)限校驗器PlainAccessValidator調(diào)用其validate()對權(quán)限校驗
核心的校驗方法PlainPermissionLoader.validate()
public void validate(PlainAccessResource plainAccessResource) { //全局的白名單IP進(jìn)行校驗 for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) { //匹配成功說明是全局的白名單IP,具有所有權(quán)限,直接返回。 if (remoteAddressStrategy.match(plainAccessResource)) { return; } } //判斷用戶名是否為空,null則拋出AclException異常 if (plainAccessResource.getAccessKey() == null) { throw new AclException(String.format("No accessKey is configured")); } //校驗賬戶是否存在于服務(wù)端的權(quán)限資源文件中plain_acl.yml,不在則拋出異常 if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) { throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey())); } PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey()); //檢查該賬戶的白名單IP是否匹配上客戶端IP,匹配成功具有所有權(quán)限,除UPDATE_AND_CREATE_TOPIC等特殊權(quán)限需要管理員權(quán)限 if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) { return; } //校驗簽名 String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey()); if (!signature.equals(plainAccessResource.getSignature())) { throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey())); } //校驗賬戶內(nèi)的資源權(quán)限 checkPerm(plainAccessResource, ownedAccess); }
查看其對于當(dāng)前賬戶內(nèi)部的資源校驗
void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) { //判斷請求的命令的Code是否需要管理員權(quán)限,并判斷該用戶是否是管理員 if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) { throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey())); } Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap(); Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap(); if (needCheckedPermMap == null) { // If the needCheckedPermMap is null,then return return; } for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) { String resource = needCheckedEntry.getKey(); Byte neededPerm = needCheckedEntry.getValue(); //判斷是否是group,在構(gòu)建resourcePermMap時候,group的key=RETRY_GROUP_TOPIC_PREFIX + consumerGroup boolean isGroup = PlainAccessResource.isRetryTopic(resource); //系統(tǒng)的權(quán)限配置文件中配置項包不含該客戶端命令請求需要的權(quán)限 if (!ownedPermMap.containsKey(resource)) { //判斷其是否是topic還是group的權(quán)限標(biāo)識,獲取該類型的全局的權(quán)限是什么 byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() : needCheckedAccess.getDefaultTopicPerm(); //核對權(quán)限 if (!Permission.checkPermission(neededPerm, ownedPerm)) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } continue; } //系統(tǒng)的權(quán)限配置文件中配置項包含該客戶端命令請求需要的權(quán)限,則直接判斷其權(quán)限 if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } } }
所有的檢驗流程如果有一項不滿足則拋出AclException異常
上面圖中只是分析了Broker服務(wù)端的處理流程,客戶端如何調(diào)用我們具體分析下我們以發(fā)送消息為例:
我們之前分析過Producer的消息發(fā)送的核心方法是DefaultMQProducerImpl.sendKernelImpl()該方法
//是否注冊了“鉤子” if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } //封裝其ACL請求的參數(shù)信息 this.executeSendMessageHookBefore(context); }
hasSendMessageHook(),我們在構(gòu)建Producer的時候創(chuàng)建了該對象,加入到DefaultMQProducerImpl的sendMessageHookList屬性中。
我們查看其發(fā)送消息NettyRemotingClient類中調(diào)用AclClientRPCHook.doBeforeRequest()發(fā)送前的數(shù)據(jù)準(zhǔn)備
public void doBeforeRequest(String remoteAddr, RemotingCommand request) { byte[] total = AclUtils.combineRequestContent(request, parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken())); String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey()); request.addExtField(SIGNATURE, signature); request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey()); // The SecurityToken value is unneccessary,user can choose this one. if (sessionCredentials.getSecurityToken() != null) { request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken()); } }
關(guān)于RocketMQ中怎么實現(xiàn)權(quán)限控制就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
標(biāo)題名稱:RocketMQ中怎么實現(xiàn)權(quán)限控制
URL鏈接:http://jinyejixie.com/article22/gdpsjc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供面包屑導(dǎo)航、移動網(wǎng)站建設(shè)、營銷型網(wǎng)站建設(shè)、品牌網(wǎng)站制作、小程序開發(fā)、微信公眾號
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)