getProperty操作的性能测试与优化
1. 性能对比测试基于JMH(1.32版本)进行性能测试(机器:MacBook Pro 13,16GB内存,M1芯片;JDK版本:zulu-8,1.8.0_302),分别比较原生的getXXX(),Apache BeanUtils的getProperty(),基于原生反射机制的field.get()和unsafe的getObject(),测试代码如下:package com.xycode.paramcheck.service.domain;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
/**
* @author: lianguang
* @date: 2021/8/15
*/
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class Domain extends BaseDomain {
private Integer age;
private String name;
public Domain(Long id, Integer age, String name) {
super(id);
this.age = age;
this.name = name;
}
}package com.xycode.paramcheck.benchmark;
import com.xycode.paramcheck.service.domain.Domain;
import org.apache.commons.beanutils.BeanUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author: lianguang
* @date: 2021/11/3
*/
@Warmup(iterations = 3)
@Measurement(iterations = 5, time = 5)
@Fork(2)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
public class BeanGetterBenchMark {
public Unsafe unsafe;
public Unsafe getUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
return null;
}
private Domain source = new Domain(1000L, 100, "source");
@Setup
@BeforeEach
public void init() throws Exception {
unsafe = getUnsafe();
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
public void testApacheBeanUtilsGetProperty() throws Exception {
BeanUtils.getProperty(source, "id");
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
public void getPropertyReflect() throws Exception {
Field afield = Domain.class.getSuperclass().getDeclaredField("id");
afield.setAccessible(true);
afield.get(source);
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
public void getPropertyUnsafe() throws Exception {
long aFieldOffset = unsafe.objectFieldOffset(Domain.class.getSuperclass().getDeclaredField("id"));
unsafe.getObject(source, aFieldOffset);
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
public void testNativeGetter() {
source.getId();
}
public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder()
.include(BeanGetterBenchMark.class.getSimpleName())
.build();
new Runner(options).run();
}测试结果:Benchmark Mode Cnt Score Error Units
BeanGetterBenchMark.getPropertyReflect thrpt 10 4.140 ± 0.019 ops/us
BeanGetterBenchMark.getPropertyUnsafe thrpt 10 2.914 ± 0.126 ops/us
BeanGetterBenchMark.testApacheBeanUtilsGetProperty thrpt 10 0.464 ± 0.014 ops/us
BeanGetterBenchMark.testNativeGetter thrpt 10 1794.462 ± 14.348 ops/us可见,Apache的BeanUtils的性能最差,原生反射的性能稍好一些,意外的是,unsafe方式性能低于原生反射方式。原生get的方式性能最好,是反射方式与unsafe方式性能的数百倍,有没有什么方法能够提升反射或unsafe的性能呢?稍加分析,反射方式和unsafe方式的过程大致都分为两个步骤,第一个步骤先获取属性的field,第二个步骤则根据field去获取属性值,又因为一个类的某个属性的field是相对不变的,由此可得一种提升性能的方式是将field缓存起来,当需要field时直接从缓存中获取,经过测试(在16GB,M1芯片的MacBook Pro13机器上),Java中Map的get性能大概在 200~300 ops/us,因此是有提升性能的空间的。模拟缓存情况下的性能,测试代码如下:package com.xycode.paramcheck.benchmark;
import com.xycode.paramcheck.service.domain.Domain;
import org.apache.commons.beanutils.BeanUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author: lianguang
* @date: 2021/11/3
*/
@Warmup(iterations = 3)
@Measurement(iterations = 5, time = 5)
@Fork(2)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
public class BeanGetterBenchMark {
public Unsafe unsafe;
public Unsafe getUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
return null;
}
private Domain source = new Domain(1000L, 100, "source");
private Map<String, Long> offsetMap = new ConcurrentHashMap<>();
private Map<String, Field> fieldMap = new ConcurrentHashMap<>();
@Setup
public void init() throws Exception {
unsafe = getUnsafe();
//模拟缓存优化
Field field= Domain.class.getSuperclass().getDeclaredField("id");
field.setAccessible(true);
fieldMap.put("id",field);
offsetMap.put("id",unsafe.objectFieldOffset(field));
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
public void testApacheBeanUtilsGetProperty() throws Exception {
BeanUtils.getProperty(source, "id");
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
public void getPropertyReflectWithCache() throws Exception {
fieldMap.get("id").get(source);
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
@Test
public void getPropertyUnsafeWithCache() {
unsafe.getObject(source, offsetMap.get("id"));
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
@Test
public void testNativeGetter() {
source.getId();
}
public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder()
.include(BeanGetterBenchMark.class.getSimpleName())
.build();
new Runner(options).run();
}
}测试结果如下:Benchmark(考虑到ConcurrentHashMap的开销) Mode Cnt Score Error Units
BeanGetterBenchMark.getPropertyReflectWithCache thrpt 10 175.756 ± 3.043 ops/us
BeanGetterBenchMark.getPropertyUnsafeWithCache thrpt 10 258.211 ± 3.122 ops/us
BeanGetterBenchMark.testApacheBeanUtilsGetProperty thrpt 10 0.435 ± 0.056 ops/us
BeanGetterBenchMark.testNativeGetter thrpt 10 1771.469 ± 30.345 ops/us可以看出,在命中缓存的情况下,反射方式与unsafe方式的性能得到了极大地提升,有一点比较有意思,unsafe方式在缓存情况下的性能超过了反射方式。因为实际上unsafe.getObject()是一个Native方法,其性能是相当优秀的,在fieldOffset是现成的基础上,unsafe.getObject()性能与原生的getXXX相当,经过测试,它们之间的性能对比如下:(甚至有时的测试结果高于原生的getXXX)BeanUtilsBenchMark.testNativeGetter 1784.362 ± 10.617 ops/us
BeanUtilsBenchMark.testUnsafeGetter 1802.107 ± 4.319 ops/us2. BeanOperationUtils的实现通过以上的分析,加了缓存后能够极大地提升反射方式与unsafe方式的get的性能,并且在缓存下的unsafe方式性能反超原生反射方式,由此这里实现了一个工具类,代码如下所示:package com.xycode.paramcheck.utils;
import java.util.Objects;
/**
* 缓存key
*
* @author: lianguang
* @date: 2021/11/3
*/
public class FieldCacheKey {
/**
* 类对象
*/
private Class<?> clazz;
/**
* 类的属性名
*/
private String propertyName;
public FieldCacheKey(Class<?> clazz, String propertyName) {
this.clazz = clazz;
this.propertyName = propertyName;
}
public Class<?> getClazz() {
return clazz;
}
public void setClazz(Class<?> clazz) {
this.clazz = clazz;
}
public String getPropertyName() {
return propertyName;
}
public void setPropertyName(String propertyName) {
this.propertyName = propertyName;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FieldCacheKey that = (FieldCacheKey) o;
return Objects.equals(clazz, that.clazz) && Objects.equals(propertyName, that.propertyName);
}
@Override
public int hashCode() {
return Objects.hash(clazz, propertyName);
}
}package com.xycode.paramcheck.utils;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* bean的相关操作
*
* @author: lianguang
* @date: 2021/11/3
*/
public class BeanOperationUtils {
private static final Unsafe unsafe;
private static final Map<FieldCacheKey, Long> fieldOffsetCache = new ConcurrentHashMap<>(1024);
private static final Map<FieldCacheKey, Field> fieldCache = new ConcurrentHashMap<>(1024);
static {
unsafe = getUnsafe();
}
private static Unsafe getUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
return null;
}
/**
* 因为可能有嵌套属性的存在, 需要递归向上获取field
*/
private static void collectField(final Class<?> clazz, final String name, final Field[] fields) throws Exception {
if (Objects.equals(Object.class, clazz) || Objects.nonNull(fields[0])) {
return;
}
try {
fields[0] = clazz.getDeclaredField(name);
} catch (Exception ignored) {
}
collectField(clazz.getSuperclass(), name, fields);
}
/**
* 反射方式获取属性值, 性能远高于Apache的BeanUtils.getProperty(), 低于命中缓存情况下的unsafe方式
*
* @param clazz 类
* @param name 属性名
*/
private static Field getFieldWithCache(final Class<?> clazz, final String name) throws Exception {
Field[] fields = new Field[1];
FieldCacheKey key = new FieldCacheKey(clazz, name);
fields[0] = fieldCache.get(key);
if (Objects.isNull(fields[0])) {
collectField(clazz, name, fields);
if (Objects.nonNull(fields[0])) {
fields[0].setAccessible(true);
//设置缓存
fieldCache.put(key, fields[0]);
}
}
return fields[0];
}
/**
* 反射方式获取属性值, 性能远高于Apache的BeanUtils.getProperty(), 低于命中缓存情况下的unsafe方式
*
* @param clazz 类
* @param name 属性名
*/
private static Field getField(final Class<?> clazz, final String name) throws Exception {
Field[] fields = new Field[1];
fields[0] = null;
collectField(clazz, name, fields);
if (Objects.nonNull(fields[0])) {
fields[0].setAccessible(true);
}
return fields[0];
}
/**
* 检查对象中是否包含属性定义, 在进行getProperty()操作时, 需要先检查一下
*
* @param clazz 类
* @param name 属性名
*/
public static boolean containsProperty(Class<?> clazz, String name) throws Exception {
if (Objects.isNull(clazz) || Objects.isNull(name)) {
return false;
}
Field field = getFieldWithCache(clazz, name);
if (Objects.nonNull(field)) {
return true;
} else {
return false;
}
}
/**
* 反射方式获取属性值(带缓存FieldCache)
* notice: 在进行getProperty()操作时, 需要调用containsProperty先检查一下, 若存在,才调用getProperty()
*
* @param bean 对象
* @param name 属性名
*/
public static Object getPropertyWithFieldCache(Object bean, String name) throws Exception {
if (Objects.isNull(bean) || Objects.isNull(name)) {
return null;
}
Field field = getFieldWithCache(bean.getClass(), name);
if (Objects.nonNull(field)) {
return field.get(bean);
} else {
return null;
}
}
/**
* unsafe方式获取属性值(带fieldOffsetCache)
*
* @param bean 对象
* @param name 属性名
* notice 在进行getProperty()操作时, 需要调用containsProperty先检查一下, 若存在,才调用getProperty()
*/
public static Object getPropertyWithFieldOffsetCache(Object bean, String name) throws Exception {
if (Objects.isNull(bean) || Objects.isNull(name)) {
return null;
}
FieldCacheKey key = new FieldCacheKey(bean.getClass(), name);
Long offset = fieldOffsetCache.get(key);
if (Objects.isNull(offset)) {
//已经有fieldOffsetCache了,不用重复缓存了
Field field = getField(bean.getClass(), name);
//设置缓存
fieldOffsetCache.put(key, unsafe.objectFieldOffset(field));
return field.get(bean);
} else {
/**
* unsafe.getObject()是native方法,性能与原生的object.getXXX()相差无几, 基于jmh的性能测试如下:
* BeanUtilsBenchMark.testNativeGetter 1784.362 ± 10.617 ops/us
* BeanUtilsBenchMark.testUnsafeGetter 1802.107 ± 4.319 ops/us
*/
return unsafe.getObject(bean, offset);
}
}
}3. BeanOperationUtils的性能测试BeanOperationUtils的性能测试代码如下:package com.xycode.paramcheck.benchmark;
import com.xycode.paramcheck.service.domain.Domain;
import com.xycode.paramcheck.utils.BeanOperationUtils;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.concurrent.TimeUnit;
/**
* @author: lianguang
* @date: 2021/11/3
*/
@Warmup(iterations = 3)
@Measurement(iterations = 5, time = 5)
@Fork(2)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
public class BeanOperationBenchmark {
private Domain source = new Domain(1000L, 100, "source");
@Setup
public void init() throws Exception {
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
public void testReflectGetterWithCache() throws Exception {
//反射方式获取属性值
BeanOperationUtils.getPropertyWithFieldCache(source, "id");
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
public void testUnsafeGetterWithCache() throws Exception {
//unsafe方式获取属性值
BeanOperationUtils.getPropertyWithFieldOffsetCache(source, "id");
}
public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder()
.include(BeanOperationBenchmark.class.getSimpleName())
.build();
new Runner(options).run();
}
}测试结果如下:Benchmark Mode Cnt Score Error Units
BeanOperationBenchmark.testReflectGetterWithCache thrpt 10 86.110 ± 2.601 ops/us
BeanOperationBenchmark.testUnsafeGetterWithCache thrpt 10 137.352 ± 2.046 ops/us可见,BeanOperationUtils的性能远高于Apache BeanUtils的getProperty(),其中unsafe方式(命中fieldOffset缓存)的性能高于反射方式(命中field缓存)。
Curator实现分布式锁(可重入 不可重入 读写 联锁 信号量 栅栏 计数器)
前言Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等。Curator主要解决了三类问题:封装ZooKeeper client与ZooKeeper server之间的连接处理提供了一套Fluent风格的操作API提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装,这些实现都遵循了zk的最佳实践,并考虑了各种极端情况Curator由一系列的模块构成,对于一般开发者而言,常用的是curator-framework和curator-recipes:curator-framework:提供了常见的zk相关的底层操作curator-recipes:提供了一些zk的典型使用场景的参考。本节重点关注的分布式锁就是该包提供的代码实践curator 4.3.0支持zookeeper 3.4.x和3.5,但是需要注意curator传递进来的依赖,需要和实际服务器端使用的版本相符,以使用zookeeper 3.4.14为例。<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>1. 配置添加curator客户端配置:@Configuration
public class CuratorConfig {
@Bean
public CuratorFramework curatorFramework(){
// 重试策略,这里使用的是指数补偿重试策略,重试3次,初始重试间隔1000ms,每次重试之后重试间隔递增。
RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
// 初始化Curator客户端:指定链接信息 及 重试策略
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.1.111:2181", retry);
client.start(); // 开始链接,如果不调用该方法,很多方法无法工作
return client;
}
}2. 可重入锁InterProcessMutexReentrant和JDK的ReentrantLock类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。它是由类InterProcessMutex来实现。// 常用构造方法
public InterProcessMutex(CuratorFramework client, String path)
// 获取锁
public void acquire();
// 带超时时间的可重入锁
public boolean acquire(long time, TimeUnit unit);
// 释放锁
public void release();测试方法:@Autowired
private CuratorFramework curatorFramework;
public void checkAndLock() {
InterProcessMutex mutex = new InterProcessMutex(curatorFramework, "/curator/lock");
try {
// 加锁
mutex.acquire();
// 处理业务
// 例如查询库存 扣减库存
// this.testSub(mutex); 如想重入,则需要使用同一个InterProcessMutex对象
// 释放锁
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
public void testSub(InterProcessMutex mutex) {
try {
mutex.acquire();
System.out.println("测试可重入锁。。。。");
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}注意:如想重入,则需要使用同一个InterProcessMutex对象。3. 不可重入锁InterProcessSemaphoreMutex具体实现:InterProcessSemaphoreMutex与InterProcessMutex调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入。public InterProcessSemaphoreMutex(CuratorFramework client, String path);
public void acquire();
public boolean acquire(long time, TimeUnit unit);
public void release();案例:@Autowired
private CuratorFramework curatorFramework;
public void deduct() {
InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(curatorFramework, "/curator/lock");
try {
mutex.acquire();
// 处理业务
// 例如查询库存 扣减库存
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}4. 可重入读写锁InterProcessReadWriteLock类似JDK的ReentrantReadWriteLock。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁。从读锁升级成写锁是不成的。主要实现类InterProcessReadWriteLock:// 构造方法
public InterProcessReadWriteLock(CuratorFramework client, String basePath);
// 获取读锁对象
InterProcessMutex readLock();
// 获取写锁对象
InterProcessMutex writeLock();注意:写锁在释放之前会一直阻塞请求线程,而读锁不会public void testZkReadLock() {
try {
InterProcessReadWriteLock rwlock = new InterProcessReadWriteLock(curatorFramework, "/curator/rwlock");
rwlock.readLock().acquire(10, TimeUnit.SECONDS);
// TODO:一顿读的操作。。。。
//rwlock.readLock().unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
public void testZkWriteLock() {
try {
InterProcessReadWriteLock rwlock = new InterProcessReadWriteLock(curatorFramework, "/curator/rwlock");
rwlock.writeLock().acquire(10, TimeUnit.SECONDS);
// TODO:一顿写的操作。。。。
//rwlock.writeLock().unlock();
} catch (Exception e) {
e.printStackTrace();
}
}5. 联锁InterProcessMultiLockMulti Shared Lock是一个锁的容器。当调用acquire, 所有的锁都会被acquire,如果请求失败,所有的锁都会被release。同样调用release时所有的锁都被release(失败被忽略)。基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。实现类InterProcessMultiLock:// 构造函数需要包含的锁的集合,或者一组ZooKeeper的path
public InterProcessMultiLock(List<InterProcessLock> locks);
public InterProcessMultiLock(CuratorFramework client, List<String> paths);
// 获取锁
public void acquire();
public boolean acquire(long time, TimeUnit unit);
// 释放锁
public synchronized void release();6. 信号量InterProcessSemaphoreV2一个计数的信号量类似JDK的Semaphore。JDK中Semaphore维护的一组许可(permits),而Cubator中称之为租约(Lease)。注意,所有的实例必须使用相同的numberOfLeases值。调用acquire会返回一个租约对象。客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。但是,如果客户端session由于某种原因比如crash丢掉, 那么这些客户端持有的租约会自动close, 这样其它客户端可以继续使用这些租约。主要实现类InterProcessSemaphoreV2:// 构造方法
public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases);
// 注意一次你可以请求多个租约,如果Semaphore当前的租约不够,则请求线程会被阻塞。
// 同时还提供了超时的重载方法
public Lease acquire();
public Collection<Lease> acquire(int qty);
public Lease acquire(long time, TimeUnit unit);
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)
// 租约还可以通过下面的方式返还
public void returnAll(Collection<Lease> leases);
public void returnLease(Lease lease);案例代码:StockController中添加方法:
@GetMapping("test/semaphore")
public String testSemaphore(){
this.stockService.testSemaphore();
return "hello Semaphore";
}StockService中添加方法:public void testSemaphore() {
// 设置资源量 限流的线程数
InterProcessSemaphoreV2 semaphoreV2 = new InterProcessSemaphoreV2(curatorFramework, "/locks/semaphore", 5);
try {
Lease acquire = semaphoreV2.acquire();// 获取资源,获取资源成功的线程可以继续处理业务操作。否则会被阻塞住
this.redisTemplate.opsForList().rightPush("log", "10010获取了资源,开始处理业务逻辑。" + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(10 + new Random().nextInt(10));
this.redisTemplate.opsForList().rightPush("log", "10010处理完业务逻辑,释放资源=====================" + Thread.currentThread().getName());
semaphoreV2.returnLease(acquire); // 手动释放资源,后续请求线程就可以获取该资源
} catch (Exception e) {
e.printStackTrace();
}
}7. 栅栏barrierDistributedBarrier构造函数中barrierPath参数用来确定一个栅栏,只要barrierPath参数相同(路径相同)就是同一个栅栏。通常情况下栅栏的使用如下:主client设置一个栅栏其他客户端就会调用waitOnBarrier()等待栅栏移除,程序处理线程阻塞主client移除栅栏,其他客户端的处理程序就会同时继续运行。DistributedBarrier类的主要方法如下:setBarrier() - 设置栅栏
waitOnBarrier() - 等待栅栏移除
removeBarrier() - 移除栅栏DistributedDoubleBarrier双栅栏,允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算,当计算完成时,离开栅栏。DistributedDoubleBarrier实现了双栅栏的功能。构造函数如下:// client - the client
// barrierPath - path to use
// memberQty - the number of members in the barrier
public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty);
enter()、enter(long maxWait, TimeUnit unit) - 等待同时进入栅栏
leave()、leave(long maxWait, TimeUnit unit) - 等待同时离开栅栏memberQty是成员数量,当enter方法被调用时,成员被阻塞,直到所有的成员都调用了enter。当leave方法被调用时,它也阻塞调用线程,直到所有的成员都调用了leave。注意:参数memberQty的值只是一个阈值,而不是一个限制值。当等待栅栏的数量大于或等于这个值栅栏就会打开!与栅栏(DistributedBarrier)一样,双栅栏的barrierPath参数也是用来确定是否是同一个栅栏的,双栅栏的使用情况如下:从多个客户端在同一个路径上创建双栅栏(DistributedDoubleBarrier),然后调用enter()方法,等待栅栏数量达到memberQty时就可以进入栅栏。栅栏数量达到memberQty,多个客户端同时停止阻塞继续运行,直到执行leave()方法,等待memberQty个数量的栅栏同时阻塞到leave()方法中。memberQty个数量的栅栏同时阻塞到leave()方法中,多个客户端的leave()方法停止阻塞,继续运行。8. 共享计数器利用ZooKeeper可以实现一个集群共享的计数器。只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数,一个用long来计数。8.1. SharedCount共享计数器SharedCount相关方法如下:// 构造方法
public SharedCount(CuratorFramework client, String path, int seedValue);
// 获取共享计数的值
public int getCount();
// 设置共享计数的值
public void setCount(int newCount) throws Exception;
// 当版本号没有变化时,才会更新共享变量的值
public boolean trySetCount(VersionedValue<Integer> previous, int newCount);
// 通过监听器监听共享计数的变化
public void addListener(SharedCountListener listener);
public void addListener(final SharedCountListener listener, Executor executor);
// 共享计数在使用之前必须开启
public void start() throws Exception;
// 关闭共享计数
public void close() throws IOException;使用案例:StockController:
@GetMapping("test/zk/share/count")
public String testZkShareCount(){
this.stockService.testZkShareCount();
return "hello shareData";
}StockService:public void testZkShareCount() {
try {
// 第三个参数是共享计数的初始值
SharedCount sharedCount = new SharedCount(curatorFramework, "/curator/count", 0);
// 启动共享计数器
sharedCount.start();
// 获取共享计数的值
int count = sharedCount.getCount();
// 修改共享计数的值
int random = new Random().nextInt(1000);
sharedCount.setCount(random);
System.out.println("我获取了共享计数的初始值:" + count + ",并把计数器的值改为:" + random);
sharedCount.close();
} catch (Exception e) {
e.printStackTrace();
}
}8.2. DistributedAtomicNumberDistributedAtomicNumber接口是分布式原子数值类型的抽象,定义了分布式原子数值类型需要提供的方法。DistributedAtomicNumber接口有两个实现:DistributedAtomicLong 和 DistributedAtomicInteger这两个实现将各种原子操作的执行委托给了DistributedAtomicValue,所以这两种实现是类似的,只不过表示的数值类型不同而已。这里以DistributedAtomicLong 为例进行演示DistributedAtomicLong除了计数的范围比SharedCount大了之外,比SharedCount更简单易用。它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它client更新了), 它使用InterProcessMutex方式来更新计数值。此计数器有一系列的操作:get(): 获取当前值increment():加一decrement(): 减一add():增加特定的值subtract(): 减去特定的值trySet(): 尝试设置计数值forceSet(): 强制设置计数值最后必须检查返回结果的succeeded(), 代表此操作是否成功。如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。
HBase 简介
1 HBase 定义Apache HBase 是以 hdfs 为数据存储的,一种分布式、可扩展的 NoSQL 数据库。2 HBase 数据模型HBase 的设计理念依据 Google 的 BigTable 论文,论文中对于数据模型的首句介绍。Bigtable 是一个 稀疏的、分布式的、持久的 多维排序 map。之后对于映射的解释如下:该映射由行键、列键和时间戳索引;映射中的每个值都是一个未解释的字节数组。最终 HBase 关于数据模型和 BigTable 的对应关系如下:HBase 使用与 Bigtable 非常相似的数据模型。用户将数据行存储在带标签的表中。数据行具有可排序的键和任意数量的列。该表存储稀疏,因此如果用户喜欢,同一表中的行可以具有疯狂变化的列。最终理解 HBase 数据模型的关键在于 稀疏、分布式、多维、排序 的映射。其中映射 map指代非关系型数据库的 key-Value 结构。2.1 HBase 逻辑结构HBase 可以用于存储多种结构的数据,以 JSON 为例,存储的数据原貌为:{
"row_key1":{
"personal_info":{
"name":"zhangsan",
"city":"北京",
"phone":"131********"
},
"office_info":{
"tel":"010-1111111",
"address":"xx区"
}
},
"row_key11":{
"personal_info":{
"city":"上海",
"phone":"132********"
},
"office_info":{
"tel":"010-1111111"
}
},
"row_key2":{
......
}HBase 逻辑结构2.2 HBase 物理存储结构物理存储结构即为数据映射关系,而在概念视图的空单元格,底层实际根本不存储。HBase 物理存储结构2.3 数据模型1)Name Space命名空间,类似于关系型数据库的 database 概念,每个命名空间下有多个表。HBase 两个自带的命名空间,分别是 hbase 和 default,hbase 中存放的是 HBase 内置的表,default表是用户默认使用的命名空间。2)Table类似于关系型数据库的表概念。不同的是,HBase 定义表时只需要声明列族即可,不需要声明具体的列。因为数据存储时稀疏的,所有往 HBase 写入数据时,字段可以 动态、按需指定。因此,和关系型数据库相比,HBase 能够轻松应对字段变更的场景。3)RowHBase 表中的每行数据都由一个 RowKey 和多个 Column(列)组成,数据是按照 RowKey的字典顺序存储的,并且查询数据时只能根据 RowKey 进行检索,所以 RowKey 的设计十分重要。4)ColumnHBase 中的每个列都由 Column Family(列族)和 Column Qualifier(列限定符)进行限定,例如 info:name,info:age。建表时,只需指明列族,而列限定符无需预先定义。5)Time Stamp用于标识数据的不同版本(version),每条数据写入时,系统会自动为其加上该字段,其值为写入 HBase 的时间。6)Cell由{rowkey, column Family:column Qualifier, timestamp} 唯一确定的单元。cell 中的数据全部是字节码形式存贮。3 HBase 基本架构架构角色:1)Master实现类为 HMaster,负责监控集群中所有的 RegionServer 实例。主要作用如下: (1)管理元数据表格 hbase:meta,接收用户对表格创建修改删除的命令并执行 (2)监控 region 是否需要进行负载均衡,故障转移和 region 的拆分。通过启动多个后台线程监控实现上述功能: ①LoadBalancer 负载均衡器 周期性监控 region 分布在 regionServer 上面是否均衡,由参数 hbase.balancer.period 控制周期时间,默认 5 分钟。 ②CatalogJanitor 元数据管理器 定期检查和清理 hbase:meta 中的数据。 ③MasterProcWAL master 预写日志处理器 把 master 需要执行的任务记录到预写日志 WAL 中,如果 master 宕机,让 backupMaster读取日志继续干2)Region ServerRegion Server 实现类为 HRegionServer,主要作用如下: (1)负责数据 cell 的处理,例如写入数据 put,查询数据 get 等 (2)拆分合并 region 的实际执行者,有 master 监控,有 regionServer 执行。3)Zookeeper HBase 通过 Zookeeper 来做 master 的高可用、记录 RegionServer 的部署信息、并且存储有 meta 表的位置信息。 HBase 对于数据的读写操作时直接访问 Zookeeper 的,在 2.3 版本推出 Master Registry模式,客户端可以直接访问 master。使用此功能,会加大对 master 的压力,减轻对Zookeeper的压力4)HDFSHDFS 为 Hbase 提供最终的底层数据存储服务,同时为 HBase 提供高容错的支持。
Gradle 入门
1、Gradle 入门1.1 Gradle 简介Gradle 是一款 Google 推出的基于JVM、通用灵活的项目构建工具,支持 Maven,JCenter 多种第三方仓库;支持传递性 依赖管理、废弃了繁杂的 xml 文件,转而使用简洁的、支持多种语言(例如:java、groovy 等)的 build 脚本文件。官网地址Gradle有用吗?目前已经有相当一部分公司在逐渐使用Gradle作为项目构建工具了作为Java开发程序员,如果想下载Spring、SpringBoot等Spring家族的源码,基本上基于Gradle构建的虽然目前市面上常见的项目构建工具有 Ant、Maven、Gradle,主流还是 Maven,但是未来趋势 Gradle1.2 常见的项目构建工具Ant: 2000 年 Apache 推出的纯 Java 编写构建工具,通过 xml[build.xml]文件管理项目 优点:使用灵活,速度快(快于 gradle 和 maven), 缺点:Ant 没有强加任何编码约定的项目目录结构,开发人员需编写繁杂 XML 文件构建指令,对开发人员是一个挑战.Maven: 2004 年 Apache 组织推出的再次使用 xml 文件[pom.xml]管理项目的构建工具。 优点: 遵循一套约定大于配置的项目目录结构,使用统一的 GAV 坐标进行依赖管理,侧重于包管理。 缺点:项目构建过程僵化,配置文件编写不够灵活、不方便自定义组件,构建速度慢于 gradle。Gradle: 2012 年 Google 推出的基于 Groovy 语言的全新项目构建工具,集合了 Ant 和 Maven 各自的优势。 优点:集 Ant 脚本的灵活性+Maven 约定大于配置的项目目录优势,支持多种远程仓库和插件,侧重于大项目构建。 缺点:学习成本高、资料少、脚本灵活、版本兼容性差等。1.3 Gradle 安装SpringBoot 官方文档明确指出: Spring Boot的Gradle插件需要Gradle 6.8, 6.9或7.x 查看其中 SpringBoot 与 Gradle 存在版本兼容问题,Gradle 与 Idea 也存在兼容问题,所以考虑到 java 程序员会使用 SpringBoot, 所以要选择 6.8 版本及高于 6.8 版本的 Gradle,那么相应的 idea 版本也要升级,不能太老.通过idea安装根目录plugins\gradle\lib查看idea对应Gradle版本1.3.1安装Gradle安装Gradle需要安装jdk8或以上版本的jdk下载Gradle并解压1.3.2配置环境变量接着再配置一个 GRALE_USER_HOME 环境变量:GRALE_USER_HOME 相当于配置 Gradle 本地仓库位置和 Gradle Wrapper 缓存目录。所以我这里直接配置的maven仓库路径1.4 检测是否安装成功gradle -v 或者 gradle --version: 通过gradle -v或者 gradle --version检测是否安装成功1.5 Gradle 项目目录结构Gradle 项目默认目录结构和 Maven 项目的目录结构一致,都是基于约定大于配置。 其完整项目目录结构如下所示:只有war工程才有webapp目录,对于普通的jar工程并没有webapp目录gradlew与gradlew.bat执行的指定wrapper版本中的gradle指令,不是本地安装的gradle指令。1.6 Gradle 中的常用指令常用Gradle指令作用gradle clean清楚build目录gradle classes编译业务代码和配置文件gradle test编译测试代码,生成测试报告gradle build构建项目gradle build -x test跳过测试,构建1.7 修改 maven 下载源认识 init.d 文件夹我们可以在 gradle 的 init.d 目录下创建以.gradle 结尾的文件,.gradle 文件可以实现在 build 开始之前执行,所以你可以在 这个文件配置一些你想预先加载的操作。在 init.d 文件夹创建 init.gradle 文件allprojects {
repositories {
mavenLocal()
maven{name "Alibaba" ; url "https://maven.aliyun.com/repository/public" }
maven { name "Bstek" ; url "https://nexus.bsdn.org/content/groups/public/" }
mavenCentral()
}
buildscript {
repositories {
maven { name "Alibaba" ; url 'https://maven.aliyun.com/repository/public'}
maven { name "Bstek" ; url 'https://nexus.bsdn.org/content/groups/public'}
maven { name "M2" ; url 'https://plugins.gradle.org/m2/' }
}
}
}启用 init.gradle 文件的方法有:在命令行指定文件,例如:gradle --init-script yourdir/init.gradle -q taskName。你可以多次输入此命令来指定多个init文件把init.gradle文件放到 USER_HOME/.gradle/ 目录下把以.gradle结尾的文件放到 USER_HOME/.gradle/init.d/ 目录下把以.gradle结尾的文件放到 GRADLE_HOME/init.d/ 目录下 如果存在上面的4种方式的2种以上,gradle会按上面的1-4序号依次执行这些文件,如果给定目录下存在多个init脚本,会 按拼音a-z顺序执行这些脚本,每个init脚本都存在一个对应的gradle实例,你在这个文件中调用的所有方法和属性,都会 委托给这个gradle实例,每个init脚本都实现了Script接口。仓库地址说明使用M2_HOME()需要配置M2_HOME环境变量,值为maven根目录mavenLocal(): 指定使用maven本地仓库,而本地仓库在配置maven时settings文件指定的仓库位置。如E:/repository,gradle 查找jar包顺序如下:USER_HOME/.m2/settings.xml >> M2_HOME/conf/settings.xml >> USER_HOME/.m2/repositorymaven { url 地址},指定maven仓库,一般用私有仓库地址或其它的第三方库【比如阿里镜像仓库地址】。 mavenCentral():这是Maven的中央仓库,无需配置,直接声明就可以使用。 jcenter():JCenter中央仓库,实际也是是用的maven搭建的,但相比Maven仓库更友好,通过CDN分发,并且支持https访 问,在新版本中已经废弃了,替换为了mavenCentral()。总之, gradle可以通过指定仓库地址为本地maven仓库地址和远程仓库地址相结合的方式,避免每次都会去远程仓库下载 依赖库。这种方式也有一定的问题,如果本地maven仓库有这个依赖,就会从直接加载本地依赖,如果本地仓库没有该 依赖,那么还是会从远程下载。但是下载的jar不是存储在本地maven仓库中,而是放在自己的缓存目录中,默认在 USER_HOME/.gradle/caches目录,当然如果我们配置过GRADLE_USER_HOME环境变量,则会放在 GRADLE_USER_HOME/caches目录,那么可不可以将gradle caches指向maven repository。我们说这是不行的,caches下载 文件不是按照maven仓库中存放的方式。阿里云仓库地址参考1.8 Wrapper 包装器Gradle Wrapper 实际上就是对 Gradle 的一层包装,用于解决实际开发中可能会遇到的不同的项目需要不同版本的 Gradle例如: 把自己的代码共享给其他人使用对方电脑没有安装 gradle对方电脑安装过 gradle,但是版本太旧了这时候,我们就可以考虑使用 Gradle Wrapper 了。这也是官方建议使用 Gradle Wrapper 的原因。实际上有了 Gradle Wrapper 之后,我们本地是可以不配置 Gradle 的,下载 Gradle 项目后,使用 gradle 项目自带的 wrapper 操作也是可以的。如何使用 Gradle Wrapper?项目中的gradlew、gradlew.cmd脚本用的就是wrapper中规定的gradle版本。而我们上面提到的gradle指令用的是本地gradle,所以gradle指令和gradlew指令所使用的gradle版本有可能是不一样的。 gradlew、gradlew.cmd的使用方式与gradle使用方式完全一致,只不过把gradle指令换成了gradlew指令。我们也可在终端执行 gradlew 指令时,指定指定一些参数,来控制 Wrapper 的生成,比如依赖的版本等参数名说明–gradle-version用于指定使用的Gradle版本–gradle-distribution-url用于指定下载Gradle发行版的url地址例如:gradle wrapper --gradle-version=4.4:升级wrapper版本号,只是修改gradle.properties中wrapper版本,未实际下载gradle wrapper --gradle-version 5.2.1 --distribution-type all :关联源码用GradleWrapper 的执行流程:1.当我们第一次执行 ./gradlew build 命令的时候,gradlew 会读取 gradle-wrapper.properties 文件的配置信息2.准确的将指定版本的 gradle 下载并解压到指定的位置(GRADLE_USER_HOME目录下的wrapper/dists目录中)3.并构建本地缓存(GRADLE_USER_HOME目录下的caches目录中),下载再使用相同版本的gradle就不用下载了4.之后执行的 ./gradlew 所有命令都是使用指定的 gradle 版本。gradle-wrapper.properties 文件解读:字段名说明distributionBase下载的Gradle压缩包解压后存储的主目录distributionPath相对于distributionBase的解压后的Gradle压缩包的路径zipStoreBase同distributionBase,只不过是存放zip压缩包的zipStroePath同distributionPath,只不过是存放zip压缩包的distributionUrlGradle发行版压缩包的下载地址注意: 前面提到的 GRALE_USER_HOME 环境变量用于这里的 Gradle Wrapper 下载的特定版本的 gradle 存储目录。如 果我们没有配置过 GRALE_USER_HOME 环境变量,默认在当前用户家目录下的.gradle 文件夹中。什么时候选择使用 gradle wrapper、什么时候选择使用本地 gradle?le Wrapper 下载的特定版本的 gradle 存储目录。如 果我们没有配置过 GRALE_USER_HOME 环境变量,默认在当前用户家目录下的.gradle 文件夹中。什么时候选择使用 gradle wrapper、什么时候选择使用本地 gradle?下载别人的项目或者使用操作以前自己写的不同版本的gradle项目时:用Gradle wrapper,也即:gradlew 什么时候使用本地gradle?新建一个项目时: 使用gradle指令即可。
WampServer 3 访问403 Forbidden You don't have permission to access this resource 解决
大多数百度到的答案仅适用于WampServer3以下的版本,这里我介绍一种适用于WampServer3版本解决方案问题:安装好WampServer3后无法通过公网IP进行访问,出现403 Forbidden错误解决方案:首先左键单击右下角WampServer3的图标(绿色的),然后依次找到httpd-vhosts.conf,注意不是httpd.conf,因为WampServer3中预定义了虚拟主机,所以要修改虚拟主机的配置才会起作用。打开httpd-vhosts.conf后,内容大概是这样的<VirtualHost *:80>
ServerName localhost
DocumentRoot C:/wamp/www
<Directory "C:/wamp/www/">
Options +Indexes +FollowSymLinks +MultiViews
AllowOverride All
Require local
</Directory>
</VirtualHost>修改倒数第3行Require local为Require all granted即可,修改后的配置文件如下<VirtualHost *:80>
ServerName localhost
DocumentRoot C:/wamp/www
<Directory "C:/wamp/www/">
Options +Indexes +FollowSymLinks +MultiViews
AllowOverride All
Require all granted
</Directory>
</VirtualHost>然后重新启动WampServer即可。需要注意的是,403 Forbidden也有可能是因为端口配置错误引起的,如果上述的方法无效,那么有两种方法可以参考。1、考虑修改httpd-vhosts.conf文件的第一行<VirtualHost *:80>后面的"80"为其他端口号,如<VirtualHost *:8080>。2、重新安装Apache,选择如图所示选项即可(需要先点击上面的"停止服务")重新安装完成之后重新启动WampServer即可,此时Apache的访问端口会被重置为80。
WAMP localhost/phpmyadmin 无法进入
安装wamp后,若遇到在浏览器地址栏输入localhost/phpmyadmin却无法进入,首先看一看右下角的wamp程序的图标是红色还是绿色(正常运行状态),如果是红色,右键点击它,再点击Tools->test 80 port如果出现Your port 80 is actually used by
....那就只需要重新启动wamp即可。如果出现Your port 80 seems not actually used Unable to
initiate a socket connection那就说明80端口被占用了。解决办法有两个更换端口。右键点击wamp->Tools->Use a port other than 80,更换一个端口即可重新安装Apache服务。既可以直接重新安装WampServer,也可以通过左键点击WampServer小图标,然后选择下图中的选项来重新安装(需要先点上面的"停止服务")
分布式事物-全面详解(学习总结---从入门到深化)(5)
Hmily实现TCC事务_转入转出微服务实现Confirm阶段 编写转出微服务Confirm阶段 /**
* 确认阶段
* @param userAccountDTO
*/
public void sayConfrim(UserAccountDTO userAccountDTO) {
String txNo = userAccountDTO.getTxNo();
log.info("********** 执行bank01 的 Confrim方法 ,事务id={}",txNo);
// 1、幂等处理
ConfirmLog confirmLog = confirmLogMapper.selectById(txNo);
if (confirmLog != null){
return ;
}
// 2、根据账户id查询账户
UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo());
userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal()));
baseMapper.updateById(userAccount);
// 3、 确认日志记录
ConfirmLog confirmLog1 = new ConfirmLog();
confirmLog1.setTxNo(userAccountDTO.getTxNo());
confirmLog1.setCreateTime(LocalDateTime.now());
confirmLogMapper.insert(confirmLog1);
}编写转入微服务Confirm阶段 /**
* 确认阶段
* @param userAccountDTO
*/
public void sayConfrim(UserAccountDTO userAccountDTO) {
String txNo = userAccountDTO.getTxNo();
log.info("********** 执行bank02 的Confrim方法 ,事务id={}",txNo);
// 1、幂等处理
ConfirmLog confirmLog = confirmLogMapper.selectById(txNo);
if (confirmLog != null) {
return;
}
// 2、根据账户id查询账户
UserAccount userAccount = userAccountMapper.selectById(userAccountDTO.getTargetAccountNo());
userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal()));
userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal()));
userAccountMapper.updateById(userAccount);
// 3、 确认日志记录
ConfirmLog confirmLog1 = new ConfirmLog();
confirmLog1.setTxNo(userAccountDTO.getTxNo());
confirmLog1.setCreateTime(LocalDateTime.now());
confirmLogMapper.insert(confirmLog1);
}Hmily实现TCC分布式事务_转入转出微服务实现Cancel阶段转入微服务Cananl阶段 /**
* 回滚
* @param userAccountDto
*/
@Transactional(rollbackFor = Exception.class)
public void cancelMethod(UserAccountDto userAccountDto){
String txNo = userAccountDto.getTxNo();
log.info("执行bank02的cancel方法,事务id: {}, 参数为:{}",txNo,JSONObject.toJSONString(userAccountDto));
CancelLog cancelLog = iCancelLogService.findByTxNo(txNo);
if(cancelLog != null){
log.info("bank02已经执行过Cancel方法,txNo:{}", txNo);
return;
}
// 保存记录
iCancelLogService.saveCancelLog(txNo);
userAccountMapper.cancelUserAccountBalanceBank02(userAccountDto.getAmount(),
userAccountDto.getTargetAccountNo());
}转出微服务Cancel阶段 /**
* 取消阶段
* @param userAccountDTO
*/
public void sayCancel(UserAccountDTO userAccountDTO) {
String txNo = userAccountDTO.getTxNo();
log.info("********** 执行bank01 的 Cancel方法 ,事务id={}",txNo);
// 1. 幂等处理
CancelLog cancelLog = cancelLogMapper.selectById(txNo);
if (cancelLog != null ){
return;
}
// 2、根据账户id查询账户
UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo());
userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal()));
userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal()));
baseMapper.updateById(userAccount);
// 3、记录回滚日志
CancelLog cancelLog1 = new CancelLog();
cancelLog1.setTxNo(txNo);
cancelLog1.setCreateTime(LocalDateTime.now());
cancelLogMapper.insert(cancelLog1);
}最终一致性分布式事务解决方案_什么是可靠消息最终一致性事务 可靠消息最终一致性的基本原理是事务发起方(消息发送者)执行 本地事务成功后发出一条消息,事务参与方(消息消费者)接收到 事务发起方发送过来的消息,并成功执行本地事务。事务发起方和事务参与方最终的数据能够达到一致的状态。两种实现方式:1、基于本地消息表2、基于支持分布式事务的消息中间件,如RocketMQ等 基本原理 在使用可靠消息最终一致性方案解决分布式事务的问题时,需要确保消息发送和消息消费的一致性,从而确保消息的可靠性。可靠消息最终一致性分布式事务实现_本地消息表 本地消息表模式的核心通过本地事务保证数据业务操作和消息的一 致性,然后通过定时任务发送给消费方或者中间加一层MQ的方 式,保障数据最终一致性。库表设计订单微服务中出库本地消息表: 基础功能 分析 Task微服务的任务 可靠消息最终一致性分布式事务实现_RocketMQ事务消息 RocketMQ是阿里巴巴开源的一款支持事务消息的消息中间件,于 2012年正式开源,2017年成为Apache基金会的顶级项目。实现原理RocketMQ 4.3版之后引入了完整的事务消息机制,其内部实现了完 整的本地消息表逻辑,使用RocketMQ实现可靠消息分布式事务就 不用用户再实现本地消息表的逻辑了,极大地减轻了开发工作量。 可靠消息最终一致性分布式事务实战_案列业务介绍 业务介绍通过RocketMQ中间件实现可靠消息最终一致性分布式事务,模拟 商城业务中的下单扣减库存场景。订单微服务和库存微服务分别独立开发和部署。 流程 架构选型 数据库表设计orders订单数据表orders数据表存储于tx-msg-orders订单数据库。DROP TABLE IF EXISTS `orders`;
CREATE TABLE `order` (
`id` bigint(20) NOT NULL COMMENT '主键',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
`order_no` varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '订单
编号',
`product_id` bigint(20) NULL DEFAULT NULL COMMENT '商品id',
`pay_count` int(11) NULL DEFAULT NULL COMMENT '购买数量',
PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
CREATE TABLE `tx_log` (
`tx_no` varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '分布式事务全局序列号',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;stock库存数据表DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock` (
`id` bigint(20) NOT NULL COMMENT '主键id',
`product_id` bigint(20) NULL DEFAULT NULL COMMENT '商品id',
`total_count` int(11) NULL DEFAULT NULL COMMENT '商品总库存',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
-- ----------------------------
-- Table structure for tx_log
-- ----------------------------
DROP TABLE IF EXISTS `tx_log`;
CREATE TABLE `tx_log` (
`tx_no` varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '分布式事务全局序列号',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;tx_log事务记录表 可靠消息最终一致性分布式事务实战_Docker安装 RocketMQ 在安装RocketMQ之前,我们先了解一下RocketMQ的部署架构,了 解一下RocketMQ的组件,然后基于当前主流的Docker安装 RocketMQ,我们这里安装单台RocketMQ,但为了防止单节点故 障、保障高可用,生产环境建议安装RocketMQ集群。 安装NameServer拉取镜像docker pull rocketmqinc/rocketmq创建数据存储目录mkdir -p /docker/rocketmq/data/namesrv/logs
/docker/rocketmq/data/namesrv/store启动NameServerdocker run -d \
--restart=always \
--name rmqnamesrv \
-p 9876:9876 \
-v
/docker/rocketmq/data/namesrv/logs:/root/logs \
-v
/docker/rocketmq/data/namesrv/store:/root/store
\
-e "MAX_POSSIBLE_HEAP=100000000" \
rocketmqinc/rocketmq \
sh mqnamesrv 安装Brokerborder配置:创建 broker.conf 配置文件vim /docker/rocketmq/conf/broker.conf# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的
主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的
slave brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和
异步表示Master和Slave之间同步数据的机 制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷
盘和异步刷盘;SYNC_FLUSH消息写入磁盘后 才返回成功状
态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址
brokerIP1 = 192.168.66.100
#剩余磁盘比例
diskMaxUsedSpaceRatio=99启动brokerdocker run -d --restart=always --name rmqbroker
--link rmqnamesrv:namesrv -p 10911:10911 -p
10909:10909 --privileged=true -v
/docker/rocketmq/data/broker/logs:/root/logs -v
/docker/rocketmq/data/broker/store:/root/store
-v
/docker/rocketmq/conf/broker.conf:/opt/rocketmq
-4.4.0/conf/broker.conf -e
"NAMESRV_ADDR=namesrv:9876" -e
"MAX_POSSIBLE_HEAP=200000000"
rocketmqinc/rocketmq sh mqbroker -c
/opt/rocketmq-4.4.0/conf/broker.conf 报错: 部署RocketMQ的管理工具 RocketMQ提供了UI管理工具,名为rocketmq-console,我们选择 docker安装#创建并启动容器
docker run -d --restart=always --name rmqadmin
-e "JAVA_OPTS=-
Drocketmq.namesrv.addr=192.168.66.100:9876 -
Dcom.rocketmq.sendMessageWithVIPChannel=false"
-p 8080:8080 pangliang/rocketmq-console-ng关闭防火墙(或者开放端口)#关闭防火墙
systemctl stop firewalld.service
#禁止开机启动
systemctl disable firewalld.service测试访问:http://192.168.66.101:8080/#/ (可以切换中文)可靠消息最终一致性分布式事务实战_实现订单微服务创建父工程rocketmq-msg 创建订单微服务子工程 引入依赖 <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starterweb</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connectorjava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-bootstarter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-bootstarter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>编写配置文件server:
port: 9090
spring:
application:
name: tx-msg-stock
datasource:
url: jdbc:mysql://192.168.66.100:3306/txmsg-order?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
name-server: 192.168.66.100:9876
producer:
group: order-group编写主启动类/**
* 订单微服务启动成功
*/
@Slf4j
@MapperScan("com.itbaizhan.order.mapper")
@SpringBootApplication
public class OrderMain9090 {
public static void main(String[] args) {
SpringApplication.run(OrderMain9090.class,args);
log.info("************* 订单微服务启动成功*******");
}
}代码生成package com.itbaizhan.utils;
import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;
import java.util.Arrays;
import java.util.List;
public class CodeGenerator {
public static void main(String[] args) {
FastAutoGenerator.create("jdbc:mysql://192.168.66.102:3306/tx-msg-order", "root", "123456")
.globalConfig(builder -> {
builder.author("itbaizhan")// 设置作者
.commentDate("MMdd") // 注释日期格式
.outputDir(System.getProperty("user.dir")+"/rocketmq-msg/orders"+ "/src/main/java/")
.fileOverride(); //覆盖文件
})
// 包配置
.packageConfig(builder -> {
builder.parent("com.itbaizhan.orders") // 包名前缀
.entity("entity")//实体类包名
.mapper("mapper")//mapper接口包名
.service("service"); //service包名
})
.strategyConfig(builder -> {
// 设置需要生成的表名
builder.addInclude(Arrays.asList("orders","tx_log"))
// 开始实体类配置
.entityBuilder()
// 开启lombok模型
.enableLombok()
//表名下划线转驼峰
.naming(NamingStrategy.underline_to_camel)
//列名下划线转驼峰
.columnNaming(NamingStrategy.underline_to_camel);
})
.execute();
}
}创建TxMessage类在项目的com.itbaizhan.orders.tx包下创建TxMessage类,主要用 来封装实现分布式事务时,在订单微服务、RocketMQ消息中间件 和库存微服务之间传递的全局事务消息,项目中会通过事务消息实现幂等。@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxMessage implements Serializable
{
private static final long serialVersionUID = -4704980150056885074L;
/**
* 商品id
*/
private Long productId;
/**
* 商品购买数量
*/
private Integer payCount;
/**
* 全局事务编号
*/
private String txNo;
}可靠消息最终一致性分布式事务实战_订单微服务业务层实现 业务逻辑层主要实现了用户提交订单后的业务逻辑。编写OrderService接口 /**
* 添加订单
* @param productId 商品id
* @param payCount 购买数量
*/
void save(Long productId,Integer payCount);
/**
* 提交订单同时保存事务信息
*/
void submitOrderAndSaveTxNo(TxMessage txMessage);
/**
* 提交订单
* @param productId 商品id
* @param payCount 购买数量
*/
void submitOrder(Long productId, Integer payCount);编写OrderService接口实现package com.itbaizhan.order.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.itbaizhan.order.entity.Order;
import com.itbaizhan.order.entity.TxLog;
import com.itbaizhan.order.mapper.OrderMapper;
import com.itbaizhan.order.mapper.TxLogMapper;
import com.itbaizhan.order.service.IOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.itbaizhan.order.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.UUID;
/**
* <p>
* 服务实现类
* </p>
*
* @author itbaizhan
* @since 05-20
*/
@Slf4j
@Service
public class OrderServiceImpl extends
ServiceImpl<OrderMapper, Order> implements IOrderService {
@Resource
RocketMQTemplate rocketMQTemplate;
@Resource
private TxLogMapper txLogMapper;
/**
* 添加
* @param productId 商品id
* @param payCount 购买数量
*/
@Override
public void save(Long productId, Integer payCount) {
Order order = new Order();
// 订单创建时间
order.setCreateTime(LocalDateTime.now());
// 生产订单编号
order.setOrderNo(UUID.randomUUID().toString().replace("-",""));
// 商品id
order.setProductId(productId);
// 购买数量
order.setPayCount(payCount);
baseMapper.insert(order);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void submitOrderAndSaveTxNo(TxMessage txMessage) {
TxLog txLog = txLogMapper.selectById(txMessage.getTxNo());
if(txLog != null){
log.info("订单微服务已经执行过事务,商品id为:{},事务编号为:{}",txMessage.getProductId(),txMessage.getTxNo());
return;
}
//生成订单
this.save(txMessage.getProductId(),txMessage.getPayCount());
//生成订单
txLog = new TxLog();
txLog.setTxNo(txMessage.getTxNo());
txLog.setCreateTime(LocalDateTime.now());
//添加事务日志
txLogMapper.insert(txLog);
}
/**
* 提交订单
* @param productId 商品id
* @param payCount 购买数量
*/
@Override
public void submitOrder(Long productId,Integer payCount) {
//生成全局分布式序列号
String txNo = UUID.randomUUID().toString();
TxMessage txMessage = new TxMessage(productId, payCount, txNo);
JSONObject jsonObject = new JSONObject();
jsonObject.put("txMessage", txMessage);
Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONString()).build();
//发送事务消息 且该消息不允许消费
tx_order_group: 指定版事务消息组
rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg", message, null);
}
}可靠消息最终一致性分布式事务实战_订单微服务监听事务消息 执行本地的业务代码package com.itbaizhan.order.message;
import com.alibaba.fastjson.JSONObject;
import com.itbaizhan.order.entity.TxLog;
import com.itbaizhan.order.mapper.TxLogMapper;
import com.itbaizhan.order.service.IOrderService;
import com.itbaizhan.order.service.ITxLogService;
import com.itbaizhan.order.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
* @author itbaizhan
* @version 1.0.0
* @description 监听事务消息
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_order_group")
public class OrderTxMessageListener implements
RocketMQLocalTransactionListener {
@Autowired
private IOrderService orderService;
@Resource
private TxLogMapper txLogMapper;
/**
* RocketMQ的Producer本地事务:先执行本地的业务代码(使用Spring的事件管理),判断是否成功。
* 成功返回: RocketMQLocalTransactionState.COMMIT,
* 失败返回:RocketMQLocalTransactionState.ROLLBACK
*/
@Override
@Transactional(rollbackFor = Exception.class)
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj)
{
try {
log.info("订单微服务执行本地事务");
TxMessage txMessage = this.getTxMessage(msg);
//执行本地事务
orderService.submitOrderAndSaveTxNo(txMessage);
//提交事务
log.info("订单微服务提交事务");
// COMMIT:即生产者通知Rocket该消息可以消费
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
//异常回滚事务
log.info("订单微服务回滚事务");
// ROLLBACK:即生产者通知Rocket将该消息删除
return RocketMQLocalTransactionState.ROLLBACK;
}
}
private TxMessage getTxMessage(Message msg)
{
String messageString = new String((byte[]) msg.getPayload());
JSONObject jsonObject = JSONObject.parseObject(messageString);
String txStr = jsonObject.getString("txMessage");
return JSONObject.parseObject(txStr,TxMessage.class);
}
}网络异常消息处理 /**
* 因为网络异常或其他原因时,RocketMQ的消息状态处于UNKNOWN时,调用该方法检查Producer的本地
* 事务是否已经执行成功,
* 成功返回: RocketMQLocalTransactionState.COMMIT,
* 失败返回:RocketMQLocalTransactionState.ROLLBACK
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("订单微服务查询本地事务");
TxMessage txMessage = this.getTxMessage(msg);
// 获取订单的消息
Integer exists = txLogService.isExistsTx(txMessage.getTxNo());
if (exists != null) {
// COMMIT:即生产者通知Rocket该消息可以消费
return RocketMQLocalTransactionState.COMMIT;
}
// UNKNOWN:即生产者通知Rocket继续查询该消息的状态
return RocketMQLocalTransactionState.UNKNOWN;
}
private TxMessage getTxMessage(Message msg)
{
String messageString = new String((byte[]) msg.getPayload());
JSONObject jsonObject = JSONObject.parseObject(messageString);
String txStr = jsonObject.getString("txMessage");
return JSONObject.parseObject(txStr,TxMessage.class);
}可靠消息最终一致性分布式事务实战_实现库存微服务创建库存微服务tx-msg-stock 引入依赖 <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starterweb</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connectorjava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-bootstarter</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-bootstarter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>编写配置文件server:
port: 6060
spring:
application:
name: tx-msg-stock
datasource:
url: jdbc:mysql://192.168.66.100:3306/txmsg-stock?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
username: root
password01: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
name-server: 192.168.66.100:9876编写主启动类package com.itbaizhan.stock;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author itbaizhan
* @version 1.0.0
* @description 库存微服务启动类
*/
@MapperScan("com.itbaizhan.stock.mapper")
@Slf4j
@SpringBootApplication
public class StockServerStarter {
public static void main(String[] args) {
SpringApplication.run(StockServerStarter.class, args);
log.info("**************** 库存服务启动成功 ***********");
}
}代码生成package com.itbaizhan.utils;
import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;
import java.util.Arrays;
import java.util.List;
public class CodeGenerator {
public static void main(String[] args) {
FastAutoGenerator.create("jdbc:mysql://192.168.66.102:3306/tx-msg-stock", "root", "123456")
.globalConfig(builder -> { builder.author("itbaizhan")// 设置作者
.commentDate("MMdd") // 注释日期格式
.outputDir(System.getProperty("user.dir") +"/rocketmq-msg/stock"+ "/src/main/java/")
.fileOverride(); //覆盖文件
})
// 包配置
.packageConfig(builder -> {
builder.parent("com.itbaizhan.stock") // 包名前缀
.entity("entity")//实体类包名
.mapper("mapper")//mapper接口包名
.service("service"); //service包名
})
.strategyConfig(builder -> {
// 设置需要生成的表名
builder.addInclude(Arrays.asList("stock","tx_log"))
// 开始实体类配置
.entityBuilder()
// 开启lombok模型
.enableLombok() //表名下划线转驼峰
.naming(NamingStrategy.underline_to_camel)
//列名下划线转驼峰
.columnNaming(NamingStrategy.underline_to_camel);
})
.execute();
}
}编写库存接口public interface StockService {
/**
* 根据id查询库存
* @param id
* @return
*/
Stock getStockById(Long id);
/**
* 扣减库存
*/
void decreaseStock(TxMessage txMessage);
}可靠消息最终一致性分布式事务实战_库存微服务业务层实现 库存微服务的业务逻辑层主要监听RocketMQ发送过来的事务消 息,并在本地事务中执行扣减库存的操作。编写库存接口 /**
* 扣减库存
*/
void decreaseStock(TxMessage txMessage);库存接口实现类package com.itbaizhan.stock.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.itbaizhan.stock.entity.Stock;
import com.itbaizhan.stock.entity.TxLog;
import com.itbaizhan.stock.mapper.StockMapper;
import com.itbaizhan.stock.mapper.TxLogMapper;
import com.itbaizhan.stock.service.IStockService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.itbaizhan.stock.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/**
* <p>
* 服务实现类
* </p>
*
* @author itbaizhan
* @since 05-20
*/
@Slf4j
@Service
public class StockServiceImpl extends ServiceImpl<StockMapper, Stock> implements
IStockService {
@Resource
private StockMapper stockMapper;
@Resource
private TxLogMapper txLogMapper;
@Transactional
@Override
public void decreaseStock(TxMessage txMessage) {
log.info("库存微服务执行本地事务,商品id:{},购买数量:{}", txMessage.getProductId(),
txMessage.getPayCount());
//检查是否执行过事务
TxLog txLog = txLogMapper.selectById(txMessage.getTxNo());
if(txLog != null){
log.info("库存微服务已经执行过事务,事务编号为:{}", txMessage.getTxNo());
}
// 根据商品id查询库存
QueryWrapper<Stock> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("product_id",txMessage.getProductId());
Stock stock = stockMapper.selectOne(queryWrapper);
if(stock.getTotalCount() < txMessage.getPayCount()){
throw new RuntimeException("库存不足");
}
// 减库存
stock.setTotalCount(stock.getTotalCount()- txMessage.getPayCount());
stockMapper.updateById(stock);
//生成订单
txLog = new TxLog();
txLog.setTxNo(txMessage.getTxNo());
txLog.setCreateTime(LocalDateTime.now());
//添加事务日志
txLogMapper.insert(txLog);
}
}库存微服务消费者实现用于消费RocketMQ发送过来的事务消息,并且调用StockService中的decreaseStock(TxMessage)方法扣减库存。库存事务消费者package com.itbaizhan.stock.message;
import com.alibaba.fastjson.JSONObject;
import com.itbaizhan.stock.service.IStockService;
import com.itbaizhan.stock.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author binghe
* @version 1.0.0
* @description 库存事务消费者
*/
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg")
public class StockTxMessageConsumer implements
RocketMQListener<String> {
@Autowired
private IStockService stockService;
@Override
public void onMessage(String message) {
log.info("库存微服务开始消费事务消息:{}", message);
TxMessage txMessage = this.getTxMessage(message);
stockService.decreaseStock(txMessage);
}
private TxMessage getTxMessage(String msg){
JSONObject jsonObject = JSONObject.parseObject(msg);
String txStr = jsonObject.getString("txMessage");
return JSONObject.parseObject(txStr,TxMessage.class);
}
}可靠消息最终一致性分布式事务实战_测试程序查询数据正式测试之前,先来查询下tx-msg-orders数据库和tx-msg-stock数 据库各个数据表中的数据。 分别启动库存和订单微服务编写控制层接口@Autowired
private IOrderService iOrderService;
/**
* 创建订单
* @param productId 商品id
* @param payCount 购买数量
* @return
*/
@GetMapping(value = "/submit_order")
public String transfer(@RequestParam("productId")Long productId, @RequestParam("payCount") Integer payCount){
iOrderService.submitOrder(productId, payCount);
return "下单成功";
}分别启动库存微服务stock和订单微服务orders,并在浏览器中访问 http://localhost:9090/order/submit_order?productId=1001&pay Count=1 最终一致性分布式事务解决方案_什么是最大努力通知型分布式事务 最大努力通知型( Best-effort delivery)是最简单的一种柔性事务。适用场景最大努力通知型解决方案适用于最终一致性时间敏感度低的场景。 最典型的使用场景就是支付成功后,支付平台异步通知商户支付结 果。并且事务被动方的处理结果不会影响主动方的处理结果。 典型的使用场景:如银行通知、商户通知等。 流程图 最大努力通知型分布式事务_最大努力通知与可靠消息最终一致性的区别 最大努力通知型分布式事务解决方案 流程:1、发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。2、接收通知方监听 MQ。3、接收通知方接收消息,业务处理完成回应ack。4、接收通知方若没有回应ack则MQ会重复通知。 MQ会按照间隔1min、5min、10min、 30min、1h、2h、5h、10h的方式,逐步拉大通知间隔(如果MQ采用 rocketMq,在 broker中可进行配置),直到达到通知要求的时间窗口上限。5、接收通知方可通过消息校对接口来校对消息的一致性。 最大努力通知型分布式事务_案例业务说明 设计完数据库后,创建tx-notifymsg-account库SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for account_info
-- ----------------------------
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info` (
`id` int(11) NOT NULL COMMENT '主键id',
`account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '账户',
`account_name` varchar(255) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'账户名',
`account_balance` decimal(10, 2) NULL DEFAULT
NULL COMMENT '账户余额',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of account_info
-- ----------------------------
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info` (
`tx_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '充值记录流水号',
`account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账
户',
`pay_amount` decimal(10, 2) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'充值金额',
`pay_result` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值
结果',
`pay_time` datetime(0) NOT NULL COMMENT '充值
时间',
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;设计完数据库后,创建tx-notifymsg-payment库SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info` (
`tx_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '充值记录流水
号',
`account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账
户',
`pay_amount` decimal(10, 2) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'充值金额',
`pay_result` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值
结果',
`pay_time` datetime(0) NOT NULL COMMENT '充值
时间',
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;最大努力通知型分布式事务实战_实现充值微服务 主要实现功能1、充值接口2、查询充值结果接口 创建父项目rocketmq-notifymsg 创建子工程 引入依赖 <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starterweb</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-bootstarter</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connectorjava</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-bootstarter</artifactId>
<version>2.0.1</version>
</dependency>
<!-- 引入nacos依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId>
</dependency>
</dependencies>编写主启动类@EnableDiscoveryClient
@MapperScan("com.itbaizhan.payment.mapper")
@SpringBootApplication
@Slf4j
public class PayMain7071 {
public static void main(String[] args) {
SpringApplication.run(PayMain7071.class,args);
log.info("*********** 充值服务启动成功*********");
}
}编写配置文件server:
port: 7071
spring:
cloud:
nacos:
discovery:
server-addr: 192.168.66.100:8848
application:
name: tx-notifymsg-pay
datasource:
url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-payment?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
username: root
password01: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
name-server: 192.168.66.100:9876
producer:
group: payment-group最大努力通知型分布式事务_充值微服务之业务层实现 充值微服务的业务逻辑层主要完成充值的业务逻辑处理,当充值成功时,会向RocketMQ发送充值结果信息,同时提供业务逻辑层查询充值结果信息的接口。编写充值接口public interface IPayInfoService extends IService<PayInfo> {
/**
* 保存充值信息
*/
PayInfo savePayInfo(PayInfo payInfo);
/**
* 查询指定的充值信息
*/
PayInfo getPayInfoByTxNo(String txNo);
}充值接口实现@Slf4j
@Service
public class PayInfoServiceImpl extends
ServiceImpl<PayInfoMapper, PayInfo> implements IPayInfoService {
@Resource
private PayInfoMapper payInfoMapper;
@Resource
private RocketMQTemplate rocketMQTemplate;
@Override
public PayInfo savePayInfo(PayInfo payInfo)
{
payInfo.setTxNo(UUID.randomUUID().toString().replace("-",""));
payInfo.setPayResult("success");
payInfo.setPayTime(LocalDateTime.now());
int count = payInfoMapper.insert(payInfo);
//充值信息保存成功
if(count > 0){
log.info("充值微服务向账户微服务发送结果消息");
//发送消息通知账户微服务
rocketMQTemplate.convertAndSend("topic_nofitymsg",JSON.toJSONString(payInfo));
return payInfo;
}
return null;
}
@Override
public PayInfo getPayInfoByTxNo(String txNo) {
return baseMapper.selectById(txNo);
}
}编写充值接口@RestController
@RequestMapping("/payInfo")
public class PayInfoController {
@Autowired
private IPayInfoService payInfoService;
/**
* 充值
* @param payInfo
* @return
*/
@GetMapping(value = "/pay_account")
public PayInfo pay(PayInfo payInfo){
//生成事务编号
return payInfoService.savePayInfo(payInfo);
}
/**
* 查询充值结果
* @param txNo
* @return
*/
@GetMapping(value = "/query/payresult/{txNo}")
public PayInfo payResult(@PathVariable("txNo") String txNo){
return payInfoService.getPayInfoByTxNo(txNo);
}
}最大努力通知型分布式事务_实现账户微服务创建子工程account引入依赖 <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starterweb</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-bootstarter</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connectorjava</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-bootstarter</artifactId>
<version>2.0.1</version>
</dependency>
<!-- 引入Nacos注册中心 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId>
</dependency>
<!-- 引入OpenFeign -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starteropenfeign</artifactId>
</dependency>
<!-- 引入负载均衡器-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloudloadbalancer</artifactId>
</dependency>
</dependencies>编写配置文件server:
port: 7070
spring:
cloud:
nacos:
discovery:
server-addr: 192.168.66.100:8848
application:
name: tx-notifymsg-account
datasource:
url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-account?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
username: root
password01: 123456
driver-class-name: com.mysql.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
name-server: 192.168.66.100:9876最大努力通知型分布式事务_账户微服务之业务层实现 RocketMQ消费充值信息@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_account", topic = "topic_nofitymsg")
public class NotifyMsgAccountListener implements RocketMQListener<String> {
@Autowired
private IAccountInfoService accountInfoService;
@Override
public void onMessage(String message) {
log.info("账户微服务收到RocketMQ的消息: {}", JSONObject.toJSONString(message));
//如果是充值成功,则修改账户余额
PayInfo payInfo = JSON.parseObject(message, PayInfo.class);
if("success".equals(payInfo.getPayResult())){
accountInfoService.updateAccountBalance(payInfo);
}
log.info("更新账户余额完毕:{}", JSONObject.toJSONString(payInfo));
}
}编写账户操作接口 /**
* 更新账户余额
*/
void updateAccountBalance(PayInfo payInfo);实现账户操作接口@Slf4j
@Service
public class AccountInfoServiceImpl extends
ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {
@Resource
private AccountInfoMapper accountInfoMapper;
@Resource
private PayInfoMapper payInfoMapper;
/**
*
* @param payInfo
*/
@Transactional(rollbackFor = Exception.class)
@Override
public void updateAccountBalance(PayInfo payInfo) {
if(payInfoMapper.selectById(payInfo.getTxNo()) != null){
log.info("账户微服务已经处理过当前事务...");
return;
}
LambdaUpdateWrapper<AccountInfo> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
lambdaUpdateWrapper.eq(AccountInfo::getAccountNo,payInfo.getAccountNo());
//更新账户余额
List<AccountInfo> accountInfos = baseMapper.selectList(lambdaUpdateWrapper);
if (accountInfos != null && !accountInfos.isEmpty()){
AccountInfo accountInfo = accountInfos.get(0);
accountInfo.setAccountBalance(accountInfo.getAccountBalance().add(payInfo.getPayAmount()));
accountInfoMapper.updateById(accountInfo);
}
//保存充值记录
payInfoMapper.insert(payInfo);
}
}最大努力通知型分布式事务_账户微服务远程调用实现 主启动类加Feign注解@EnableDiscoveryClient
@EnableFeignClients
@MapperScan("com.itbaizhan.account.mapper")
@SpringBootApplication
@Slf4j
public class AccountMain7070 {
public static void main(String[] args) {
SpringApplication.run(AccountMain7070.class,args);
log.info("*********** AccountMain7070启动成功 *********");
}
}编写远程调用接口@Service
@FeignClient("tx-notifymsg-pay")
public interface IPayFeignService {
@GetMapping(value = "/payInfo/query/payresult/{txNo}")
PayInfo payResult(@PathVariable("txNo") String txNo);
}编写查询账户接口 /**
* 查询充值结果
*/
PayInfo queryPayResult(String txNo);实现查询账户信息 /**
* 查询结果
* @param txNo
* @return
*/
@Override
public PayInfo queryPayResult(String txNo)
{
try{
return iPayFeignService.payResult(txNo);
}catch (Exception e){
log.error("查询充值结果异常:{}", e);
}
return null;
}编写查询充值结果接口 /**
* 主动查询充值结果
* @param txNo
* @return
*/
@GetMapping(value = "/query/payresult/{txNo}")
public ResponseEntity result(@PathVariable("txNo") String txNo){
return ResponseEntity.ok(accountInfoService.queryPayResult(txNo));
}最大努力通知型分布式事务_测试程序查看account库和payment库数据 启动账户和充值微服务调用充值微服务的接口http://localhost:7071/payInfo/pay_accoun t为账户编号为1001的账户充值1000元。 账户微服务的日志文件中输出如下信息 可以看到,充值微服务将充值结果信息成功发送到了RocketMQ, 并且账户微服务成功订阅了RocketMQ的消息并执行了本地事务。查询充值结果
ECS配置mySQL\MariaDB和PHP环境
我们使用Apache+PHP+MySQL来配置环境 操作系统:CentOS一、配置Apacheyum install httpd
//之间过程根据提示输入Y允许安装安装之后开启Apache/bin/systemctl start httpd.service二、配置MySQL(MariaDB) MariaDB数据库管理系统是MySQL的一个分支,完全兼容MySQL,MySQL被Oracle收购之后,或许有些Linux发行版因为版权的问题都改为MariaDB 了.分别执行下列5条命令,之间过程按y继续yum install mysql mysql-server
yum install -y mariadb-server
systemctl start mariadb.service
systemctl enable mariadb.service
mysql_sceure_installation在执行mysql_sceure_installation后会进入MySQL设置Enter current password for root (enter for none):
//全新安装时root密码为空,按回车键继续即可
Set root password? [Y/n]
//进入密码设置,按Y回车继续
New password:
Re-enter new password:
//密码输入两次即可
Remove anonymous users? [Y/n]
//是否删除匿名用户,按Y回车继续
Disallow root login remotely? [Y/n]
//是否禁止root远程登录,根据自己的需求选择Y/n回车继续
Remove test database and access to it? [Y/n]
//是否删除test数据库,按n继续
Reload privilege tables now? [Y/n]
//按Y回车继续
Success!三、配置PHP 分别执行下面两条命令yum install php
yum install php-mysql php-gd libjpeg* php-imap php-ldap php-odbc php-pear php-xml php-xmlrpc php-mbstring php-mcrypt php-bcmath php-mhash libmcrypt到此即完成了PHP环境配置
The superclass "javax.servlet.http.HttpServlet" was not found on the Java Build Path类似问题简单解决方案
当出现The superclass “javax.servlet.http.HttpServlet” was not found on the Java Build Path类似问题的时候(见下图),解决方案见下面几个步骤。一、选择出现该问题的文件工程,然后右击文件工程,再点击选择红色箭头2指向的红色框中的内容(Bulid Path),然后再点击红色箭头3指向的红色框中的内容(Configure Bulid Path)进行路径的重定向。二、出现新页面之后,选择点击(Libraries),然后再点击红色箭头2指向的红色框中的内容(Classpath),然后再选择点击红色箭头3指向的红色框中的(Add Library…)。三、出现新页面之后,选择红色箭头1指向的红色框中的内容(Server Runtime),然后再选择点击红色箭头2指向的红色框中的内容(Next)。四、选择红色箭头1指向的红色框中的内容(Apache Tomcat v7.0,注意:博主在自己电脑下载的Eclipse中安装的Tomcat是这个,若大家没有安装Tomcat的可以参考博主的文章,点击打开文章链接:Eclipse 服务器配置之安装 Tomcat 服务器,有的自己选择自己安装版本的Tomcat就行),然后点击红色箭头2指向的红色框中的内容(Finsh)。五、然后再选择点击红色箭头指向的红色框中的内容(Apply and Close)。六、然后再看之前的代码,没有报错就解决问题了。
解决Failed resolution of: Lorg/apache/http/client/methods/HttpEntityEnclosingRequestBase的方案
博主在主做安卓课设时,由于换了安卓虚拟机的路径,重新换了个安卓虚拟机,所以遇到了上面的错误,经过一番修修改改,发现一个很好的简单解决方式仅供大家参考借鉴尝试。在出现该问题的安卓工程文件中首先找到manifests文件,然后再点击打开AndroidManifest.XML文件,将下面的代码复制到application标签中(注意代码放置位置),然后再重新运行应该就不会报错了。<uses-library android:name="org.apache.http.legacy" android:required="false" />