一、服务器的动态感知
(1) 描述过程
使用 ZooKeeper 实现服务器的动态感知,动态获取应用服务(秒杀服务)的上线、宕机情况,并能够让客户端(如 Java 客户端)知道。
① 每个集群服务器的秒杀服务都需要连接 ZooKeeper 客户端。当上线秒杀服务的时候,往 ZooKeeper 的指定目录创建临时顺序节点,并写入当前秒杀服务器的信息(ip 地址、端口等)
为什么是【临时顺序节点】?
- 临时节点的特点:当会话断开的时候,临时节点会自动被移除(可实现服务器宕机的时候自动移除服务器在 ZooKeeper 注册中心的信息)
- 顺序节点的特点:顺序节点的节点名字可以一样,ZooKeeper 会自动在节点的名字后面加上顺序号
② Java 客户端连接 ZooKeeper 服务器。获取 ZooKeeper 指定目录(秒杀服务目录)的子节点列表信息(秒杀服务的地址列表信息),并注册永久的节点改变事件(是秒杀服务上线和宕机的时候可以通知 Java 客户端的必须条件)
(2) 秒杀服务端代码
@Configuration
public class ZooKeeperClient {
private static final String CONNECTING_STRING = "192.168.80.129:2888:3888,192.168.80.129:2888:3888,192.168.80.129:2889:3889";
private static final int SESSION_TIMEOUT = 30000;
private static final String BASE_PATH = "/server";
private static final String SUB_PATH = "/seckillServer";
@Value("${server.host}")
private String seckillHost;
@Value("${server.port}")
private String seckillPort;
private ZooKeeper zooKeeper;
/**
* 连接 ZooKeeper
*/
@Bean
public ZooKeeper zooKeeper() throws Exception {
zooKeeper = new ZooKeeper(CONNECTING_STRING, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("connect zookeeper successfully");
String s = writeSeckillServerAddrInfo(seckillHost + ":" + seckillPort);
System.out.println(seckillHost + ":" + seckillPort);
System.out.println("创建节点: " + s);
}
}
});
return zooKeeper;
}
/**
* 往 ZooKeeper 的指定目录创建子节点, 在子节点中存储当前秒杀服务器的地址信息
*/
private String writeSeckillServerAddrInfo(String addrInfo) {
try {
String nodePath = zooKeeper.create(BASE_PATH + SUB_PATH,
addrInfo.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
return "success_" + nodePath;
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
}
# 应用名称
spring.application.name=seckill-server
# 应用服务 WEB 访问端口
server.port=8080
server.host=111.111.111.111
(3) 客户端代码
/**
* Java 客户端连接上 ZooKeeper
*/
@Configuration
public class ZooKeeperClient {
private static final String CONNECTING_STRING = "192.168.80.129:2888:3888,192.168.80.129:2888:3888,192.168.80.129:2889:3889";
private static final int SESSION_TIMEOUT = 30000;
private static final String BASE_PATH = "/server";
private static final String SUB_PATH = "/seckillServer";
private ZooKeeper zooKeeper;
/**
* 连接 ZooKeeper
*/
@Bean
public ZooKeeper zooKeeper() throws Exception {
zooKeeper = new ZooKeeper(CONNECTING_STRING, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("connect zookeeper successfully");
List<String> addrInfo = getChildNodeAddrInfo();
if (addrInfo != null) {
for (String info : addrInfo) {
System.out.println("地址信息: " + info);
}
}
}
}
});
return zooKeeper;
}
/**
* 获取子节点列表
*/
private List<String> getChildNodeList() {
try {
Stat stat = zooKeeper.exists(BASE_PATH, false);
if (stat == null) throw new IllegalArgumentException("路径节点不存在");
return zooKeeper.getChildren(BASE_PATH, false);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取子节点列表上的地址信息
*/
private List<String> getChildNodeAddrInfo() {
try {
ArrayList<String> childNodeData = new ArrayList<>();
Stat stat = zooKeeper.exists(BASE_PATH, false);
if (stat == null) throw new IllegalArgumentException("路径节点不存在");
List<String> childNodeList = getChildNodeList();
if (childNodeList == null) return null;
for (String childPath : childNodeList) {
byte[] data = getNodeData(BASE_PATH + "/" + childPath);
if (data != null) {
childNodeData.add(new String(data));
}
}
addWatch();
return childNodeData;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取节点数据
*/
private byte[] getNodeData(String path) {
try {
Stat stat = zooKeeper.exists(path, false);
if (stat == null) throw new IllegalArgumentException("路径节点不存在");
return zooKeeper.getData(path, false, stat);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 注册节点改变事件
*/
private void addWatch() {
try {
zooKeeper.addWatch(BASE_PATH, new Watcher() {
@Override
public void process(WatchedEvent event) {
List<String> childNodeAddrInfo = getChildNodeAddrInfo();
if (childNodeAddrInfo != null) {
for (String info : childNodeAddrInfo) {
System.out.println(info);
}
}
}
}, AddWatchMode.PERSISTENT);
} catch (Exception e) {
e.printStackTrace();
}
}
}
# 应用名称
spring.application.name=java-client
# 应用服务 WEB 访问端口
server.port=8123
二、分布式锁
(1) 什么是锁?
多个任务同时执行的时候(多线程、多进程),① 若任务是需要对同一资源进行写操作,② 对于资源的访问,不能多个任务同时执行(同一时间只能一个任务访问资源)。
跨 JVM 的分布式程序之间进行数据共享需要使用分布式锁。
(2) 锁的基本概念
① 竞争锁:任务通过竞争获取到锁后才能对资源进行操作
公平竞争:按照一定的顺序,先来先获得锁
非公平竞争:没有顺序,谁抢到誰获得锁
② 占有锁:获得锁,有任务正在对资源进行更新操作
③ 任务阻塞:A、B、C 三个进程。A 占有锁的时候,B 和 C 只能等待,此时是阻塞状态
④ 释放锁:对资源的访问结束后,要释放锁
(3) 非分布式环境下的锁的使用
创建订单问题(多线程)
public class OrderService {
// 多个线程会对 count 的值进行改变
private int count = 0;
public synchronized String createOrderId() {
try {
TimeUnit.MICROSECONDS.sleep(666);
} catch (Exception e) {
e.printStackTrace();
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
count++;
return sdf.format(new Date()) + "-" + count;
}
}
public class OrderController implements Runnable {
private static OrderService orderService = new OrderService();
private static Set<String> orderIds = new HashSet<>();
// 用于结束进程
private static CountDownLatch countDownLatch = new CountDownLatch(50);
public static void main(String[] args) throws Exception {
OrderController orderController = new OrderController();
for (int i = 0; i < 50; i++) { // 创建 50 个线程
new Thread(orderController).start();
}
// 所有线程执行完之后才执行后面的代码
countDownLatch.await();
System.out.println(orderIds.size());
for (String orderId : orderIds) {
System.out.println(orderId);
}
}
@Override
public void run() {
orderIds.add(orderService.createOrderId());
// 执行完一个线程后减 1
countDownLatch.countDown();
}
}
(4) 分布式锁的实现
① 创建同名节点
a. 代码和画图
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private ZooKeeper zooKeeper;
// 用于发送网络请求
private RestTemplate networkReq = new RestTemplate();
private static final String BASE_PATH = "/locks";
private static final String SUB_PATH = "/lock";
private static final String FULL_PATH = BASE_PATH + SUB_PATH;
@Value("${server.port}")
private String port;
@GetMapping("/createOrder")
public String createOrder() {
if (tryGetLock()) {
String orderId = networkReq.getForObject(
"http://localhost:8080/orderIds/getId",
String.class);
System.out.println("【" + port + "】orderId = " + orderId);
releaseLock();
} else {
waitLock();
}
return port + " createOrder";
}
/**
* 尝试获取锁
* <p>
* 往指定目录创建同名节点 (locks)
* 创建成功获得锁资源
*/
private boolean tryGetLock() {
try {
String path;
path = zooKeeper.create(FULL_PATH,
"同名节点".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
return !"".equals(path);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 释放锁资源
*/
private void releaseLock() {
try {
Stat stat = zooKeeper.exists(FULL_PATH, false);
zooKeeper.delete(FULL_PATH, stat.getVersion());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 等待锁
* 绑定 /server 节点的子节点改变事件
*/
private void waitLock() {
try {
zooKeeper.getChildren(BASE_PATH, event -> {
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
System.out.println("【waitLock】子节点改变事件");
createOrder(); // 再次执行业务
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Configuration
public class ZooKeeperClient {
private static final String CONNECTING_STRING =
"192.168.80.129:2888:3888,192.168.80.129:2888:3888,192.168.80.129:2889:3889";
private static final int SESSION_TIMEOUT = 30000;
@Bean
public ZooKeeper zooKeeper() throws IOException {
return new ZooKeeper(CONNECTING_STRING,
SESSION_TIMEOUT,
event -> {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("【连接 ZooKeeper】成功");
}
}
);
}
}
@RestController
@RequestMapping("/orderIds")
public class OrderIdController {
private int count = 0;
@GetMapping("/getId")
public String getId() {
String id = null;
try {
TimeUnit.MILLISECONDS.sleep(50);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
count++;
id = sdf.format(new Date()) + "-" + count;
} catch (InterruptedException e) {
e.printStackTrace();
}
return id;
}
}
性能差、且不是公平的。
b. 异常处理(错误处理)
String orderId = networkReq.getForObject(
"http://localhost:8123/orderIds/getId",
String.class);
上面的请求路径中必须要加 http
② 创建临时顺序节点
/**
* 创建临时顺序节点
*/
@RestController
@RequestMapping("/orders2")
public class OrderController2 {
@Autowired
private ZooKeeper zooKeeper;
// 用于发送网络请求
private RestTemplate networkReq = new RestTemplate();
private static final String BASE_PATH = "/locks2";
private static final String SUB_PATH = "/lock";
private static final String FULL_PATH = BASE_PATH + SUB_PATH;
@Value("${server.port}")
private String port;
@GetMapping("/createOrder")
public String createOrder() {
try {
// 所有要获取锁资源的服务器都在 locks2 节点下创建 lock 临时顺序
// curPath: /locks2/lock000000
String curPath = zooKeeper.create(FULL_PATH,
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// 【lock000001】
String curPathTail = curPath.substring(curPath.lastIndexOf("/") + 1);
// 尝试获取锁
if (tryGetLock(curPathTail)) {
reqCreateOrderId();
releaseLock(curPathTail);
} else {
waitLock(curPath);
}
} catch (Exception e) {
e.printStackTrace();
}
return port + " createOrder";
}
/**
* 发请求执行业务方法
*/
private void reqCreateOrderId() {
String orderId = networkReq.getForObject(
"http://localhost:8080/orderIds/getId",
String.class);
System.out.println("【" + port + "】orderId = " + orderId);
}
/**
* 尝试获取锁
* <p>
* 获取 locks 节点下的所有子节点
* 子节点序号最小的获得锁资源
*/
private boolean tryGetLock(String curPath) {
try {
List<String> childNodes = zooKeeper.getChildren(BASE_PATH, false);
if (childNodes == null) return false;
// 对子节点进行升序排序
Collections.sort(childNodes);
// 判断当前节点是否是所有节点列表中最小的节点
return curPath.equals(childNodes.get(0));
// return StringUtils.pathEquals(curPath, childNodes.get(0));
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 释放锁资源
*/
private void releaseLock(String curPath) {
try {
Stat stat = zooKeeper.exists(BASE_PATH + "/" + curPath, false);
if (stat == null) throw new IllegalArgumentException("curPath 路径(节点)不存在");
zooKeeper.delete(BASE_PATH + "/" + curPath, stat.getVersion());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 等待锁
* 绑定前一个节点的删除事件
*/
private void waitLock(String curPath) {
try {
// 获取子节点列表
List<String> children = zooKeeper.getChildren(BASE_PATH, false);
Collections.sort(children);
int curPathIdx = children.indexOf(curPath); // 当前节点的索引
if (curPathIdx > 0) {
String prePath = children.get(curPathIdx - 1);
zooKeeper.getData(BASE_PATH + "/" + prePath, event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
System.out.println("【waitLock】前一个节点删除事件");
reqCreateOrderId();
releaseLock(curPath);
}
}, new Stat());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}