数据订阅/发布
在分布式集群中,假设数据库发生了改动,就得修改所有分布式服务的数据库配置
我们可以通过zookeeper来实现数据库配置的订阅发布
我们先初始化数据库配置项环境
在zookeeper配置以下数据
\[zk: localhost:2181(CONNECTED) 51\] get /config-server/app1/database {"Host":"127.0.0.1:3300","User":"root","Password":"233274","Database":"test"}
go代码环境准备
引入
github.com/go-zookeeper/zk
go.mod内容为:
module zkStudy go 1.17 require ( github.com/go-sql-driver/mysql v1.6.0 github.com/go-zookeeper/zk v1.0.2 github.com/jmoiron/sqlx v1.3.4 )
发布数据库配置
我们只需要set path,在zk中将自动把数据发布到订阅此目录的客户端中
以下代码,每2秒更改一次数据库数据
func loopChangeDbConfig() { var dbConfig = config.DatabaseConfig{Host: "127.0.0.1:3300",User: "root",Password: "123456",Database: "test"} t := time.NewTicker(2 * time.Second) for { select { case <-t.C: dbConfig.Password=strconv.Itoa(rand.Intn(999999)+100000) jsonByte,_ := json.Marshal(dbConfig) _,err := zkConnect.Set(databaseZKPath,jsonByte,-1) if err!=nil { fmt.Println("zk set dbConfig path err :", err) return } } } }
订阅数据库配置
通过zk.getW方法,获取数据并返回一个event单向通道,通过此通道可监听获取一条事件更改数据:
func getDatabaseConfig() <-chan zk.Event { //listen mysql-config path jsonStrByte, _, event, err := zkConnect.GetW(databaseZKPath) if err != nil { panic(err) } _ = json.Unmarshal(jsonStrByte, &databaseConfig) fmt.Printf("%+v 123\\n", databaseConfig) return event }
获取到event之后,新开协程,进行阻塞获取通道,当获取到数据后,重新获取配置并继续获取一个通道监听数据
func listenDBConfigChange(event <-chan zk.Event) { var e zk.Event for { e = <-event //if node data change,db reconnect if e.Type == zk.EventNodeDataChanged { fmt.Printf("node data changed: %s, \\n",e.Path) event = getDatabaseConfig() err := db.Close() if err != nil { panic(err) } connectDb() dbTest() } } }
运行结果: