分布式协调/通知服务
MySQL备份数据时,我们会通过读取binlog的方式进行备份,但是当执行备份的服务器宕机时,备份就会停止。我们可以通过ZooKeeper实现分布式协调备份:
主服务进行备份提交,其他服务监听主服务器状态,如果宕机失去联系,则替代主服务进行工作.
实现原理
在zookeeper节点结构如下:
test └── customBackUp └── tasks 任务列表 └── task01 任务 ├── instance 服务实例列表 │ └── server_1_00000001 有序/临时 节点 ├── lastCommit 最后提交id └── status 当前状态
server进程:
1:判断tasks是否存在 task01 任务
2:如果不存在则初始化 task01 任务的节点列表
monitor进程:
1:监听tasks所有任务下的 status 节点,进行监控报警
task进程
1:多台服务初始化之后,先获取指定任务列表的节点数据(task01)
2:在instance中注册自己的有序/临时节点
3:注册完成之后,判断instance自己的节点是否为最小的,如果是,则节点状态为 "主服务"
4:如果不是最小的,则节点状态为:"从服务"
5:主服务进行处理数据,将status状态更新为"running",并将处理的进度id保留到 lastCommit 中
6:从服务进行监听instance节点列表,当主服务断线后,临时节点将会被删除,从而触发监听
7:从服务将status改为"stop"状态,重新进行判断节点是否最小
8:重复3-7
完整架构图解
简单实现代码
// Command worker implements a ZooKeeper-based leader election demo for a
// backup task: every worker registers an ephemeral+sequential node under the
// task's instance list; the holder of the smallest sequence number becomes
// the leader and advances a commit id, while the others watch the instance
// list and take over when the leader's ephemeral node disappears.
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/go-zookeeper/zk"
)

// serverId identifies this worker instance; taken from os.Args[1].
var serverId = 1

// instanceStatus is the role this instance holds after the election.
type instanceStatus int

const (
	StatusRunning instanceStatus = 1  // leader: performs the backup work
	StatusStandby instanceStatus = -1 // follower: watches and waits
)

// sequenceDigits is the width of the counter ZooKeeper appends to a
// sequential node name: a zero-padded 10-digit number, e.g.
// "server_1" becomes "server_10000000001".
const sequenceDigits = 10

func main() {
	if len(os.Args) < 2 {
		panic("usage: worker <serverId>")
	}
	id, err := strconv.Atoi(os.Args[1])
	if err != nil {
		panic(fmt.Errorf("invalid serverId %q: %w", os.Args[1], err))
	}
	serverId = id

	conn, _, err := zk.Connect([]string{"127.0.0.1:20005"}, time.Second*10)
	if err != nil {
		panic(err)
	}
	logWithTime(fmt.Sprintf("serverId:%v start.", serverId))

	// Task path layout: <path>/{instance,lastCommit,status}.
	const path = "/customBackUp/tasks/task01"

	exists, _, err := conn.Exists(path)
	if err != nil {
		panic(err)
	}
	logWithTime("check task node exist.")

	// First worker to arrive initializes the task's node tree.
	if !exists {
		for _, p := range []string{path, path + "/instance", path + "/lastCommit", path + "/status"} {
			if err := createPath(conn, p); err != nil {
				panic(err)
			}
		}
		logWithTime("create task node")
	}

	// Register this instance as an ephemeral+sequential node; ZooKeeper
	// appends the zero-padded sequence counter to the name, and the node
	// vanishes automatically when this session dies.
	registered, err := conn.Create(path+"/instance/server_"+strconv.Itoa(serverId),
		[]byte{}, zk.FlagSequence|zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
	if err != nil {
		panic(err)
	}
	logWithTime(fmt.Sprintf("register instance node: %v", registered))

	// Election loop (replaces the original goto): re-check our rank every
	// time the instance list changes.
	for {
		status, err := checkInstanceStatus(conn, registered, path+"/instance")
		if err != nil {
			panic(err)
		}
		logWithTime(fmt.Sprintf("current server status: %v", status))

		if status == StatusRunning {
			// Mark the task running, then do the work; handleTask
			// never returns (it panics on error so a standby takes over).
			if _, err := conn.Set(path+"/status", []byte("1"), -1); err != nil {
				panic(err)
			}
			logWithTime("current server handle task...")
			handleTask(conn, path+"/lastCommit")
		}

		// Standby: block until the instance list changes (typically the
		// leader's ephemeral node being deleted), flag the task stopped,
		// then re-run the election.
		logWithTime("current server watch node...")
		watchTaskNode(conn, path+"/instance")
		if _, err := conn.Set(path+"/status", []byte("0"), -1); err != nil {
			panic(err)
		}
	}
}

// handleTask is the leader's work loop: it restores the last committed id
// from commitPath, then increments and persists it every five seconds,
// simulating binlog-backup progress. It never returns; any ZooKeeper error
// panics so that a standby instance can take over.
func handleTask(conn *zk.Conn, commitPath string) {
	data, _, err := conn.Get(commitPath)
	if err != nil {
		panic(err)
	}
	// An empty node parses to 0, so a fresh task starts from commit 1.
	id, _ := strconv.Atoi(string(data))
	for {
		id++
		if _, err := conn.Set(commitPath, []byte(strconv.Itoa(id)), -1); err != nil {
			panic(err)
		}
		fmt.Printf("[%v]serverId(%v),commitId:%v \n", time.Now().Format("2006-01-02 15:04:05"), serverId, id)
		time.Sleep(time.Second * 5)
	}
}

// watchTaskNode blocks until the children of watchPath change — i.e. until
// an instance joins or (typically) the leader's ephemeral node is removed.
// ZooKeeper watches are one-shot, so the caller must re-arm by calling again.
func watchTaskNode(conn *zk.Conn, watchPath string) {
	_, _, events, err := conn.ChildrenW(watchPath)
	if err != nil {
		panic(err)
	}
	fmt.Printf("event change: %v", <-events)
}

// checkInstanceStatus decides this instance's role: it is the leader
// (StatusRunning) iff its own sequence number is the smallest among all
// registered instance nodes; otherwise it is a standby.
func checkInstanceStatus(conn *zk.Conn, nodeName string, path string) (instanceStatus, error) {
	currentId := getInstanceNodeId(nodeName)

	children, _, err := conn.Children(path)
	if err != nil {
		return 0, err
	}
	minId := currentId
	for _, child := range children {
		if id := getInstanceNodeId(child); id < minId {
			minId = id
		}
	}
	if minId == currentId {
		return StatusRunning, nil
	}
	return StatusStandby, nil
}

// getInstanceNodeId extracts the ZooKeeper sequence counter from a
// sequential node name such as ".../instance/server_10000000001".
// The counter is the last 10 digits (ZooKeeper formats it as %010d); the
// original code sliced only 8, which drops the two leading counter digits
// and can make distinct sequences compare equal. Returns 0 for names too
// short to contain a counter.
func getInstanceNodeId(nodeName string) int {
	if len(nodeName) < sequenceDigits {
		return 0
	}
	id, _ := strconv.Atoi(nodeName[len(nodeName)-sequenceDigits:])
	return id
}

// createPath creates every missing node along path, from the root down.
// Existing nodes are left untouched; created nodes are permanent, with
// empty data and open ACLs.
func createPath(conn *zk.Conn, path string) error {
	var node string
	for _, part := range strings.Split(path, "/") {
		if part == "" {
			continue
		}
		node += "/" + part
		exists, _, err := conn.Exists(node)
		if err != nil {
			return fmt.Errorf("checking node %s: %w", node, err)
		}
		if exists {
			continue
		}
		if _, err := conn.Create(node, []byte{}, 0, zk.WorldACL(zk.PermAll)); err != nil {
			return fmt.Errorf("creating node %s: %w", node, err)
		}
	}
	return nil
}

// logWithTime prints a log line prefixed with the current timestamp.
func logWithTime(log string) {
	fmt.Printf("%v %v\n", time.Now().Format("2006-01-02 15:04:05"), log)
}
运行工作图:
注意:此代码部分逻辑缺失,例如:
1:发布任务的task进程没有体现
2:监控任务的monitor没有体现