暂时未有相关云产品技术能力~
RocketMQ整合Spring后,可以很方便的使用。那么这里rocketmq-spring做了哪些事情,可以使得我们可以很方便使用呢?一、rocketmq-spring对消息的处理过程 rocketmq-spring-boot整合后的入口:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration,通过这个类,我们可以它的一系列动作操作,会首先将相关配置MessageConverterConfiguration、ListenerContainerConfiguration、ExtProducerResetConfiguration、ExtConsumerResetConfiguration、RocketMQTransactionConfiguration、RocketMQListenerConfiguration进行导入。在自动注入前处理事务配置RocketMQTransactionConfiguration,在自动注入后处理消息转换器配置MessageConverterConfiguration。完成之后会填充两个bean:defaultMQProducerdefaultLitePullConsumer根据当前启动的生产和消费者,进行填充。完成后,就可以根据注册的bean容器,此时会执行bean后置处理器操作,完成对带RocketMQMessageListener的注解进行增强,然后注册。注册完成后,便可以对默认rocketmq监听器容器DefaultRocketMQListenerContainer进行启动,启动的过程其实是启动DefaultMQPushConsumer的过程,也即此时会调用rocketmq的this.defaultMQPushConsumerImpl.start();完成对消费者的启动。启动消费者完成后,等待消费。根据消息是否需要顺序消费,分为顺序消费和并发消费。此时都会调用同一个 handleMessage(messageExt)来处理消息,此时可以看到分为两种:rocketMQListenerrocketMQReplyListener此时根据当前选择的监听器来做对应业务实现处理,执行消费。这里可以看到rocketmq-spring对litepull模式的消息没有demo。需要去rocketmq项目找例子看具体的过程。1.首先我们可以从rocketmq-spring-boot-sample中看到整合还是很方便的。下面我们来看一下example:二、生产者生产者demo:@SpringBootApplicationpublic class ProducerACLApplication implements CommandLineRunner { @Override public void run(String... args) throws Exception { // 发送消息 SendResult sendResult = rocketMQTemplate.syncSend(springTopic + ":acl", "Hello, ACL Msg!"); System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult); // 使用spring message发送消息 sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World! I'm from spring message & ACL Msg").build()); System.out.printf("syncSend2 to topic %s sendResult=%s %n", springTopic, sendResult); //发送事务消息 testTransaction(); }从demo中,我们可以看到通常分为三类:使用rocketmq模板发送消费使用springMessage发送消息发送事务消息 同时spring自动注入后,其实是可以通过RocketMQProperties拿到的topic信息有限,因此这里采用了值注入的方式,方便放入多个topic,从而实现对多个业务系统对接消息。三、消费者同样,也是配置好需要消费的信息后,执行监听器或者使用定时任务@SpringBootApplicationpublic class ConsumerACLApplication implements CommandLineRunner { @Resource private RocketMQTemplate rocketMQTemplate; @Override public void run(String... args) throws Exception { List<String> messages = rocketMQTemplate.receive(String.class); System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages); }}或者采用监听的方式:@Service@RocketMQMessageListener(topic = "normal_topic_define_in_cloud_MQ",consumerGroup = "group_define_in_cloud_MQ",accessKey = "AK", secretKey = "SK")public class ACLStringConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.printf("------- ACL StringConsumer received: %s \n", message); }} 配置和RocketMQProperties里面的配置名称一样,它就可以帮你读到对应的配置,并进行相关bean的注入,可以在RocketMQAutoConfiguration可以看到对应的配置信息。比如此时业务系统是消费端的时候,此时会将对应的配置信息注入到defaultLitePullConsumer,相关参数,来源于rocketMQProperties和consumerConfig中: pullConsumer 拉取消费者 nameServer nameServer地址 group 消费组 topic 主题 accessChannel 通道 messageModel 消息模式,分为广播、集群 selectorType 筛选器类型,分为sql92、tag accessKey key secretKey 密钥 pullBatchSize 拉取批次大小 tlsEnable 是否开启tls enableMsgTrace是否开启msgTrace customizedTraceTopic 自定义链路主题 namespace 命名空间 instanceName 实例名称 RocketMQListenerConfiguration会注册相关的监听器后置处理器。可以看到后置处理器RocketMQMessageListenerBeanPostProcessor中会执行后置处理方法postProcessAfterInitialization对带RocketMQMessageListener注解的class进行增强,同时在监听容器配置中注册容器: @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { Class<?> targetClass = AopUtils.getTargetClass(bean); RocketMQMessageListener ann = targetClass.getAnnotation(RocketMQMessageListener.class); if (ann != null) { // 执行注解增强 RocketMQMessageListener enhance = enhance(targetClass, ann); //如果监听容器配置不为空,则注册容器 if (listenerContainerConfiguration != null) { listenerContainerConfiguration.registerContainer(beanName, bean, enhance); } } return bean; }注册完容器后,就可以启动容器了,因此可以在org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration#registerContainer方法中看到容器启动: container.start();也即此时我们可以看到容器是默认rocketmq监听器容器DefaultRocketMQListenerContainer启动的是默认推模式的消费DefaultMQPushConsumer,而启动的时候其实就是调用rocketmq中的消费者方法: this.defaultMQPushConsumerImpl.start();启动消费者,等待消费。推消费分为两种,可以从消费模式种看到: 在默认mq监听容器DefaultRocketMQListenerContainer中可以看到在初始化rocketmq推模式消费 initRocketMQPushConsumer,这个方法很重要,会通过onMessage执行我们写的监听方法 switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly()); break; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently()); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong.");}根据消费模式分为两种,一种是按照顺序消费,一种是并发消费,可以看到里面有一个重要方法: handleMessage(messageExt);我们可以看到这个方法: private void handleMessage( MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException { //不带响应 if (rocketMQListener != null) { //执行rocketmq监听 处理消息 rocketMQListener.onMessage(doConvertMessage(messageExt)); } else if (rocketMQReplyListener != null) { //有响应,消费消息 Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt)); Message<?> message = MessageBuilder.withPayload(replyContent).build(); org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message)); //获取生产者,执行生产者发送回调 DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer(); producer.setSendMsgTimeout(replyTimeout); producer.send(replyMessage, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { if (sendResult.getSendStatus() != SendStatus.SEND_OK) { log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus()); } else { log.debug("Consumer replies message success."); } } @Override public void onException(Throwable e) { log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage()); } }); } }如果监听器rocketMQListener存在,则执行消费消息,也即onMessage的操作,执行业务系统的onMessage接口的实现。如果响应监听器rocketMQReplyListener存在,则获取响应信息,MessageBuilder构建Message对象信息,此时的消息类型是Spring Message类型的,需要转成Rocketmq自己的类型的。从默认mq推消费者实现中获取默认的mq生产者,执行生产者发送回调。从这个接口的注释可以看到:/** * The consumer supporting request-reply should implement this interface. * * @param <T> the type of data received by the listener * @param <R> the type of data replying to producer */public interface RocketMQReplyListener<T, R> { /** * @param message data received by the listener * @return data replying to producer */ R onMessage(T message);}四、ExtRocketMQTemplate在提供的demo中,出现了ExtRocketMQTemplate这个类,用原来的RocketMQTemplate不行吗,为啥会有这个类呢?@ExtRocketMQConsumerConfiguration(topic = "${demo.rocketmq.topic}", group = "string_consumer", tlsEnable = "${demo.ext.consumer.tlsEnable}")public class ExtRocketMQTemplate extends RocketMQTemplate {}可以看到这个类上面的配置@ExtRocketMQConsumerConfiguration String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}"; String GROUP_PLACEHOLDER = "${rocketmq.pull-consumer.group:}"; String TOPIC_PLACEHOLDER = "${rocketmq.pull-consumer.topic:}"; String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}"; String ACCESS_KEY_PLACEHOLDER = "${rocketmq.pull-consumer.access-key:}"; String SECRET_KEY_PLACEHOLDER = "${rocketmq.pull-consumer.secret-key:}"; String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.pull-consumer.customized-trace-topic:}";可以看到如果有多个消息集群的时候,这里可以方便我们替换rocketmq的地址信息,从而实现对多服务的消费。也即业务系统可能出现对接多个消息,其消息的topic可能不同,方便对接,因此其注解上面可以可以方便替换成想要的rocketmq的地址信息。可以看到三个重置方法:ExtProducerResetConfigurationExtConsumerResetConfiguration都实现了SmartInitializingSingleton,其会在bean加载完成后执行。五、RocketMQTemplate在RocketMQTemplate中,我们看到的是大量的发送方法,主要包括这几类方法:同步发送异步发送只发送不管消费延迟消息 发送事务消息发送根据顺序、延迟程度、是否批量进一步分为下面这些方法:sendsendAndReceivesyncSendsyncSendDelayTimeSecondssyncSendDelayTimeMillssyncSendDeliverTimeMillssyncSendOrderlyasyncSendasyncSendOrderlysendOneWaysendOneWayOrderlysendMessageInTransaction
sentinel作为限流降级的流量防卫兵,类似于Hystix。一、SPI的使用场景通常spi可以配合责任链模式、策略模式使用。此时spi,类似于一个上下文的过程,拿到所有的实现class。与之类似的还有:SpringUtil.getBeans或者ApplicationContext.getBeansOfType。二、责任链模式的使用场景在一些固定的场景下,处理业务的流程过程通常比较明确,第一步做什么,第二步做什么,第三步做什么的时候,这个时候就可以使用责任链模式在处理。也即存在固定的模式的时候,就可以使用。同时在Netty、Sentinel中有重要使用场景,其过程是一个pipeline流水线的过程。三、sentinel是如何实现相关责任链模式的组装的呢?首先通过 entry = SphU.entry(KEY);进入到com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder#build这个方法,进行加载,进行实列化。这个过程中,包括两个过程加载load和创建实列化的过程。而这个load的过程就是执行spi获取slot列表的过程。完成之后,就可以通过链拿到entry,也即 chain.entry(context, resourceWrapper, null, count, prioritized, args);拿到链路信息,然后执行entry入口操作,进入到fireEntry。这个过程会进入入链过程,这样就可以将所有的slot串联起来。四、代码中的实现过程执行里面的请求方法:可以看到入口方法:entry = SphU.entry(KEY);进行责任链骨架实列化:List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();执行spi操作: /** * Load all Provider instances of the specified Service, sorted by order value in class's {@link Spi} annotation * * @return Sorted Provider instances list */ public List<S> loadInstanceListSorted() { load(); return createInstanceList(sortedClassList); }可以看到load的过程是会将slot进行加载:com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlotcom.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlotcom.alibaba.csp.sentinel.slots.logger.LogSlotcom.alibaba.csp.sentinel.slots.statistic.StatisticSlotcom.alibaba.csp.sentinel.slots.block.authority.AuthoritySlotcom.alibaba.csp.sentinel.slots.system.SystemSlotcom.alibaba.csp.sentinel.slots.block.flow.FlowSlotcom.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot然后创建实列列表,创建实列的过程其实是一个double check的过程。private S createInstance(Class<? extends S> clazz, boolean singleton) { S instance = null; try { if (singleton) { instance = singletonMap.get(clazz.getName()); if (instance == null) { synchronized (this) { instance = singletonMap.get(clazz.getName()); if (instance == null) { instance = service.cast(clazz.newInstance()); singletonMap.put(clazz.getName(), instance); } } } } else { instance = service.cast(clazz.newInstance()); } } catch (Throwable e) { fail(clazz.getName() + " could not be instantiated"); } return instance; }可以看到相关的slot,分为两类:第一类是负责资源指标数据统计的ProcessorSlot:com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlotcom.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot第二类是实现限流、熔断、降级的ProcessorSlot:com.alibaba.csp.sentinel.slots.logger.LogSlotcom.alibaba.csp.sentinel.slots.statistic.StatisticSlotcom.alibaba.csp.sentinel.slots.block.authority.AuthoritySlotcom.alibaba.csp.sentinel.slots.system.SystemSlotcom.alibaba.csp.sentinel.slots.block.flow.FlowSlotcom.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot其中:NodeSelectorSlot必须在ClusterBuilderSlot之前实列化,否则的话,会报错。同时我们可以看到slot是一个单向链表,而Entry则是一个双向链表,entry可进可退。既然是链表,那么必然有一个前驱节点和后继节点之间会产生联系:public interface ProcessorSlot<T> { void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized, Object... args) throws Throwable; void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable; void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args); void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);}因此可以看到SlotChain产生的联系:entry->fireEntryexit->fireExit同时可以看到fireEntry中的方法: @Override public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { //next的过程 if (next != null) { next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } }fireEntry的过程即是一个next的过程。那么节点的数据又在哪里呢? chainMap,从lookProcessChain可以看到答案,存在内存中。 ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { //通过资源拿到处理器槽链,如果链为空,则double check ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) { synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } // 创建一个新的槽链 chain = SlotChainProvider.newSlotChain(); Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }数据结构为:private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>();这样的话,整个骨架就可以通过entry搭起来了。与之类似的,可以实现其功能的还有:SpringUtil.getBeans或者ApplicationContext.getBeansOfType。为啥不使用这个呢?这个不是更方便吗?原因在于,如果使用这个,必须引进Spring框架。同时为了轻量级一些,方便维护。五、功能完成这个过程后,我们可以看到页面上的相关指标:没有流控前:流控后:参考书籍:吴就业 实战Alibaba Sentinel:深度解析微服务高并发流量治理
如果要实现一个自定义的starter,首先需要引入两个依赖spring-boot的jar包:spring-boot-autoconfigure和spring-boot-configuration-processor:<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>在resource中新建META-INF文件夹,创建spring.factories,比如:#定义自动装配的类=>RedissonCofiguration org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.study.configuration.RedissonConfiguration同时你的starter的名称:<groupId>com.study</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>1.0.0</version>同时需要基于存在某种条件才进行装配时,可以使用@ContionOnClass.@Target({ ElementType.TYPE, ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Documented @Conditional(OnClassCondition.class) public @interface ConditionalOnClass { Class<?>[] value() default {}; String[] name() default {}; }从源码上看,只有在classpath下能找到你需要的conditionOnClass类才会构建这个bean。比如你想写一个redisson的自动装配:/** * redisson配置:配置、自动配置、配置条件 */ @Configuration @EnableConfigurationProperties(value = RedissonProperties.class) @ConditionalOnClass(RedissonProperties.class) public class RedissonConfiguration { }使用@ConditionOnMissingBean:@ConditionalOnMissingBean,它是修饰bean的一个注解,主要实现的是,当你的bean被注册之后,如果而注册相同类型的bean,就不会成功,它会保证你的bean只有一个,即你的实例只有一个,当你注册多个相同的bean时,会出现异常,以此来告诉开发人员。通常的相关注解:@ConditionalOnBean // 当给定的bean存在时,则实例化当前Bean @ConditionalOnMissingBean // 当给定的bean不存在时,则实例化当前Bean @ConditionalOnClass // 当给定的类名在类路径上存在,则实例化当前Bean @ConditionalOnMissingClass // 当给定的类名在类路径上不存在,则实例化当前Bean创建RessionClient的方式:基于对应的方式创建客户端,因为Ression有很多模式,哨兵、主从、单例、集群、云托管模式,拿到对应的模式的配置后,创建对应的客户端bean:@Bean @ConditionalOnMissingBean(RedissonClient.class) public RedissonClient redissonClient() { // 创建Redisson客户端对象对象 return Redisson.create(config); }同时还需要一些特定的信息:相关bean:lockAop分布式锁、MQAop 发送AOP、RedissonBinary 操作对象二进制、RedissonObject操作对象、RedissonCollection操作集合、RedissonClient重要,此时就可以基于@ConditionOnMissBean的方式进行创建,从而实现自定装配。此时可以基于分布式锁Aop切面来做拦截,对分布式锁进行增强操作,也即对当前拿到的锁信息进行判断。对锁的模式进行判断,如果当前的锁模式为自动的,则此时根据你所的key进行判断,如果keys的长度>1,则使用红锁,否者使用可重入式锁。 如果锁模式不是 联锁 && 红锁 &&长度大于1,此时会抛异常如果是公平锁,则直接处理,如果是红锁,则需要变量keys,对锁进行添加到RLock中,对锁进行遍历,添加到数组中,然后将其重新赋值给红锁,否者放入到可重入式锁,或者读锁或者写锁中。执行aop.那可自动装配又是怎样实现的呢?其关键在于@SpringBootApplication这个注解上,这个组件是一个组合注解:@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @SpringBootConfiguration //springboot配置注解 @EnableAutoConfiguration //可以自动注入配置 @ComponentScan(excludeFilters = { @Filter(type = FilterType.CUSTOM, classes = TypeExcludeFilter.class), @Filter(type = FilterType.CUSTOM, classes = AutoConfigurationExcludeFilter.class) }) //进行组件扫描,同时排掉过滤信息 public @interface SpringBootApplication { // 省略代码,主要包含的方法:排掉特定自动注入的配置,通过名称或者类方式,进行基包扫描、或者classes、代理bean方法等 }同时我们可以看到SpringBootApplication里面的所有方法,都使用了一个注解@AliasFor。那这个组件有什么用呢?这个注解用于桥接到其它注解,该注解的属性中指定的所桥接的注解类,减少用户使用多注解带来的麻烦。其关键就在@EnableAutoConfiguration这个注解中,这个注解里面有一个@Import注解,里面这个类自动配置导入选择器类:@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @AutoConfigurationPackage @Import(AutoConfigurationImportSelector.class) public @interface EnableAutoConfiguration { String ENABLED_OVERRIDE_PROPERTY = "spring.boot.enableautoconfiguration"; Class<?>[] exclude() default {}; String[] excludeName() default {}; }如果我们想使用xml进行配置的话,此时在springboot启动的时候可以使用@Import将配置进行导入,实现配置注入的目的。而AutoConfigurationImportSelector实现了DeferredImportSelector延迟导入选择器,也即ImportSelector的子类。那ImportSelector里面有什么方法呢?里面有两个方法一个是选择导入的方法、一个是排掉过滤的方法,下面可以看到选择导入方法的入参是导入类元数据。public interface ImportSelector { String[] selectImports(AnnotationMetadata importingClassMetadata); }那这个选择导入的过程又是怎样的呢?SpringFactoriesLoader加载器加载指定ClassLoader下面的所有的META-INF/spring.factories文件,并将文件解析内容存在Map中。然后通过loadFactoryNames传递过来的class的名称从map中获取该类的配置列表。通过Set集合进行去重操作。执行过滤组件操作,而这些操作都是在AutoConfigurationImportFilter接口下的组件实现的,也即FilterSpringBootCondition实现抽象类的。下面有OnBeanCondition、OnClassCondition、OnWebApplicationCondition的getOutcomes方法。执行fire操作,fireAutoConfigurationImportEvents,此时会执行事件注册。而其重要的方法就是getAutoConfigurationEntry就是自动装配的重点。
mybatis的执行的大概过程:首先需要有sqlSessionFactroy,然后通过sqlSessionFactory拿到sqlSession,然后通过sqlSession调用getMapper拿到代理的接口,然后拿到代理的接口的信息mapperInterface,从而找到需要执行的具体的方法中的sql方法,,如果执行过,同时没有发生改变的话,则直接返回结果,否则会进行更新,同时如果执行过的话,会直接返回结果,此时会看到methodCache中有我们执行过的方法。mybatis中,如果想进行sql的拦截,需要对其基于interceptor做拦截。因为mybatis的执行中,我们需要获取它的boundSql,而获取boundSql,需要获取MappedStatement,而MappedStatement可以在StatementHandler语句处理器中找到,因此可以在此基础上获取,通过反射的方式获取,此时可以用到插件模块中的invocation对象获取,然后对其进行增强。基于interceptor可以实现sql的完整打印,除了实现打印之外。其实还可以实现分页和排序,下面的分页和排序基于aop+mybatis的interceptor实现。其本质还是对mappedStament的boundSql进行增强。下面的项目来源于github,通过这个我们可以很好的学习mybatis中插件interceptor的使用。首先定义分页元注解:/** * 自定义分页注解 */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Limit { /** * 当前页面 * * @return */ int page() default 0; /** * 每页显示数量 * * @return */ int pageSize() default 10; }定义排序元注解:/** * 自定义排序注解 */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface OrderBy { /** * 表的别名 */ String tableAlias() default ""; /** * 排序字段 */ String orderColumn() default ""; /** * ASC/DESC 默认倒序 * @return */ boolean isAsc() default false; }定义基础切面处理类:/** * @ClassName BaseAspectAbstract * @Description 基础切面处理类,每一个Spring的切面都需要去继承此抽象类 * @Date **/ public abstract class BaseAspectAbstract { private static TreeMap<Integer, SQLLanguage> CONTAINERS = new TreeMap<>(); // 放入sql 切点、sql类型、sql public void putSQL(JoinPoint point, SQLEnums sqlEnums, SQLLanguage sqlLanguage) { CONTAINERS.put(sqlEnums.getId(), sqlLanguage); // 获取方法里的参数 Object parmas = point.getArgs()[0]; Map map = (Map)parmas; map.put("SQL", getSQL()); } public TreeMap<Integer, SQLLanguage> getSQL() { return CONTAINERS; } }进行分页切面:@Component @Aspect @Order(3) //拼接sql时的顺序 public class LimitAspect extends BaseAspectAbstract { @Pointcut("@annotation(com.mybatis.interceptor.annotation.Limit)") public void limitCut() {} @Before("limitCut()") public void limit(JoinPoint point) { StringBuilder limitBuilder = new StringBuilder(" LIMIT "); MethodSignature methodSignature = (MethodSignature)point.getSignature(); // 获得对应注解 Limit limit = methodSignature.getMethod().getAnnotation(Limit.class); if (!StringUtils.isEmpty(limit)) { limitBuilder.append(limit.page()).append(",").append(limit.pageSize()); putSQL(point, LIMIT, new SQLLanguage(limitBuilder.toString())); } } }进行排序切面:@Component @Aspect @Order(2) public class OrderByAspect extends BaseAspectAbstract { // 切点:对注解中的特定注解进行拦截,进行增强 @Pointcut("@annotation(com.mybatis.interceptor.annotation.OrderBy)") public void orderByCut() {} // 执行切点操作,将其进行增强,放入排序 @Before("orderByCut()") public void orderBy(JoinPoint point) { StringBuilder orderByBuilder = new StringBuilder(" ORDER BY "); MethodSignature methodSignature = (MethodSignature)point.getSignature(); // 获得对应注解 OrderBy orderBy = methodSignature.getMethod().getAnnotation(OrderBy.class); if (!StringUtils.isEmpty(orderBy)) { String sort = orderBy.isAsc() ? " asc " : " desc"; orderByBuilder.append(orderBy.orderColumn()).append(sort); putSQL(point, ORDERBY, new SQLLanguage(orderByBuilder.toString())); } } }枚举sql顺序:/** * sql类型枚举 */ public enum SQLEnums { /** * 数字越靠前 则拼接SQL语句越靠前执行,目前拼接顺序为 * SELECT * FROM table GROUP BY ORDER BY xxx LIMIT 0, 10 */ LIKE(1, "LIKE"), GROUPBY(2, "GROUP BY"), ORDERBY(3, "ORDER BY"), LIMIT(4, "LIMIT"); private int id; private String condition; SQLEnums(int id, String condition) { this.id = id; this.condition = condition; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getCondition() { return condition; } public void setCondition(String condition) { this.condition = condition; } }执行sql增强的注解放置在serviceImpl里面:@RequestMapping("/nba") public class PlayController { @Autowired private PlayerService playerService; @RequestMapping("/player") public List<Player> getList(Map<String, Object> params) { List<Player> players = playerService.getList(params); return players; } }执行sql增强:@Service public class PlayerServiceImpl implements PlayerService { @Autowired private PlayerMapper playerMapper; // 加入自定义注解,方便切点进行增强 @OrderBy(orderColumn = "height") @Limit() @Override public List<Player> getList(Map<String, Object> params) { return playerMapper.getList(params); } }执行到这里会执行动态代理,然后执行sql拦截。执行sql拦截:@Intercepts(@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})) @Component public class DataFilterInterceptor extends AbstractSqlParserHandler implements Interceptor { // 拦截器 @Override public Object intercept(Invocation invocation) throws Throwable { StatementHandler statementHandler = PluginUtils.realTarget(invocation.getTarget()); MetaObject metaObject = SystemMetaObject.forObject(statementHandler); // SQLLanguage 解析 sqlParser(metaObject); // 非查询操作 MappedStatement mappedStatement = (MappedStatement)metaObject.getValue("delegate.mappedStatement"); if (!SqlCommandType.SELECT.equals(mappedStatement.getSqlCommandType())) { return invocation.proceed(); } // 取出原始SQL 取出参数 BoundSql boundSql = (BoundSql)metaObject.getValue("delegate.boundSql"); String dataSql = boundSql.getSql(); Object paramObj = boundSql.getParameterObject(); Map map = (Map)paramObj; String sqlLanguage = getSQLLanguage(map); String sql = dataSql + sqlLanguage; // 重写sql metaObject.setValue("delegate.boundSql.sql", sql); return invocation.proceed(); } // 插件 @Override public Object plugin(Object target) { if (target instanceof StatementHandler) { return Plugin.wrap(target, this); } return target; } @Override public void setProperties(Properties properties) { } // 获取sql语句 private String getSQLLanguage(Map<String, Object> map) { TreeMap<Integer, SQLLanguage> sqlMap = (TreeMap)map.get("SQL"); StringBuilder sqlBuilder = new StringBuilder(); for (Map.Entry treeMap : sqlMap.entrySet()) { SQLLanguage sql = (SQLLanguage)treeMap.getValue(); if (null != sql) { sqlBuilder.append(sql); } } return sqlBuilder.toString(); } }
内容来源于alibaba的开源项目ageiport,场景:如果需要基于集群节点进行均摊,对数据进行处理分而治之的话,就可以采用。//均摊分配 public static <T> List<List<T>> averageAssign(List<T> source, int n) { List<List<T>> result = new ArrayList<>(); //对当前的大小进行取模 int remaider = source.size() % n; //对当前大小取整 int number = source.size() / n; //偏移量 int offset = 0; //对集群节点进行遍历 for (int i = 0; i < n; i++) { List<T> value; //如果模大于0,则对任务列表进行部分获取 List<E> subList(int fromIndex, int toIndex); if (remaider > 0) { value = source.subList(i * number + offset, (i + 1) * number + offset + 1); remaider--; offset++; } else { value = source.subList(i * number + offset, (i + 1) * number + offset); } result.add(value); } return result; }测试://测试均摊算法 public static void main(String[] args) { //子任务列表 List<Integer> subTaskNos = new ArrayList<>(); subTaskNos.add(1); subTaskNos.add(2); subTaskNos.add(3); subTaskNos.add(4); subTaskNos.add(5); subTaskNos.add(6); subTaskNos.add(7); subTaskNos.add(8); subTaskNos.add(9); subTaskNos.add(10); subTaskNos.add(11); subTaskNos.add(12); //当前的集群节点 Integer nodeCount = 5; //执行均摊 List<List<Integer>> subTaskAvgByNodeCount = Lists.averageAssign(subTaskNos, nodeCount); System.out.println(subTaskAvgByNodeCount); }测试结果:[1, 2, 3], [4, 5, 6], [7, 8], [9, 10], [11, 12]]这个代码设计很巧妙,使用了取模和取余来实现对任务分发到不同的机器上,这样一来处理任务的时候,就可以快速完成任务的处理了。通常适应于大批量数据的处理。类似的思想:jdk中的fork-join也是分而治之的思想。
一、问题场景想实现一个轻量级的延迟队列,此时可以考虑基于Redis来实现,如果当前的基础设施不是阿里云Mq,开源的RocketMQ只有18个等级,1ms~2h的18个等级。当然商业版的阿里云可以实现精度的延迟。二、基于redis实现延迟队列那如果基于redis实现延迟队列。首先需要考虑:业务系统调用Api:sendMsg(topic, msgId, msg, delaySeconds, TimeUnit.SECONDS, ttlSeconds, TimeUnit.SECONDS, maxRetry)相关参数说明:主题:topic消息体:msgdelaySeconds:延迟时间TimeUnit.SECONDS:延迟单位ttlSeconds:过期时间TimeUnit.SECONDS:过期单位maxRetry:重试次数使用原因:如果发送失败,需要执行重试。发送的时候,需要考虑实时和延迟的情况的处理。如果是实时的,如何处理?如果是延迟的,又如何处理?如果实时的,则可以先将消息存到Redis中,然后执行操作,基于事件,发布需要生产的消息,然后订阅需要消费的消息,执行消费。如果是非实时的话,需要基于定时任务触发器触发,触发当前的延迟队列消息,如果到了需要发送的时候,执行发送。此时的操作基于扫描定时任务和事件触发。而且获取消息的时候,需要考虑基于原子操作实现,也即可以基于lua脚本实现。三、实现数据结构而需要实现对消息和延迟的实现,可以考虑基于zset数据结构实现。其中score可以作为延迟时间。client.zadd(key, score, member);而对应实时的消息,可以考虑lpush操作 即可client.lpush(key, strings);获取可以基于eval实现client.eval(script, toByteArray(keyCount), params);四、具体实现时序图
由于最近的需求需要用到activiti审批流,因此对审批流的相关内容进行了一些了解。一、工作流生命周期一个完整的工作流生命周期会经过5步,并且迭代循环。定义:工作流生命周期总是从流程定义开始。这个过程包括收集需求,将其转化成流程定义,也就流程图、相关变量、角色定义。发布:由开发人员打包各种资源,然后在系统管理中发布流程定义。包括:bpmn.xml、自定义表单、任务监听类等。执行:具体的流程引擎按照事先定义的流程处理路线以任务驱动的方式执行业务流程。监控:此阶段是依赖执行阶段。业务人员在办理任务的同时收集每个任务(Task)的结果,然后根据结果做出相应处理。优化:在此阶段,一个完整的流程已经结束,或许能满足业务需求,或者需要优化。二、引擎Service接口Activiti引擎提供了七大Service接口,都可以通过ProcessEngine获取到,并且支持链式Api编程风格。相关Service:RepositoryService:用于管理流程仓库,如部署、删除、读取流程资源等。RuntimeService:运行时service,可以处理所有正在运行状态的流程实列、任务等。TaskService:任务Service,用于管理、查询任务,如:签收、办理、指派等。ManagementService:引擎管理Service,和具体的业务无关,主要是可以查询引擎配置、数据库、作业等。HistoryService:历史Service,可以查询所有历史数据,如:流程实列、任务、活动、变量、附件等。FormService:表单Service,用于读取和流程、任务相关的表单数据IdentifyService:身份Service,可以管理和查询用户、组之间的关系。目前新版本,FormService和IdentifyService已经删除。三、基于Spring Security实现权限控制public interface UserDetailsService { UserDetails loadUserByUsername(String username) throws UsernameNotFoundException; }可以从Activiti的example中看到,需要基于Spring Security实现角色和用户的权限控制。@Component public class SecurityUtil { private Logger logger = LoggerFactory.getLogger(SecurityUtil.class); @Autowired private UserDetailsService userDetailsService; public void logInAs(String username) { UserDetails user = userDetailsService.loadUserByUsername(username); if (user == null) { throw new IllegalStateException("User " + username + " doesn't exist, please provide a valid user"); } logger.info("> Logged in as: " + username); SecurityContextHolder.setContext(new SecurityContextImpl(new Authentication() { @Override public Collection<? extends GrantedAuthority> getAuthorities() { return user.getAuthorities(); } @Override public Object getCredentials() { return user.getPassword(); } @Override public Object getDetails() { return user; } @Override public Object getPrincipal() { return user; } @Override public boolean isAuthenticated() { return true; } @Override public void setAuthenticated(boolean isAuthenticated) throws IllegalArgumentException { } @Override public String getName() { return user.getUsername(); } })); org.activiti.engine.impl.identity.Authentication.setAuthenticatedUserId(username); } }可以看到这个工具类中,需要获取用户的信息和角色信息,getAuthorities则是获取角色信息。public String getAuthority() { return this.role; }因此获取角色和用户信息,需要将这个进行修改,使用认证中心的用户信息和角色信息。四、设计流程信息需要在IDE中安装插件才可以进行流程可视化的设计,如果是idea,需要安装actiBPM插件。看到这样的设计流程效果,可以将其后缀改成.xml文件,将其部署起来。五、启动流程和完成流程启动流程需要用到:ProcessInstance startProcessInstanceByKeyAndTenantId(String processDefinitionKey, String businessKey, Map<String, Object> variables, String tenantId);启动流程后,需要处理流程:void complete(String taskId, Map<String, Object> variables, boolean localScope);处理任务的过程中会涉及到用户拾取任务void claim(String taskId, String userId);
一、如果要实现一个动态线程池,如何实现?1)如果要实现一个动态线程池,首先需要考虑的是将线程池的相关配置信息外置。这样出现问题的时候,能够基于配置修改,实现热部署。修改配置后,就能生效。因此,可以考虑的配置方式有多种:nacos、apollo、zookeeper、consul、etcd等。2)如果线程池出现问题或者完成修改后,能够基于监控的信息,进行通知和告警。这样就需要考虑通知和告警的方式的多样性:比如基于钉钉、微信、飞书、电子邮件等渠道进行通知和告警。二、 dynamic-tp动态线程池的实现思路1.事件发布根据引入的dynamic-tp-spring-cloud-starter-nacos或者dynamic-tp-spring-boot-starter-nacos依赖以nacos为例:<dependency> <groupId>cn.dynamictp</groupId> <artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId> <version>1.0.9</version> </dependency>可以看到自动装配的文件:DtpAutoConfiguration与NacosRefresher.在DtpAutoConfiguration中,我们可以看到导入的配置信息:@ImportAutoConfiguration({BaseBeanAutoConfiguration.class})基于这个注解,关注BaseBeanAutoConfiguration这个类:这个类主要干了下面这几件事请DtpProperties 线程池相关配置 上下文holder dtpApplicationContextHolder dtpBanner打印 dtpBannerPrinter dtp后置处理器 dtpPostProcessor dtp注册 dtpRegistry dtp监控 dtpMonitor dtpEndpoint dtpEndpoint其中1)dtpBanner是做控制台启动项目时的banner打印操作。2)dtpPostProcessor dtp后置处理器 处理所有相关bean如果bean是执行器,则注册dtp,此时会注册到DTP_REGISTRY 中, 数据结构:Map否则会基于ApplicationHolder拿到基于DynamicTp注解的class,如果当前基于DynamicTp的methodMetadata 为空,则返回bean,否则拿到dtpAnnotationVal。poolName也即dtpAnnotationVal。如果当前的bean属于线程池任务执行器,则注册task执行器。包装执行器,放入通知信息notifyItems。registerCommon 执行注册。3)dtpRegistry 注册dtp DTP_REGISTRY 数据结构:Map其中最为重要的方法是刷新方法:获取dtp执行器,对执行器进行转换为DtpMainProp。执行刷新。4)dtp监控 dtpMonitor会执行监控发布:里面有有2个方法需要注意:检查监控 checkAlarmcollect 收集监控指标信息其中:检查监控的时候,会基于当前的发送告警的信息:基于对应的渠道进行消息发送。此时会发布两个事件:publishAlarmCheckEvent、publishCollectEventNacosRefresher中存在的方法Refresh:刷新当监听到配置发生改变的时候,doNoticeAsync 执行异步通知,通知业务方,此时发生了配置的变更。刷新完成后,执行RefreshEvent刷新事件发布。2.事件监听发布完成后,可以看到对应的监听是在com.dtp.starter.adapter.common.autoconfigure.AdapterCommonAutoConfiguration适配器公共自动装配中@Override public void onApplicationEvent(@NonNull ApplicationEvent event) { try { if (event instanceof RefreshEvent) { doRefresh(((RefreshEvent) event).getDtpProperties()); } else if (event instanceof CollectEvent) { doCollect(((CollectEvent) event).getDtpProperties()); } else if (event instanceof AlarmCheckEvent) { doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties()); } } catch (Exception e) { log.error("DynamicTp adapter, event handle failed.", e); } }可以看到我们关心的三个发布事件,都在此进行了监听:doRefresh、doCollect、doAlarmCheck其中刷新事件会执行相关渠道的通知收集日志会执行对应的打印告警信息会执行告警三、使用使用方式:以nacos为例,可以看到其基于@EnableDynamicTp实现对dtp相关bean的注册。DtpBeanDefinitionRegistrar即是完成注册的类。其主要是创建dtp配置对象DtpProperties,绑定dtp配置,获取执行器。拿到执行器后,遍历执行,绑定对应的信息,构建构造函数,注册bean信息。方便后续对线程池的操作。@Resource private ThreadPoolExecutor dtpExecutor1; @GetMapping("/dtp-nacos-example/test") public String test() throws InterruptedException { task(); return "success"; } //获取dtp执行器 public void task() throws InterruptedException { //获取dtp执行器 DtpExecutor dtpExecutor2 = DtpRegistry.getDtpExecutor("dtpExecutor2"); for (int i = 0; i < 100; i++) { Thread.sleep(100); dtpExecutor1.execute(() -> { log.info("i am dynamic-tp-test-1 task"); }); dtpExecutor2.execute(NamedRunnable.of(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("i am dynamic-tp-test-2 task"); }, "task-" + i)); } }由此可以看到实现了两个最为主要的功能:对线程池进行动态变更和对线程池的监控告警。使用了观察者模式、适配器模式。
下面的内容基于https://github.com/alibaba/COLA。COLA 是 Clean Object-Oriented and Layered Architecture的缩写,代表“整洁面向对象分层架构”。 目前COLA已经发展到COLA v4。一、如何实现一个状态机? 首先需要考虑涉及到哪些状态节点和哪些事件,如何方便状态节点的获取、状态节点如何串联起来呢?串联的方式下,如何拿到下一个状态节点?如果基于角色,如何实现?我们知道工作流可以实现基于角色进行流程的流转,但是此时我们涉及到事件和状态,会出现多个分支,如果使用工作流实现,流程处理上,比如activiti上,可能比较复杂,因此考虑比较轻量级的状态机来实现的话,相对来说要方便一些。1)相关状态初始化,比如:STATE1, STATE2, STATE3, STATE42)相关事件:比如:EVENT1, EVENT2, EVENT3, EVENT4, INTERNAL_EVENT3)状态节点上下文:context,主要包括状态节点类型、状态节点构建状态节点分支相关接口信息:From 从哪个状态节点开始To 需要到的目标状态节点When 定义过渡期间要执行的操作 performCondition 满足条件时,可以从from到to状态节点因此可以想到我们需要构建状态机的构建器必然需要:StateMachineBuilder 状态机构建器数据结构:状态、事件方便构建对应的transition其中transition中包括的方法:构建方法:build指定初始状态 initialState内部过渡: internalTransition外部过渡:externalTransition外部过渡列表:external Transitions没有匹配策略:noMatchStrategyStateMachineFactory 状态机工厂涉及的方法:构建状态机注册状态机展示状态机展示状态机uml节点操作:基于node的数据结构进行构建,配合使用from、to、condtion、linkwith设置对应的布局使用状态的步骤:创建stateMachineBuilder对象基于条件进行状态节点指向构建 initiaState、from、to、on、when、perform基于状态机id进行构建 stateMachine执行fireEvent操作,fire的过程中拿到下一个状态节点setNextState(source,request)二、状态机信息流程可以参考COLA里面的test,可以看到COLA的具体代码实现。三、展示状态机信息效果总体来说cola的状态机还是蛮实用的。基于cola的状态机还可以实现复杂的状态-事件流转。如下图所示:四、状态机的使用场景审批流程、订单状态流转等。
2023年03月
2023年02月
2023年01月
2022年12月