分布式系列教程(10) -分布式协调工具Zookeeper(负载均衡原理实现)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 分布式系列教程(10) -分布式协调工具Zookeeper(负载均衡原理实现)

代码已上传到Gtihub,有兴趣的同学可以下载来看看(git版本号90275563ea2f9efc047e62427f6b9d472fe47b7f):https://github.com/ylw-github/Zookeeper-Demo

使用Zookeeper实现负载均衡原理:

  • 服务器端将启动的服务注册到ZK注册中心上,采用临时节点。
  • 客户端从zk节点上获取最新服务节点信息,本地使用负载均衡算法,随机分配服务器。

下面来开始讲解代码的实现

ZK负载均衡实现

传统模式

1.添加maven依赖:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.8</version>
</dependency>

2.创建Server服务端:

ZkServerScoekt服务:

package com.ylw.zookeeper.loadbalance;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class ZkServerScoekt implements Runnable {
    private int port = 18080;
    public ZkServerScoekt(int port) {
        this.port = port;
    }
    public void run() {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);
            System.out.println("Server start port:" + port);
            Socket socket = null;
            while (true) {
                socket = serverSocket.accept();
                new Thread(new ServerHandler(socket)).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (serverSocket != null) {
                    serverSocket.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws IOException {
        int port = 18080;
        ZkServerScoekt server = new ZkServerScoekt(port);
        Thread thread = new Thread(server);
        thread.start();
    }
}

ServerHandler:

package com.ylw.zookeeper.loadbalance;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class ServerHandler implements Runnable {
    private Socket socket;
    public ServerHandler(Socket socket) {
        this.socket = socket;
    }
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String body = null;
            while (true) {
                body = in.readLine();
                if (body == null)
                    break;
                System.out.println("Receive : " + body);
                out.println("Hello, " + body);
            }
        } catch (Exception e) {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            if (out != null) {
                out.close();
            }
            if (this.socket != null) {
                try {
                    this.socket.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                this.socket = null;
            }
        }
    }
}

3.创建客户端ZkServerClient:

package com.ylw.zookeeper.loadbalance;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
public class ZkServerClient {
    public static List<String> listServer = new ArrayList<String>();
    public static void main(String[] args) {
        initServer();
        ZkServerClient client = new ZkServerClient();
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            String name;
            try {
                name = console.readLine();
                if ("exit".equals(name)) {
                    System.exit(0);
                }
                client.send(name);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    // 注册所有server
    public static void initServer() {
        listServer.clear();
        listServer.add("127.0.0.1:18080");
    }
    // 获取当前server信息
    public static String getServer() {
        return listServer.get(0);
    }
    public void send(String name) {
        String server = ZkServerClient.getServer();
        String[] cfg = server.split(":");
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);
            out.println(name);
            while (true) {
                String resp = in.readLine();
                if (resp == null)
                    break;
                else if (resp.length() > 0) {
                    System.out.println("Receive : " + resp);
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (out != null) {
                out.close();
            }
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

4. 运行Server和Client

客户端控制台输入并发送:Hello ,I am YLW !

服务端接收:

使用Zookeeper

1.改造ZkServerScoekt

package com.ylw.zookeeper.loadbalance;
import org.I0Itec.zkclient.ZkClient;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class ZkServerScoekt2 implements Runnable {
    private static int port = 18081;  //socket 服务启动后的所使用的 端口号
    public static void main(String[] args) throws IOException {
        ZkServerScoekt server = new ZkServerScoekt(port);  //构造函数传入port
        regServer();  //去zk注册
        Thread thread = new Thread(server);
        thread.start();
    }
    public ZkServerScoekt2(int port) {
        this.port = port;
    }
    //注册服务
    public static void regServer() {
        //1、建立zk连接
        ZkClient zkClient = new ZkClient("192.168.162.131", 5000, 10000);
        //2.先创建父节点
        String root = "/toov5";
        if (!zkClient.exists(root)) {
            //如果父节点不存,直接创建父节点
            zkClient.createPersistent(root);  //持久节点
        }
        //3、创建子节点
        String nodeName = root + "/service_" + port;
        String nodeValue = "127.0.0.1:" + port;
        if (zkClient.exists(nodeName)) {  //如果存在 直接删除掉
            zkClient.delete(nodeName);
        }
        zkClient.createEphemeral(nodeName, "127.0.0.1:" + port);
        System.out.println("服务注册成功" + nodeName);
    }
    public void run() {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);
            System.out.println("Server start port:" + port);
            Socket socket = null;
            while (true) {
                socket = serverSocket.accept();
                new Thread(new ServerHandler(socket)).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (serverSocket != null) {
                    serverSocket.close();
                }
            } catch (Exception e2) {
            }
        }
    }
}

改造ZkServerClient:

package com.ylw.zookeeper.loadbalance;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
public class ZkServerClient2 {
    public static List<String> listServer = new ArrayList<String>();
    public static void main(String[] args) {
        initServer();
        ZkServerClient2 client = new ZkServerClient2();
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            String name;
            try {
                name = console.readLine();
                if ("exit".equals(name)) {
                    System.exit(0);
                }
                client.send(name);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    // 注册所有server
    public static void initServer() {
        listServer.clear();
        listServer.add("127.0.0.1:18081"); //连接服务 放到list中   存放集群地址 做负载均衡用的
    }
    // 获取当前server信息
    public static String getServer() {
        return listServer.get(0);
    }
    public void send(String name) {
        String server = ZkServerClient2.getServer();
        String[] cfg = server.split(":");
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);
            out.println(name);
            while (true) {
                String resp = in.readLine();
                if (resp == null)
                    break;
                else if (resp.length() > 0) {
                    System.out.println("Receive : " + resp);
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (out != null) {
                out.close();
            }
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

参考:https://www.cnblogs.com/toov5/p/9899238.html

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
18天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
32 2
|
3月前
|
消息中间件 Java 网络安全
JAVAEE分布式技术之Zookeeper的第一次课
JAVAEE分布式技术之Zookeeper的第一次课
70 0
|
1月前
|
监控 NoSQL Java
Zookeeper分布式锁
Zookeeper分布式锁
90 1
|
1月前
|
算法 数据处理 异构计算
CatBoost高级教程:分布式训练与大规模数据处理
CatBoost高级教程:分布式训练与大规模数据处理【2月更文挑战第15天】
247 14
|
2月前
|
Java Linux Spring
Zookeeper实现分布式服务配置中心
Zookeeper实现分布式服务配置中心
49 0
|
2月前
|
存储 分布式计算 Hadoop
ZooKeeper初探:分布式世界的守护者
ZooKeeper初探:分布式世界的守护者
64 0
|
2月前
|
机器学习/深度学习 分布式计算 Python
OpenAI Gym 高级教程——分布式训练与并行化
OpenAI Gym 高级教程——分布式训练与并行化
200 1
|
2月前
|
NoSQL Java API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
301 0
|
2月前
|
网络协议 中间件 数据库
Zookeeper学习系列【三】Zookeeper 集群架构、读写机制以及一致性原理(ZAB协议)
Zookeeper学习系列【三】Zookeeper 集群架构、读写机制以及一致性原理(ZAB协议)
100 0
|
3月前
|
监控 前端开发 Java
JAVAEE分布式技术之Zookeeper技术
JAVAEE分布式技术之Zookeeper技术
17 0
JAVAEE分布式技术之Zookeeper技术