在rocketmq中rocketmqTool作为可视化和二次开发使用比较多的类org.apache.rocketmq.tools.admin.MQAdminExt,这个类在admin里面:
一、池化
在rocketmq中:
而我们知道创建rocketmq创建的过程中,会启动很多东西,这个连接的过程涉及到的内容比较多,可以看到rocketmq的rocketmq-dashboard和mqcloud里面都使用了池化技术-对象池。rocketmq-dashboard池化org.apache.rocketmq.dashboard.admin.MqAdminExtObjectPool:
publicGenericObjectPool<MQAdminExt>mqAdminExtPool() { GenericObjectPoolConfiggenericObjectPoolConfig=newGenericObjectPoolConfig(); genericObjectPoolConfig.setTestWhileIdle(true); genericObjectPoolConfig.setMaxWaitMillis(10000); genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis(20000); MQAdminPooledObjectFactorymqAdminPooledObjectFactory=newMQAdminPooledObjectFactory(); MQAdminFactorymqAdminFactory=newMQAdminFactory(rmqConfigure); mqAdminPooledObjectFactory.setMqAdminFactory(mqAdminFactory); GenericObjectPool<MQAdminExt>genericObjectPool=newGenericObjectPool<MQAdminExt>( mqAdminPooledObjectFactory, genericObjectPoolConfig); returngenericObjectPool; }
同时进行借和还:
publicstaticvoidcreateMQAdmin(GenericObjectPool<MQAdminExt>mqAdminExtPool) { try { // Get the mqAdmin instance from the object poolMQAdminExtmqAdminExt=mqAdminExtPool.borrowObject(); MQ_ADMIN_EXT_THREAD_LOCAL.set(mqAdminExt); } catch (Exceptione) { LOGGER.error("get mqAdmin from pool error", e); } } publicstaticvoidreturnMQAdmin(GenericObjectPool<MQAdminExt>mqAdminExtPool) { MQAdminExtmqAdminExt=MQ_ADMIN_EXT_THREAD_LOCAL.get(); if (mqAdminExt!=null) { try { // After execution, return the mqAdmin instance to the object poolmqAdminExtPool.returnObject(mqAdminExt); } catch (Exceptione) { LOGGER.error("return mqAdmin to pool error", e); } } MQ_ADMIN_EXT_THREAD_LOCAL.remove(); }
同时mqcloud再次基础上做了自己的封装模板,方便每次调用的时候,进行回调:
privateGenericKeyedObjectPool<Cluster, MQAdminExt>mqPool; /*** 执行操作* @param callback* @return* @throws Exception*/public<T>Texecute(MQAdminCallback<T>callback) { MQAdminExtmqAdmin=null; try { // 获取mqAdmin实例mqAdmin=mqPool.borrowObject(callback.mqCluster()); if(mqAdmin==null) { logger.warn("cluster:{} cannot get mqadmin!", callback.mqCluster()); returnnull; } // 触发回调Tt=callback.callback(mqAdmin); returnt; } catch (Exceptione) { try { // 触发异常情况回调returncallback.exception(e); } catch (Exceptionex) { logger.warn("cluster:{} exception err:{}", callback.mqCluster(), ex.getMessage()); returnnull; } } finally { if(mqAdmin!=null) { try { mqPool.returnObject(callback.mqCluster(), mqAdmin); } catch (Exceptione) { logger.warn("cluster:{} shutdown err:{}", callback.mqCluster(), e.getMessage()); } } } }
解决了rocketmq实列对象的复用外,还需要知道rocketmq提供了哪些api可以进行扩展。
二、主要使用的api分类
mqClientInstance启动和关闭broker操作BrokerCluster操作PlainAccess操作GlobalWhiteAddr白名单topic主题操作SubscriptionGroup订阅组操作consume消费操作producer生产者操作Offset偏移量NameServer操作message操作subscription订阅task清理任务trace链路操作stat统计DefaultMQAdminExtImpl默认mqAdmin扩展实现
通过rocketmq-tool提供的api相关功能,我们可以进行rocketmq的mqAdmin扩展的启动、关闭,同时针对broker、producer、consume、offset、nameServer、message、trace相关操作和统计,同时触发task进行相关清理工作。
三、rocketmq-dashboard
在rocketmq-dashboard中,可以看到使用了它们的使用:
start启动getBrokerConfig获取broker配置examineBrokerClusterInfobrokerClusterInfo信息fetchBrokerRuntimeStats获取broker运行时统计信息viewMessage查看消息queryMessage查询消息messageTrackDetail链路详情consumeMessageDirectly直接消费消息examineConsumerConnectionInfo消费者连接信息examineProducerConnectionInfo生产者连接信息examineTopicStatsfetchAllTopicListfetchBrokerRuntimeStatsexamineConsumeStatsexamineTopicRouteInfogetNameServerAddressListwipeWritePermOfBroker.....
可以看到dashBoard基于rocketmq-tool模块,做了很多功能,而且这些功能都是可视化的。