分布式系列教程(10) -分布式协调工具Zookeeper(负载均衡原理实现)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
网络型负载均衡 NLB,每月750个小时 15LCU
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 分布式系列教程(10) -分布式协调工具Zookeeper(负载均衡原理实现)

代码已上传到Gtihub,有兴趣的同学可以下载来看看(git版本号90275563ea2f9efc047e62427f6b9d472fe47b7f):https://github.com/ylw-github/Zookeeper-Demo

使用Zookeeper实现负载均衡原理:

  • 服务器端将启动的服务注册到ZK注册中心上,采用临时节点。
  • 客户端从zk节点上获取最新服务节点信息,本地使用负载均衡算法,随机分配服务器。

下面来开始讲解代码的实现

ZK负载均衡实现

传统模式

1.添加maven依赖:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.8</version>
</dependency>

2.创建Server服务端:

ZkServerScoekt服务:

package com.ylw.zookeeper.loadbalance;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class ZkServerScoekt implements Runnable {
    private int port = 18080;
    public ZkServerScoekt(int port) {
        this.port = port;
    }
    public void run() {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);
            System.out.println("Server start port:" + port);
            Socket socket = null;
            while (true) {
                socket = serverSocket.accept();
                new Thread(new ServerHandler(socket)).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (serverSocket != null) {
                    serverSocket.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws IOException {
        int port = 18080;
        ZkServerScoekt server = new ZkServerScoekt(port);
        Thread thread = new Thread(server);
        thread.start();
    }
}

ServerHandler:

package com.ylw.zookeeper.loadbalance;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class ServerHandler implements Runnable {
    private Socket socket;
    public ServerHandler(Socket socket) {
        this.socket = socket;
    }
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String body = null;
            while (true) {
                body = in.readLine();
                if (body == null)
                    break;
                System.out.println("Receive : " + body);
                out.println("Hello, " + body);
            }
        } catch (Exception e) {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            if (out != null) {
                out.close();
            }
            if (this.socket != null) {
                try {
                    this.socket.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                this.socket = null;
            }
        }
    }
}

3.创建客户端ZkServerClient:

package com.ylw.zookeeper.loadbalance;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
public class ZkServerClient {
    public static List<String> listServer = new ArrayList<String>();
    public static void main(String[] args) {
        initServer();
        ZkServerClient client = new ZkServerClient();
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            String name;
            try {
                name = console.readLine();
                if ("exit".equals(name)) {
                    System.exit(0);
                }
                client.send(name);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    // 注册所有server
    public static void initServer() {
        listServer.clear();
        listServer.add("127.0.0.1:18080");
    }
    // 获取当前server信息
    public static String getServer() {
        return listServer.get(0);
    }
    public void send(String name) {
        String server = ZkServerClient.getServer();
        String[] cfg = server.split(":");
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);
            out.println(name);
            while (true) {
                String resp = in.readLine();
                if (resp == null)
                    break;
                else if (resp.length() > 0) {
                    System.out.println("Receive : " + resp);
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (out != null) {
                out.close();
            }
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

4. 运行Server和Client

客户端控制台输入并发送:Hello ,I am YLW !

服务端接收:

使用Zookeeper

1.改造ZkServerScoekt

package com.ylw.zookeeper.loadbalance;
import org.I0Itec.zkclient.ZkClient;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class ZkServerScoekt2 implements Runnable {
    private static int port = 18081;  //socket 服务启动后的所使用的 端口号
    public static void main(String[] args) throws IOException {
        ZkServerScoekt server = new ZkServerScoekt(port);  //构造函数传入port
        regServer();  //去zk注册
        Thread thread = new Thread(server);
        thread.start();
    }
    public ZkServerScoekt2(int port) {
        this.port = port;
    }
    //注册服务
    public static void regServer() {
        //1、建立zk连接
        ZkClient zkClient = new ZkClient("192.168.162.131", 5000, 10000);
        //2.先创建父节点
        String root = "/toov5";
        if (!zkClient.exists(root)) {
            //如果父节点不存,直接创建父节点
            zkClient.createPersistent(root);  //持久节点
        }
        //3、创建子节点
        String nodeName = root + "/service_" + port;
        String nodeValue = "127.0.0.1:" + port;
        if (zkClient.exists(nodeName)) {  //如果存在 直接删除掉
            zkClient.delete(nodeName);
        }
        zkClient.createEphemeral(nodeName, "127.0.0.1:" + port);
        System.out.println("服务注册成功" + nodeName);
    }
    public void run() {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);
            System.out.println("Server start port:" + port);
            Socket socket = null;
            while (true) {
                socket = serverSocket.accept();
                new Thread(new ServerHandler(socket)).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (serverSocket != null) {
                    serverSocket.close();
                }
            } catch (Exception e2) {
            }
        }
    }
}

改造ZkServerClient:

package com.ylw.zookeeper.loadbalance;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
public class ZkServerClient2 {
    public static List<String> listServer = new ArrayList<String>();
    public static void main(String[] args) {
        initServer();
        ZkServerClient2 client = new ZkServerClient2();
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            String name;
            try {
                name = console.readLine();
                if ("exit".equals(name)) {
                    System.exit(0);
                }
                client.send(name);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    // 注册所有server
    public static void initServer() {
        listServer.clear();
        listServer.add("127.0.0.1:18081"); //连接服务 放到list中   存放集群地址 做负载均衡用的
    }
    // 获取当前server信息
    public static String getServer() {
        return listServer.get(0);
    }
    public void send(String name) {
        String server = ZkServerClient2.getServer();
        String[] cfg = server.split(":");
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);
            out.println(name);
            while (true) {
                String resp = in.readLine();
                if (resp == null)
                    break;
                else if (resp.length() > 0) {
                    System.out.println("Receive : " + resp);
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (out != null) {
                out.close();
            }
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

参考:https://www.cnblogs.com/toov5/p/9899238.html

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
22天前
|
存储 Dubbo Java
分布式 RPC 底层原理详解,看这篇就够了!
本文详解分布式RPC的底层原理与系统设计,大厂面试高频,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
分布式 RPC 底层原理详解,看这篇就够了!
|
1月前
|
负载均衡 算法 应用服务中间件
5大负载均衡算法及原理,图解易懂!
本文详细介绍负载均衡的5大核心算法:轮询、加权轮询、随机、最少连接和源地址散列,帮助你深入理解分布式架构中的关键技术。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
5大负载均衡算法及原理,图解易懂!
|
25天前
|
数据可视化 数据挖掘 项目管理
远程团队的高效选择!必备协作工具助力分布式办公
随着远程办公的普及,团队协作面临沟通不畅、任务跟踪困难、协同效率低和反馈滞后等挑战。本文推荐几款高效协作工具,如板栗看板、Trello、Asana和Slack,帮助团队应对分布式协作中的痛点,提升项目管理透明度和沟通效率。
43 1
|
2月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
46 2
|
2月前
|
分布式计算 负载均衡 算法
Hadoop-31 ZooKeeper 内部原理 简述Leader选举 ZAB协议 一致性
Hadoop-31 ZooKeeper 内部原理 简述Leader选举 ZAB协议 一致性
31 1
|
2月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
43 1
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
50 1
|
2月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
38 1
|
2月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
50 1
|
2月前
|
存储 缓存 数据处理
深度解析:Hologres分布式存储引擎设计原理及其优化策略
【10月更文挑战第9天】在大数据时代,数据的规模和复杂性不断增加,这对数据库系统提出了更高的要求。传统的单机数据库难以应对海量数据处理的需求,而分布式数据库通过水平扩展提供了更好的解决方案。阿里云推出的Hologres是一个实时交互式分析服务,它结合了OLAP(在线分析处理)与OLTP(在线事务处理)的优势,能够在大规模数据集上提供低延迟的数据查询能力。本文将深入探讨Hologres分布式存储引擎的设计原理,并介绍一些关键的优化策略。
123 0