
spring-data-redis中JedisCluster不支持pipelined问题解决 引言了解Jedis的童鞋可能清楚,Jedis中JedisCluster是不支持pipeline操作的,如果使用了redis集群,在spring-boot-starter-data-redis中又正好用到的pipeline,那么会接收到Pipeline is currently not supported for JedisClusterConnection.这样的报错。错误来自于org.springframework.data.redis.connection.jedis.JedisClusterConnection: /* * (non-Javadoc) * @see org.springframework.data.redis.connection.RedisConnection#openPipeline() */ @Override public void openPipeline() { throw new UnsupportedOperationException("Pipeline is currently not supported for JedisClusterConnection."); } org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration会帮我们自动配置,无论你redis使用的是standalone、sentinel、cluster配置。这个源码很容易理解,读者可自行阅读,不理解的可以一起讨论。 Lettuce中的pipelinespring boot 2.0开始,配置spring-boot-starter-data-redis将不依赖Jedis,而是依赖Lettuce,在Lettuce中,redis cluster使用pipeline不会有问题。 知识储备再往下看可能需要读者具备如下的能力: redis cluster hash slotJedisCluster & Jedis的关系pipeline和*mset等命令的区别哈希槽(hash slot)redis cluster一共有16384个桶(hash slot),用来装数据,建立集群的时候每个集群节点会负责一些slot的数据存储,比如我负责0-1000,你负责1001-2000,他负责2001-3000……数据存储时,每个key在存入redis cluster前,会利用CRC16计算出一个值,这个值就是对应redis cluster的hash slot,就知道这个key会被放到哪个服务器上了。 参考文档:Redis 集群教程Redis 集群规范 JedisCluster & Jedis的关系JedisCluster本质上是使用Jedis来和redis集群进行打交道的,具体过程是: 获取该key的slot值:JedisClusterCRC16.getSlot(key)从JedisClusterConnectionHandler实例中获取到该slot对应的Jedis实例:Jedis connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));利用connection操作。pipeline和*mset等命令的区别redis提供了mset,hmset之类的命令,或者说集合操作可以使用sadd key 1 2 3 4 5 6 ..... 10000000000这种一口气传一堆数据的命令。有时候你甚至会发现*mset这种一口气操作一堆数据的速度更快。那么这种使用场景会有什么弊端呢?答案是:阻塞。操作这一堆数据需要多久,就会阻塞多久。 Redis Cluster下pipeline使用的思考由于JedisCluster中的所有操作本质上是使用Jedis,而Jedis是支持pipeline操作的,所以,要在redis cluster中使用pipeline是有可能的,只要你操作同一个键即可,准确的说,应该是你操作的键位于同一台服务器,更直白的,你操作的键是同一个Jedis实例。ok,如果你已经晕了,那你需要回看一下“知识储备”。说说笔者的使用场景吧,我们是把csv文件的一批数据读到内存中,同一批数据是存储到同一个key中的,最后的操作会类似于: set key member1set key member2set key member3...set key member100000操作的是同一个key,可以利用JedisCluster获取到该key的Jedis实例,然后利用pipeline操作。 让spring-data-redis也支持pipeline的思路提供一下代码思路。 RedisConnectionFactory factory = redisTemplate.getConnectionFactory();RedisConnection redisConnection = factory.getConnection();JedisClusterConnection jedisClusterConnection = (JedisClusterConnection) redisConnection;// 获取到原始到JedisCluster连接JedisCluster jedisCluster = jedisClusterConnection.getNativeConnection();// 通过key获取到具体的Jedis实例// 计算hash slot,根据特定的slot可以获取到特定的Jedis实例int slot = JedisClusterCRC16.getSlot(key);/** 不建议这么使用,官方在2.10版本已经修复此问题 2.10版本中,官方会直接提供JedisCluster#getConnectionFromSlot*/ Field field = ReflectionUtils.findField(BinaryJedisCluster.class, null, JedisClusterConnectionHandler.class);field.setAccessible(true);JedisSlotBasedConnectionHandler jedisClusterConnectionHandler = (JedisSlotBasedConnectionHandler) field.get(jedisCluster);Jedis jedis = jedisClusterConnectionHandler.getConnectionFromSlot(slot);// 接下来就是pipeline操作了Pipeline pipeline = jedis.pipelined();...pipeline.syncAndReturnAll();// jedis会自动将资源归还到连接池jedis.close();RedisConnectionUtils.releaseConnection(redisConnection, factory);以上代码完全可以模仿spring-data-redis中RedisTemplate#executePipelined方法写成一个通用的方法,供使用者调用。 原文地址https://my.oschina.net/u/4554374/blog/4306457
【asp.net core 系列】8 实战之 利用 EF Core 完成数据操作层的实现 前言通过前两篇,我们创建了一个项目,并规定了一个基本的数据层访问接口。这一篇,我们将以EF Core为例演示一下数据层访问接口如何实现,以及实现中需要注意的地方。 添加EF Core先在数据层实现层引入 EF Core: cd Domain.Implementsdotnet add package Microsoft.EntityFrameworkCore当前项目以SqlLite为例,所以再添加一个SqlLite数据库驱动: dotnet add package Microsoft.EntityFrameworkCore.SQLite删除 Domain.Implements 里默认的Class1.cs 文件,然后添加Insfrastructure目录,创建一个 DefaultContext: using Microsoft.EntityFrameworkCore; namespace Domain.Implements.Insfrastructure{ public class DefaultContext : DbContext { private string ConnectStr { get; } public DefaultContext(string connectStr) { ConnectStr = connectStr; } protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { optionsBuilder.UseSqlite(ConnectStr);//如果需要别的数据库,在这里进行修改 } } } EF Core 批量加载模型通常情况下,在使用ORM的时候,我们不希望过度的使用特性来标注实体类。因为如果后期需要变更ORM或者出现其他变动的时候,使用特性来标注实体类的话,会导致迁移变得复杂。而且大部分ORM框架的特性都依赖于框架本身,并非是统一的特性结构,这样就会造成一个后果:本来应该是对调用方隐藏的实现就会被公开,而且在项目引用关系中容易出现循环引用。 所以,我在开发中会寻找是否支持配置类,如果使用配置类或者在ORM框架中设置映射关系,那么就可以保证数据层的纯净,也能实现对调用方隐藏实现。 EF Core的配置类我们在《C# 数据访问系列》中关于EF的文章中介绍过,这里就不做过多介绍了(没来得及看的小伙伴们不着急,后续会有一个简单版的介绍)。 通常情况下,配置类我也会放在Domain.Implements项目中。现在我给大家介绍一下如何快速批量加载配置类: protected override void OnModelCreating(ModelBuilder modelBuilder){ modelBuilder.ApplyConfigurationsFromAssembly(Assembly.GetAssembly(this.GetType()), t => t.GetInterfaces().Any(i => t.Name.Contains("IEntityTypeConfiguration"))); }现在版本的EF Core支持通过Assembly加载配置类,可以指定加载当前上下文类所在的Assembly,然后筛选实现接口中包含IEntityTypeConfiguration的类即可。 使用EF Core实现数据操作我们已经创建好了一个EF Context,那么现在就带领大家一起看一下,如何使用EF来实现 上一篇《「asp.net core」7 实战之 数据访问层定义》中介绍的数据访问接口: 新建一个BaseRepository类,在Domain.Implements项目的Insfrastructure 目录下: using Domain.Infrastructure;using Microsoft.EntityFrameworkCore; namespace Domain.Implements.Insfrastructure{ public abstract class BaseRepository<T> : ISearchRepository<T>, IModifyRepository<T> where T : class { public DbContext Context { get; } protected BaseRepository(DbContext context) { Context = context; } } }先创建以上内容,这里给Repository传参的时候,使用的是EFCore的默认Context类不是我们自己定义的。这是我个人习惯,实际上并没有其他影响。主要是为了对实现类隐藏具体的EF 上下文实现类。 在实现各接口方法之前,创建如下属性: public DbSet Set { get => Context.Set(); }这是EF操作数据的核心所在。 3.1 实现IModifyRepository接口先实现修改接口: public T Insert(T entity){ return Set.Add(entity).Entity; } public void Insert(params T[] entities){ Set.AddRange(entities); } public void Insert(IEnumerable entities){ Set.AddRange(entities); }public void Update(T entity){ Set.Update(entity); } public void Update(params T[] entities){ Set.UpdateRange(entities); } public void Delete(T entity){ Set.Remove(entity); } public void Delete(params T[] entities){ Set.RemoveRange(entities); }在修改接口里,我预留了几个方法没有实现,因为这几个方法使用EF Core自身可以实现,但实现会比较麻烦,所以这里借助一个EF Core的插件: dotnet add package Z.EntityFramework.Plus.EFCore这是一个免费开源的插件,可以直接使用。在Domain.Implements 中添加后,在BaseRepository 中添加如下引用: using System.Linq;using System.Linq.Expressions;实现方法: public void Update(Expression> predicate, Expression> updator){ Set.Where(predicate).UpdateFromQuery(updator); } public void Delete(Expression> predicate){ Set.Where(predicate).DeleteFromQuery(); } public void DeleteByKey(object key){ Delete(Set.Find(key)); } public void DeleteByKeys(params object[] keys){ foreach (var k in keys) { DeleteByKey(k); } }这里根据主键删除的方法有个问题,我们无法根据条件进行删除,实际上如果约定泛型T是BaseEntity的子类,我们可以获取到主键,但是这样又会引入另一个泛型,为了避免引入多个泛型根据主键的删除就采用了这种方式。 3.2 实现ISearchRepository 接口获取数据以及基础统计接口: public T Get(object key){ return Set.Find(key); } public T Get(Expression> predicate){ return Set.SingleOrDefault(predicate); } public int Count(){ return Set.Count(); } public long LongCount(){ return Set.LongCount(); } public int Count(Expression> predicate){ return Set.Count(predicate); } public long LongCount(Expression> predicate){ return Set.LongCount(predicate); } public bool IsExists(Expression> predicate){ return Set.Any(predicate); }这里有一个需要关注的地方,在使用条件查询单个数据的时候,我使用了SingleOrDefault而不是FirstOrDefault。这是因为我在这里做了规定,如果使用条件查询,调用方应该能预期所使用条件是能查询出最多一条数据的。不过,这里可以根据实际业务需要修改方法: Single 返回单个数据,如果数据大于1或者等于0,则抛出异常SingleOrDefault 返回单个数据,如果结果集没有数据,则返回null,如果多于1,则抛出异常First 返回结果集的第一个元素,如果结果集没有数据,则抛出异常FirstOrDefault 返回结果集的第一个元素,如果没有元素则返回null实现查询方法: public List Search(){ return Query().ToList(); } public List Search(Expression> predicate){ return Query(predicate).ToList(); } public IEnumerable Query(){ return Set; } public IEnumerable Query(Expression> predicate){ return Set.Where(predicate); } public List Search (Expression> predicate, Expression> order){ return Search(predicate, order, false); } public List Search (Expression> predicate, Expression> order, bool isDesc){ var source = Set.Where(predicate); if (isDesc) { source = source.OrderByDescending(order); } else { source = source.OrderBy(order); } return source.ToList(); }这里我尽量通过调用了参数最多的方法来实现查询功能,这样有一个好处,小伙伴们可以想一下哈。当然了,这是我自己觉得这样会好一点。 实现分页: 在实现分页之前,我们知道当时我们定义的分页参数类的排序字段用的是字符串,而不是lambda表达式,而Linq To EF需要一个Lambda表示才可以进行排序。这里就有两种方案,可以自己写一个方法,实现字符串到Lambda表达式的转换;第二种就是借用三方库来实现,正好我们之前引用的EF Core增强插件里有这个功能: var list = context.Customers.OrderByDescendingDynamic(x => "x.Name").ToList();这是它给出的示例。 我们可以先依此来写一份实现方法: public PageModel Search(PageCondition condition){ var result = new PageModel<T> { TotalCount = LongCount(condition.Predicate), CurrentPage = condition.CurrentPage, PerpageSize = condition.PerpageSize, }; var source = Query(condition.Predicate); if (condition.Sort.ToUpper().StartsWith("a")) // asc { source = source.OrderByDynamic(t => $"t.{condition.OrderProperty}"); } else // desc { source = source.OrderByDescendingDynamic(t => $"t.{condition.OrderProperty}"); } var items = source.Skip((condition.CurrentPage -1)* condition.PerpageSize).Take(condition.PerpageSize); result.Items = items.ToList(); return result; }回到第一种方案: 我们需要手动写一个字符串的处理方法,先在Utils项目创建以下目录:Extend>Lambda,并在目录中添加一个ExtLinq类,代码如下: using System.Linq;using System.Linq.Expressions;using System.Text.RegularExpressions; namespace Utils.Extend.Lambda{ public static class ExtLinq { public static IQueryable<T> CreateOrderExpression<T>(this IQueryable<T> source, string orderBy, string orderAsc) { if (string.IsNullOrEmpty(orderBy)|| string.IsNullOrEmpty(orderAsc)) return source; var isAsc = orderAsc.ToLower() == "asc"; var _order = orderBy.Split(','); MethodCallExpression resultExp = null; foreach (var item in _order) { var orderPart = item; orderPart = Regex.Replace(orderPart, @"\s+", " "); var orderArry = orderPart.Split(' '); var orderField = orderArry[0]; if (orderArry.Length == 2) { isAsc = orderArry[1].ToUpper() == "ASC"; } var parameter = Expression.Parameter(typeof(T), "t"); var property = typeof(T).GetProperty(orderField); var propertyAccess = Expression.MakeMemberAccess(parameter, property); var orderByExp = Expression.Lambda(propertyAccess, parameter); resultExp = Expression.Call(typeof(Queryable), isAsc ? "OrderBy" : "OrderByDescending", new[] {typeof(T), property.PropertyType}, source.Expression, Expression.Quote(orderByExp)); } return resultExp == null ? source : source.Provider.CreateQuery<T>(resultExp); } } }暂时不用关心为什么这样写,后续会为大家分析的。 然后回过头来再实现我们的分页,先添加Utils 到Domain.Implements项目中 cd ../Domain.Implements # 进入Domain.Implements 项目目录dotnet add reference ../Utilspublic PageModel Search(PageCondition condition){ var result = new PageModel<T> { TotalCount = LongCount(condition.Predicate), CurrentPage = condition.CurrentPage, PerpageSize = condition.PerpageSize, }; var source = Set.Where(condition.Predicate).CreateOrderExpression(condition.OrderProperty, condition.Sort); var items = source.Skip((condition.CurrentPage -1)* condition.PerpageSize).Take(condition.PerpageSize); result.Items = items.ToList(); return result; }记得添加引用: using Utils.Extend.Lambda;在做分页的时候,因为前台传入的参数大多都是字符串的排序字段,所以到后端需要进程字符串到字段的处理。这里的处理利用了C# Expression的一个技术,这里就不做过多介绍了。后续在.net core高级篇中会有介绍。 总结到目前为止,看起来我们已经成功实现了利用EF Core为我们达成 数据操作和查询的目的。但是,别忘了EF Core需要手动调用一个SaveChanges方法。下一篇,我们将为大家介绍如何优雅的执行SaveChanges方法。 这一篇介绍到这里,虽然说明不是很多,但是这也是我在开发中总结的经验。 原文地址https://www.cnblogs.com/c7jie/p/13081316.html
Spring AOP学习笔记02:如何开启AOP 上文简要总结了一些AOP的基本概念,并在此基础上叙述了Spring AOP的基本原理,并且辅以一个简单例子帮助理解。从本文开始,我们要开始深入到源码层面来一探Spring AOP魔法的原理了。 要使用Spring AOP,第一步是要将这一功能开启,一般有两种方式: 通过xml配置文件的方式;通过注解的方式; 配置文件开启AOP功能我们先来看一下配置文件的方式,这个上文也提到过,在xml文件中加上对应的标签,而且别忘了加上对应的名称空间(即下面的xmlns:aop。。。): <?xml version="1.0" encoding="UTF-8"?> xmlns="http://www.springframework.org/schema/beans" xmlns:aop = "http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"> <aop:aspectj-autoproxy/> 这里是通过标签来完成开启AOP功能,这是一个自定义标签,需要自定义其解析,而这些spring都已经实现好了,前面专门写过一篇文章讲述spring是如何解析自定义xml标签的,我们这里大致回顾一下解析流程: 定义一个XML文件来描述你的自定义标签元素;创建一个Handler,扩展自NamespaceHandlerSupport,用于注册下面的parser;创建若干个BeanDefinitionParser的实现,用来解析XML文件中的定义;将上述文件注册到Spring中,这里其实是做一下配置;我们就不照着这个步骤来了,我们直接参考spring对这个自定义标签的解析过程,上面的4个步骤只是作为参考,在整个解析过程中都会涉及到。 前面讲解析自定义xml标签时候提到过,解析的流程大致如下: 首先会去获取自定义标签对应的名称空间;然后根据名称空间找到对应的NamespaceHandler;调用自定义的NamespaceHandler进行解析;1.1 获取名称空间这里对应的名称空间是什么呢?在上面的开启aop的配置文件里面名称空间那里给出了一些线索,其实就是下面这个: http://www.springframework.org/schema/aop至于名称空间的获取,也无甚好说的,其实就是直接调用org.w3c.dom.Node提供的相应方法来完成名称空间的提取。 1.2 获取handler然后又是如何根据名称空间找到对应的NamespaceHandler呢?之前也说到过,在找对应的NamespaceHandler时会去META-INF/spring.handlers这个目录下加载资源文件,我们来找一下spring.handlers这个文件看看(需要去spring-aop对应的jar报下找): http://www.springframework.org/schema/aop=org.springframework.aop.config.AopNamespaceHandler看到没,这里是以key-value的形式维护着名称空间和对应handler的关系的,所以对应的handler就是这个AopNamespaceHandler。spring根据名称空间找到这个handler之后,会通过反射的方式将这个类加载,并缓存起来。 1.3 解析标签上面的handler只有一个自定义的方法: public void init() { // In 2.0 XSD as well as in 2.1 XSD. registerBeanDefinitionParser("config", new ConfigBeanDefinitionParser()); registerBeanDefinitionParser("aspectj-autoproxy", new AspectJAutoProxyBeanDefinitionParser()); registerBeanDefinitionDecorator("scoped-proxy", new ScopedProxyBeanDefinitionDecorator()); // Only in 2.0 XSD: moved to context namespace as of 2.1 registerBeanDefinitionParser("spring-configured", new SpringConfiguredBeanDefinitionParser()); } 这是一个初始化方法,在加载的时候会执行,主要作用就是注册一些解析器,这里我们主要关注AspectJAutoProxyBeanDefinitionParser,这就是我们要找的,它的作用就是解析标签的。主要流程就是,spring会调用上一步拿到的AopNamespaceHandler的parse()方法,在这个方法里面,会将解析的工作委托给AspectJAutoProxyBeanDefinitionParser来完成具体解析工作,我们就来看一下具体干了啥吧。 开始解析的工作从这里开始: return handler.parse(ele, new ParserContext(this.readerContext, this, containingBd));此时我们拿到的handler其实是我们自定义的AopNamespaceHandler了,但是它并没有实现parse()方法,所以这里这个应该是调用的父类(NamespaceHandlerSupport)中的parse()方法: public BeanDefinition parse(Element element, ParserContext parserContext) { // 寻找解析器并进行解析操作 return findParserForElement(element, parserContext).parse(element, parserContext); } private BeanDefinitionParser findParserForElement(Element element, ParserContext parserContext) { // 获取元素名称,也就是<aop:aspectj-autoproxy/>中的aspectj-autoproxy String localName = parserContext.getDelegate().getLocalName(element); // 根据aspectj-autoproxy找到对应的解析器,也就是在registerBeanDefinitionParser("aspectj-autoproxy", new AspectJAutoProxyBeanDefinitionParser()); // 注册的解析器 BeanDefinitionParser parser = this.parsers.get(localName); if (parser == null) { parserContext.getReaderContext().fatal( "Cannot locate BeanDefinitionParser for element [" + localName + "]", element); } return parser; } 首先是寻找元素对应的解析器,然后调用其parse()方法。结合我们前面的示例,其实就是首先获取在AopNamespaceHandler类中的init()方法中注册对应的AspectJAutoProxyBeanDefinitionParser实例,并调用其parse()方法进行进一步解析: public BeanDefinition parse(Element element, ParserContext parserContext) { AopNamespaceUtils.registerAspectJAnnotationAutoProxyCreatorIfNecessary(parserContext, element); extendBeanDefinition(element, parserContext); return null; } // 下面的代码在AopConfigUtils中public static void registerAspectJAnnotationAutoProxyCreatorIfNecessary( ParserContext parserContext, Element sourceElement) { BeanDefinition beanDefinition = AopConfigUtils.registerAspectJAnnotationAutoProxyCreatorIfNecessary( parserContext.getRegistry(), parserContext.extractSource(sourceElement)); useClassProxyingIfNecessary(parserContext.getRegistry(), sourceElement); registerComponentIfNecessary(beanDefinition, parserContext); } public static BeanDefinition registerAspectJAnnotationAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, Object source) { return registerOrEscalateApcAsRequired(AnnotationAwareAspectJAutoProxyCreator.class, registry, source); } private static BeanDefinition registerOrEscalateApcAsRequired(Class cls, BeanDefinitionRegistry registry, Object source) { Assert.notNull(registry, "BeanDefinitionRegistry must not be null"); if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) { BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME); if (!cls.getName().equals(apcDefinition.getBeanClassName())) { int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName()); int requiredPriority = findPriorityForClass(cls); if (currentPriority < requiredPriority) { apcDefinition.setBeanClassName(cls.getName()); } } return null; } RootBeanDefinition beanDefinition = new RootBeanDefinition(cls); beanDefinition.setSource(source); beanDefinition.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE); beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition); return beanDefinition; } 上面这一堆代码最核心的部分就在后两个方法中,就是完成了对AnnotationAwareAspectJAutoProxyCreator类的注册,到这里对自定义标签的解析也就完成了,可以看到其最核心的部分就是完成了对AnnotationAwareAspectJAutoProxyCreator类的注册,那为什么注册了这个类就开启了aop功能呢?这里先卖个关子,后面详细说。 这里再回过头来看一下上面说到的spring对自定义标签解析的4个步骤,其实第一步的schema对应的是在org.springframework.aop.config路径下的spring-aop-3.0.xsd文件,其映射关系是维护在META-INF/spring.schemas文件中的,而spring-aop-3.0.xsd的主要作用就是描述自定义标签。 当通过META-INF/spring.handlers找到对应的AopNamespaceHandler,并通过在其加载后执行init()方法过程中完成了AspectJAutoProxyBeanDefinitionParser的注册,有这个parser再来完成对自定义标签的解析工作,这对应上面4个步骤中的第二步和第三部。至于第四步的配置工作,无非就是将spring.schemas和spring.handlers这两个配置文件放在META-INF/目录下罢了。 关于这部分解析过程,写得不是非常详细,如果有不明白,可以参考之前一篇文章,讲spring是如何解析自定义xml标签。 注解方式开启aop另一种开启spring aop的方式是通过注解的方式,使用的注解是@EnableAspectJAutoProxy,可以通过配置类的方式完成注册: @Configuration@EnableAspectJAutoProxypublic class AppConfig { }也可以在启动类上直接加上这个注解,这在springboot中比较常见,其实质也是上面的方式。通过这种方式配置之后,就开启了aop功能,那具体又是如何实现的呢?我们看一下这个注解: @Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Import(AspectJAutoProxyRegistrar.class)public @interface EnableAspectJAutoProxy { /** * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed * to standard Java interface-based proxies. The default is {@code false}. */ boolean proxyTargetClass() default false; /** * Indicate that the proxy should be exposed by the AOP framework as a {@code ThreadLocal} * for retrieval via the {@link org.springframework.aop.framework.AopContext} class. * Off by default, i.e. no guarantees that {@code AopContext} access will work. * @since 4.3.1 */ boolean exposeProxy() default false; } 这里我们的关注点是其通过@Import(AspectJAutoProxyRegistrar.class)引入了AspectJAutoProxyRegistrar,那这又是什么? class AspectJAutoProxyRegistrar implements ImportBeanDefinitionRegistrar { /** * Register, escalate, and configure the AspectJ auto proxy creator based on the value * of the @{@link EnableAspectJAutoProxy#proxyTargetClass()} attribute on the importing * {@code @Configuration} class. */ @Override public void registerBeanDefinitions( AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { AopConfigUtils.registerAspectJAnnotationAutoProxyCreatorIfNecessary(registry); AnnotationAttributes enableAspectJAutoProxy = AnnotationConfigUtils.attributesFor(importingClassMetadata, EnableAspectJAutoProxy.class); if (enableAspectJAutoProxy != null) { if (enableAspectJAutoProxy.getBoolean("proxyTargetClass")) { AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry); } if (enableAspectJAutoProxy.getBoolean("exposeProxy")) { AopConfigUtils.forceAutoProxyCreatorToExposeProxy(registry); } } } } 看到这里,是不是有点眼熟了呢?是的,其实它也是和上面说的xml配置使用的方式一样,通过AopConfigUtils来完成AnnotationAwareAspectJAutoProxyCreator类的注册。是不是比xml配置文件的方式方便许多呢。 开启aop的魔法通过前面的学习我们了解了可以通过Spring自定义配置完成对AnnotationAwareAspectJAutoProxyCreator类型的自动注册,而这个类到底是做了什么工作来实现AOP的操作呢?这里还是先来看一下AnnotationAwareAspectJAutoProxyCreator的类层次结构: 这里有一个很重要的点,就是AnnotationAwareAspectJAutoProxyCreator实现了BeanPostProcessor接口。在IOC部分的文章中有详细说过,Spring在加载Bean的过程中会在实例化bean前后调用BeanPostProcessor的相关方法(相关逻辑是在initializeBean方法中,调用postProcessBeforeInitialization、postProcessAfterInitialization方法),而AOP的魔法就是从这里开始的。 每次看到这里,我内心对spring的软件架构设计都是涌现出无比的佩服,通过后处理器的方式来做扩展,对原有模块是没有任何改动,也不会产生耦合,spring亲自践行着对修改关闭,对扩展开放的原则。 总结本文我们学习了spring是如何开启aop功能的,无论是通过xml配置文件方式,还是通过Java config这种注解的方式,其最终都是完成了将AnnotationAwareAspectJAutoProxyCreator这个类注册到spring容器当中,那这个类又有什么魔法,可以达到将其注册到容器即达到开启aop的功效,其实其继承自BeanPostProcessor接口,通过后处理器的方式扩展出了开启spring aop的功能。 原文地址https://www.cnblogs.com/volcano-liu/p/12990888.html
mysql-innodb-事务 写在最前这是读书笔记,Mysql,innodb系列一共3篇。 Mysql-innodb-B+索引 Mysql-innodb-锁 Mysql-innodb-事务ACIDA:原子性,要么成功,要么失败C:一致性,事务将数据库从一种状态转换为另一种稳定状态,不违反约束条件I:隔离性,多个事务互不影响D:持久性 事务的隔离级别隔离级别 说明READ UNCOMMITTED 未提交读,会造成脏读,违反持久性DREAD COMMITTED 读已提交数据, 会造成幻读 违反一致性CREPEATABLE READ 可重复读,默认隔离级别SERIALIZABLE 不会使用mysql的mvcc机制,而是在每一个select请求下获得读锁,在每一个update操作下尝试获得写锁SELECT@@global.tx_isolation查看全局事务隔离级别 事务的实现 Force Log at Commit机制当事务提交时,必须先将该事务的所有日志写入到日志文件进行持久化,之后进行COMMIT操作完成。日志写入日志文件时,日志缓冲先写入文件系统缓存,为了确保写入磁盘,需要调用一次fsync操作。由于fsync的效率取决于磁盘的性能,因此磁盘的性能决定了事务提交的性能,也就是数据库的性能。 3种日志文件 redolog 概念实现事务的持久性。InnoDB存储引擎层产生,物理日志,记录的是对页的修改,innodb1.2版本后,最大512GB一个事务多个日志记录,每个事务内部是顺序写的。并发写入多个事务的日志,不随事务提交顺序写入两部分:重做日志缓冲(redo log buffer)易失的;重做日志文件(redo log file),持久的 log buffer刷新策略由innodb_flush_log_at_trx_commit控制 innodb_flush_log_at_trx_commit值 说明0 提交时,不写入日志文件1 默认值,提交时调用一次fsync操作2 提交时写日志文件,不进行fsync操作log buffer刷新到磁盘的规则事务提交时log buffer已经有一半空间被使用log checkpoint时innodb恢复时如何使用redologcheckpoint存储了已经刷新到磁盘页上的LSN,所以仅需恢复checkpoint开始的日志部分innodb,顺序读取,并行操作,提高性能物理日志,幂等的,恢复快LSN存储了checkpoint的位置。 undolog 基本概念存储在undo段中,位于共享表空间,逻辑日志支持mvcc,支持回滚undolog 会生产redo log回滚时,undo生产反向操作,insert对应delete,delete对应一条insert,update对应一个反向update 格式类型 说明insert undo log insert产生,事务本身可见,其他事务不可见,commit后直接删除update undo log delete,update产生,其他事务可见,commit后放入列表中,供purge操作比insert undo log大 commit后undo加入history list中,供后续purge操作判断undo页 的使用空间是否小于3/4,是新的undo log 记录到老的undo log后边 binlogMySQL数据库的上层产生的,并且二进制日志不仅仅针对于InnoDB存储引擎,逻辑日志,记录的是SQL语句事务提交后一次性写入 purgepurge是清理的delete和update之前行记录的版本。从history list中找undo log,然后再从undo page中找undo log,防止大量随机读写,提高性能相关参数 :innodb_purge_batch_size设置每次需要purge清理的undo page数量,innodb1.2以后默认为300innodb_max_purge_lag用来控制history list的长度,默认值为0,不做任何限制大于0,延后DML操作,对每行数据延缓:delay=((length(history_list)- innodb_max_purge_lag)*10)-5 group commit提高磁盘fsync的效率,一次刷新多个事务日志文件综述:5.7版本innodb开启binlog的commit过程 注意: THD是MySQL server层最核心的类 LSN: 日志序列号 重做日志写入的总量 checkpoint的位置 页的版本原文地址https://my.oschina.net/floor/blog/4296143
实现.Net程序中OpenTracing采样和上报配置的自动更新 前言OpenTracing是一个链路跟踪的开放协议,已经有开源的.net实现:opentracing-csharp,同时支持.net framework和.net core,Github地址:https://github.com/opentracing/opentracing-csharp。 这个库支持多种链路跟踪模式,不过仅提供了最基础的功能,想用在实际项目中还需要做很多增强,还好也有人做了开源项目:opentracing-contrib,Github地址:https://github.com/opentracing-contrib/csharp-netcore。 opentracing-contrib中集成了一个名为Jaeger的类库,这个库实现了链路跟踪数据的采样和上报,支持将数据上传到Jaeger进行分析统计。 为了同时保障性能和跟踪关键数据,能够远程调整采样率是很重要的,Jaeger本身也提供了远程配置采样率的支持。 不过我这里用的阿里云链路跟踪不支持,配置的设计也和想要的不同,所以自己做了一个采样和上报配置的动态更新,也才有了这篇文章。 思路使用Jaeger初始化Tracer大概是这样的: var tracer = new Tracer.Builder(serviceName) .WithSampler(sampler) .WithReporter(reporter) .Build(); GlobalTracer.Register(tracer);首先是提供当前服务的名字,然后需要提供一个采样器,再提供一个上报器,Build下生成ITracer的一个实例,最后注册到全局。 可以分析得出,采样和上报配置的更新就是更新采样器和上报器。 不过Tracer并没有提供UpdateSampler和UdapteReporter的方法,被卡住了,怎么办呢? 前文提到Jaeger是支持采样率的动态调整的,看看它怎么做的: private RemoteControlledSampler(Builder builder) { ... _pollTimer = new Timer(_ => UpdateSampler(), null, TimeSpan.Zero, builder.PollingInterval); } /// <summary> /// Updates <see cref="Sampler"/> to a new sampler when it is different. /// </summary> internal void UpdateSampler() { try { SamplingStrategyResponse response = _samplingManager.GetSamplingStrategyAsync(_serviceName) .ConfigureAwait(false).GetAwaiter().GetResult(); ... UpdateRateLimitingOrProbabilisticSampler(response); } catch (Exception ex) { ... } } private void UpdateRateLimitingOrProbabilisticSampler(SamplingStrategyResponse response) { ... lock (_lock) { if (!Sampler.Equals(sampler)) { Sampler.Close(); Sampler = sampler; ... } } } 这里只留下关键代码,可以看到核心就是:通过一个Timer定时获取采样策略,然后替换原来的Sampler。 这是一个很好理解的办法,下边就按照这个思路来搞。 方案分别提供一个可更新的Sampler和可更新的Reporter,Build Tracer时使用这两个可更新的类。这里延续开源项目中Samper和Reporter的创建方式,给出这两个类。 可更新的Sampler: internal class UpdatableSampler : ValueObject, ISampler { public const string Type = "updatable"; private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(); private readonly string _serviceName; private readonly ILoggerFactory _loggerFactory; private readonly ILogger _logger; private readonly IMetrics _metrics; internal ISampler Sampler { get; private set; } private UpdatableSampler(Builder builder) { _serviceName = builder.ServiceName; _loggerFactory = builder.LoggerFactory; _logger = _loggerFactory.CreateLogger<UpdatableSampler>(); _metrics = builder.Metrics; Sampler = builder.InitialSampler; } /// <summary> /// Updates <see cref="Sampler"/> to a new sampler when it is different. /// </summary> public void UpdateSampler(ISampler sampler) { try { _lock.EnterWriteLock(); if (!Sampler.Equals(sampler)) { Sampler.Close(); Sampler = sampler; _metrics.SamplerUpdated.Inc(1); } } catch (System.Exception ex) { _logger.LogWarning(ex, "Updating sampler failed"); _metrics.SamplerQueryFailure.Inc(1); } finally { _lock.ExitWriteLock(); } } public SamplingStatus Sample(string operation, TraceId id) { try { _lock.EnterReadLock(); var status= Sampler.Sample(operation, id); return status; } finally { _lock.ExitReadLock(); } } public override string ToString() { try { _lock.EnterReadLock(); return $"{nameof(UpdatableSampler)}(Sampler={Sampler})"; } finally { _lock.ExitReadLock(); } } public void Close() { try { _lock.EnterWriteLock(); Sampler.Close(); } finally { _lock.ExitWriteLock(); } } protected override IEnumerable<object> GetAtomicValues() { yield return Sampler; } public sealed class Builder { internal string ServiceName { get; } internal ILoggerFactory LoggerFactory { get; private set; } internal ISampler InitialSampler { get; private set; } internal IMetrics Metrics { get; private set; } public Builder(string serviceName) { ServiceName = serviceName ?? throw new ArgumentNullException(nameof(serviceName)); } public Builder WithLoggerFactory(ILoggerFactory loggerFactory) { LoggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); return this; } public Builder WithInitialSampler(ISampler initialSampler) { InitialSampler = initialSampler ?? throw new ArgumentNullException(nameof(initialSampler)); return this; } public Builder WithMetrics(IMetrics metrics) { Metrics = metrics ?? throw new ArgumentNullException(nameof(metrics)); return this; } public UpdatableSampler Build() { if (LoggerFactory == null) { LoggerFactory = NullLoggerFactory.Instance; } if (InitialSampler == null) { InitialSampler = new ProbabilisticSampler(); } if (Metrics == null) { Metrics = new MetricsImpl(NoopMetricsFactory.Instance); } return new UpdatableSampler(this); } } } 可更新的Reporter: internal class UpdatableReporter : IReporter { public const string Type = "updatable"; private readonly string _serviceName; private readonly ILoggerFactory _loggerFactory; private readonly ILogger _logger; private readonly IMetrics _metrics; private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(); internal IReporter Reporter { get; private set; } private UpdatableReporter(Builder builder) { _serviceName = builder.ServiceName; _loggerFactory = builder.LoggerFactory; _logger = _loggerFactory.CreateLogger<UpdatableReporter>(); _metrics = builder.Metrics; Reporter = builder.InitialReporter; } /// <summary> /// Updates <see cref="Reporter"/> to a new reporter when it is different. /// </summary> public void UpdateReporter(IReporter reporter) { try { _lock.EnterWriteLock(); if (!Reporter.Equals(reporter)) { Reporter.CloseAsync(CancellationToken.None).ConfigureAwait(false).GetAwaiter().GetResult(); Reporter = reporter; _metrics.SamplerUpdated.Inc(1); } } catch (System.Exception ex) { _logger.LogWarning(ex, "Updating reporter failed"); _metrics.ReporterFailure.Inc(1); } finally { _lock.ExitWriteLock(); } } public void Report(Span span) { try { _lock.EnterReadLock(); Reporter.Report(span); } finally { _lock.ExitReadLock(); } } public override string ToString() { try { _lock.EnterReadLock(); return $"{nameof(UpdatableReporter)}(Reporter={Reporter})"; } finally { _lock.ExitReadLock(); } } public async Task CloseAsync(CancellationToken cancellationToken) { try { _lock.EnterWriteLock(); await Reporter.CloseAsync(cancellationToken); } finally { _lock.ExitWriteLock(); } } public sealed class Builder { internal string ServiceName { get; } internal ILoggerFactory LoggerFactory { get; private set; } internal IReporter InitialReporter { get; private set; } internal IMetrics Metrics { get; private set; } public Builder(string serviceName) { ServiceName = serviceName ?? throw new ArgumentNullException(nameof(serviceName)); } public Builder WithLoggerFactory(ILoggerFactory loggerFactory) { LoggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); return this; } public Builder WithInitialReporter(IReporter initialReporter) { InitialReporter = initialReporter ?? throw new ArgumentNullException(nameof(initialReporter)); return this; } public Builder WithMetrics(IMetrics metrics) { Metrics = metrics ?? throw new ArgumentNullException(nameof(metrics)); return this; } public UpdatableReporter Build() { if (LoggerFactory == null) { LoggerFactory = NullLoggerFactory.Instance; } if (InitialReporter == null) { InitialReporter = new NoopReporter(); } if (Metrics == null) { Metrics = new MetricsImpl(NoopMetricsFactory.Instance); } return new UpdatableReporter(this); } } } 注意这里边用到了读写锁,因为要做到不停止服务的更新,而且大部分情况下都是读,使用lock就有点大柴小用了。 现在初始化Tracer大概是这样的: 123456789101112sampler = new UpdatableSampler.Builder(serviceName) .WithInitialSampler(BuildSampler(configuration)) .Build(); reporter = new UpdatableReporter.Builder(serviceName) .WithInitialReporter(BuildReporter(configuration)) .Build(); var tracer = new Tracer.Builder(serviceName) .WithSampler(sampler) .WithReporter(reporter) .Build(); 当配置发生改变时,调用sampler和reporter的更新方法: private void OnTracingConfigurationChanged(TracingConfiguration newConfiguration, TracingConfigurationChangedInfo changedInfo) { ... ((UpdatableReporter)_reporter).UpdateReporter(BuildReporter(newConfiguration)); ((UpdatableSampler)_sampler).UpdateSampler(BuildSampler(newConfiguration)); ... } 这里就不写如何监听配置的改变了,使用Timer或者阻塞查询等等都可以。 后记opentracing-contrib这个项目只支持.net core,如果想用在.net framwork中还需要自己搞,这个方法会单独写一篇文章,这里就不做介绍了。 原文地址https://www.cnblogs.com/bossma/p/dotnet-opentracing-sampler-reporter-remote-configuration-autoupdate.html
推荐码策略貌似正在完善。 以后都用不了推荐码了 但仍可以优惠
阿里云幸运券领取地址:https://promotion.aliyun.com/ntms/act/ambassador/sharetouser.html?userCode=q3dtcrrv&utm_source=q3dtcrrv