分布式学习十四:协调任务

简介: 分布式学习十四:协调任务

分布式协调/通知服务

mysql备份数据时,我们会通过读取binlog方式备份,但是如果当从服务器宕机时,则备份就会停止,我们可以通过zookeeper实现分布式协调备份

主服务进行备份提交,其他服务监听主服务器状态,如果宕机失去联系,则替代主服务进行工作.

实现原理

在zookeeper节点结构如下:

test
└── customBackUp
    └── tasks  任务列表
        └── task01  任务
            ├── instance  服务实例列表
            │   └── server\_1\_00000001  有序/临时 节点
            ├── lastCommit 最后提交id
            └── status 当前状态

server进程:

1:判断tasks是否存在 task01 任务

2:如果不存在则初始化 task01 任务的节点列表

monitor进程:

1:监听tasks所有任务下的 status 节点,进行监控报警

task进程

1:多台服务初始化之后,先获取指定任务列表的节点数据(task01)

2:在instance中注册自己的有序/临时节点

3:注册完成之后,判断instance自己的节点是否为最小的,如果是,则节点状态为 "主服务"

4:如果不是最小的,则节点状态为:"从服务"

5:主服务进行处理数据,将status状态更新为"running",并将处理的进度id保留到 lastCommit 中

6:从服务进行监听instance节点列表,当主服务断线后,临时节点将会被删除,从而触发监听

7:从服务将status改为"stop"状态,重新进行判断节点是否最小

8:重复3-7

完整架构图解

image.png

简单实现代码

package main
import (
   "errors"
   "fmt"
   "github.com/go-zookeeper/zk"
   "os"
   "strconv"
   "strings"
   "time"
)
var serverId int = 1
type instanceStatus int
var (
   StatusRunning instanceStatus = 1
   StatusStandby instanceStatus = -1
)
func main() {
   serverId, _ = strconv.Atoi(os.Args\[1\])
   conn, _, err := zk.Connect(\[\]string{"127.0.0.1:20005"}, time.Second*10)
   if err != nil {
      panic(err)
   }
   logWithTime(fmt.Sprintf("serverId:%v start.",serverId))
   //task path
   path := "/customBackUp/tasks/task01"
   //check path exits
   exists, _, err := conn.Exists(path)
   if err != nil {
      panic(err)
   }
   logWithTime(fmt.Sprintf("check task node exist."))
   //if path  not exits,create path
   if exists == false {
      err := createPath(conn, path)
      if err != nil {
         panic(err)
      }
      err = createPath(conn, path+"/instance/")
      if err != nil {
         panic(err)
      }
      err = createPath(conn, path+"/lastCommit/")
      if err != nil {
         panic(err)
      }
      err = createPath(conn, path+"/status/")
      if err != nil {
         panic(err)
      }
      logWithTime(fmt.Sprintf("create task node"))
   }
   //register task(create a node sequence and ephemeral path)
   registerInstanceNodePath := path + "/instance/" + "server_" + strconv.Itoa(serverId)
   createPath, err := conn.Create(registerInstanceNodePath, \[\]byte{}, zk.FlagSequence|zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
   if err != nil {
      panic(err)
   }
   logWithTime(fmt.Sprintf("register instance node: %v",createPath))
start_worker:
   status, err := checkInstanceStatus(conn, createPath, path+"/instance")
   if err != nil {
      panic(err)
   }
   logWithTime(fmt.Sprintf("current server status: %v",status))
   if status == StatusRunning {
      //status need to update to running
      _, err := conn.Set(path+"/status", \[\]byte("1"), -1)
      if err != nil {
          panic(err)
      }
      logWithTime(fmt.Sprintf("current server handle task..."))
      handleTask(conn, path+"/lastCommit")
   } else if status == StatusStandby {
      logWithTime(fmt.Sprintf("current server watch node..."))
      watchTaskNode(conn, path+"/instance")
      //status need to update to stop
      _, err := conn.Set(path+"/status", \[\]byte("0"), -1)
      if err != nil {
         panic(err)
      }
      goto start_worker
   }
}
func handleTask(conn *zk.Conn, commitPath string) {
   //get commitId
   bytes, _, err := conn.Get(commitPath)
   if err != nil {
      panic(err)
   }
   str := string(bytes)
   id, _ := strconv.Atoi(str)
   for {
      id++
      str := strconv.Itoa(id)
      _, err := conn.Set(commitPath, \[\]byte(str), -1)
      if err != nil {
         panic(err)
      }
      fmt.Printf("\[%v\]serverId(%v),commitId:%v \\n", time.Now().Format("2006-01-02 15:04:05"), serverId, id)
      time.Sleep(time.Second * 5)
   }
}
func watchTaskNode(conn *zk.Conn, watchPath string) {
   _, _, events, err := conn.ChildrenW(watchPath)
   if err != nil {
      panic(err)
   }
   fmt.Printf("event change: %v", <-events)
   return
}
func checkInstanceStatus(conn *zk.Conn, nodeName string, path string) (status instanceStatus, err error) {
   currentId := getInstanceNodeId(nodeName)
   minId := currentId
   //get all child nodes of task Instance node
   nodeArr, _, err := conn.Children(path)
   if err != nil {
      return 0, err
   }
   for _, v := range nodeArr {
      nodeId := getInstanceNodeId(v)
      if nodeId <= minId {
         minId = nodeId
      }
   }
   if minId == currentId {
      return StatusRunning, nil
   } else {
      return StatusStandby, nil
   }
}
func getInstanceNodeId(nodeName string) int {
   //nodeanme='/customBackUp/tasks/task01/instance/server_10000000001'
   //only need to intercept the last 8 digits
   id := nodeName\[len(nodeName)-8:\]
   intId, _ := strconv.Atoi(id)
   return intId
}
func createPath(conn *zk.Conn, path string) (err error) {
   strArr := strings.Split(path, "/")
   var node string
   for _, str := range strArr {
      if str == "" {
         continue
      }
      node = node + "/" + str
      exists, _, err := conn.Exists(node)
      if err != nil {
         return errors.New(err.Error())
      }
      if exists {
         continue
      } else {
         _, err = conn.Create(node, \[\]byte{}, 0, zk.WorldACL(zk.PermAll))
         if err != nil {
            return errors.New(err.Error())
         }
      }
   }
   return err
}
func logWithTime(log string) {
   fmt.Printf("%v %v\\n", time.Now().Format("2006-01-02 15:04:05"), log)
}

运行工作图:

image.png

注意:此代码部分逻辑缺失,例如:

1:发布任务的task进程没有体现

2:监控任务的monitor没有体现

目录
相关文章
|
2月前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
430 2
|
2月前
|
SQL 关系型数据库 数据库
学习分布式事务Seata看这一篇就够了,建议收藏
学习分布式事务Seata看这一篇就够了,建议收藏
|
19天前
|
机器学习/深度学习 分布式计算 算法
联邦学习是保障数据隐私的分布式机器学习方法
【6月更文挑战第13天】联邦学习是保障数据隐私的分布式机器学习方法,它在不暴露数据的情况下,通过在各设备上本地训练并由中心服务器协调,实现全局模型构建。联邦学习的优势在于保护隐私、提高训练效率和增强模型泛化。已应用于医疗、金融和物联网等领域。未来趋势包括更高效的数据隐私保护、提升可解释性和可靠性,以及与其他技术融合,有望在更多场景发挥潜力,推动机器学习发展。
27 4
|
4天前
|
消息中间件 NoSQL Java
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
18 0
|
2月前
|
并行计算 算法 物联网
LLM 大模型学习必知必会系列(七):掌握分布式训练与LoRA/LISA微调:打造高性能大模型的秘诀进阶实战指南
LLM 大模型学习必知必会系列(七):掌握分布式训练与LoRA/LISA微调:打造高性能大模型的秘诀进阶实战指南
LLM 大模型学习必知必会系列(七):掌握分布式训练与LoRA/LISA微调:打造高性能大模型的秘诀进阶实战指南
|
12天前
|
负载均衡 NoSQL 关系型数据库
Redis分布式锁学习总结
Redis分布式锁学习总结
15 0
|
18天前
|
存储 监控 负载均衡
Zookeeper 详解:分布式协调服务的核心概念与实践
Zookeeper 详解:分布式协调服务的核心概念与实践
18 0
|
2月前
|
存储 大数据 Apache
深入理解ZooKeeper:分布式协调服务的核心与实践
【5月更文挑战第7天】ZooKeeper是Apache的分布式协调服务,确保大规模分布式系统中的数据一致性与高可用性。其特点包括强一致性、高可用性、可靠性、顺序性和实时性。使用ZooKeeper涉及安装配置、启动服务、客户端连接及执行操作。实际应用中,面临性能瓶颈、不可伸缩性和单点故障等问题,可通过水平扩展、集成其他服务和多集群备份来解决。理解ZooKeeper原理和实践,有助于构建高效分布式系统。
|
2月前
|
Dubbo Java 应用服务中间件
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
Java从入门到精通:3.2.2分布式与并发编程——了解分布式系统的基本概念,学习使用Dubbo、Spring Cloud等分布式框架
258 0
|
2月前
|
消息中间件 存储 负载均衡

热门文章

最新文章