nsq 是基于 Go语言开发出来的消息队列中间件,今天在windows上来安装一下基础环境。
一、下载可执行文件
下载完成之后解压:
二、执行 nsqlookup
nsqlookupd是管理拓扑信息的守护进程。客户端查询nsqlookupd以发现特定主题的nsqd生产者,nsqd节点广播主题和通道信息。
H:\我的开发环境\消息队列\nsq-1.2.1.windows-amd64.go1.16.6\bin>nsqlookupd [nsqlookupd] 2022/12/21 18:02:36.729343 INFO: nsqlookupd v1.2.1 (built w/go1.16.6) [nsqlookupd] 2022/12/21 18:02:36.739842 INFO: HTTP: listening on [::]:4161 [nsqlookupd] 2022/12/21 18:02:36.739842 INFO: TCP: listening on [::]:4160
三、执行 nsqd
nsqd
是接收消息、排队并将消息传递给客户机的守护进程。
H:\我的开发环境\消息队列\nsq-1.2.1.windows-amd64.go1.16.6\bin>nsqd --lookupd-tcp-address=127.0.0.1:4160 [nsqd] 2022/12/21 18:06:30.371132 INFO: nsqd v1.2.1 (built w/go1.16.6) [nsqd] 2022/12/21 18:06:30.380208 INFO: ID: 94 [nsqd] 2022/12/21 18:06:30.381257 INFO: NSQ: persisting topic/channel metadata to nsqd.dat [nsqd] 2022/12/21 18:06:30.508955 INFO: TCP: listening on [::]:4150 [nsqd] 2022/12/21 18:06:30.508955 INFO: LOOKUP(127.0.0.1:4160): adding peer [nsqd] 2022/12/21 18:06:30.508955 INFO: HTTP: listening on [::]:4151 [nsqd] 2022/12/21 18:06:30.510027 INFO: LOOKUP connecting to 127.0.0.1:4160 [nsqd] 2022/12/21 18:06:30.512820 INFO: LOOKUPD(127.0.0.1:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.2.1 BroadcastAddress:FENG}
四、执行 nsqadmin
nsqadmin是一个Web UI,用于实时查看聚合的集群统计信息并执行各种管理任务。
H:\我的开发环境\消息队列\nsq-1.2.1.windows-amd64.go1.16.6\bin>nsqadmin --lookupd-http-address=127.0.0.1:4161 [nsqadmin] 2022/12/21 18:07:45.818217 INFO: nsqadmin v1.2.1 (built w/go1.16.6) [nsqadmin] 2022/12/21 18:07:45.827762 INFO: HTTP: listening on [::]:4171
五、测试
- 发布一个初始消息
H:\我的开发环境\消息队列\nsq-1.2.1.windows-amd64.go1.16.6\bin>curl -d "hello world 1" "http://127.0.0.1:4151/pub?topic=test" OK H:\我的开发环境\消息队列\nsq-1.2.1.windows-amd64.go1.16.6\bin>
六、启动 nsq_to_file
nsq_to_file 使用指定的主题/通道并写入以换行符分隔的文件,可选地滚动和/或压缩该文件。
H:\我的开发环境\消息队列\nsq-1.2.1.windows-amd64.go1.16.6\bin>nsq_to_file --topic=test --output-dir=./tmp --lookupd-http-address=127.0.0.1:4161 2022/12/21 18:13:14 INF 1 [test/nsq_to_file] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test 2022/12/21 18:13:14 INF 1 [test/nsq_to_file] (FENG:4150) connecting to nsqd [nsq_to_file] 2022/12/21 18:13:33.800381 INFO: [test/nsq_to_file] opening tmp/test.FENG.2022-12-21_18.log [nsq_to_file] 2022/12/21 18:13:33.800381 INFO: [test/nsq_to_file] syncing 1 records to disk 2022/12/21 18:14:22 INF 1 [test/nsq_to_file] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test 2022/12/21 18:15:22 INF 1 [test/nsq_to_file] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test 2022/12/21 18:16:22 INF 1 [test/nsq_to_file] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test 2022/12/21 18:17:22 INF 1 [test/nsq_to_file] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
七、推送更多消息
H:\我的开发环境\消息队列\nsq-1.2.1.windows-amd64.go1.16.6\bin>curl -d "hello world 1" "http://127.0.0.1:4151/pub?topic=test" OK H:\我的开发环境\消息队列\nsq-1.2.1.windows-amd64.go1.16.6\bin>curl -d "hello world 1" "http://127.0.0.1:4151/pub?topic=test" OK H:\我的开发环境\消息队列\nsq-1.2.1.windows-amd64.go1.16.6\bin>curl -d "hello world 1" "http://127.0.0.1:4151/pub?topic=test" OK H:\我的开发环境\消息队列\nsq-1.2.1.windows-amd64.go1.16.6\bin>curl -d "hello world 1" "http://127.0.0.1:4151/pub?topic=test" OK H:\我的开发环境\消息队列\nsq-1.2.1.windows-amd64.go1.16.6\bin>curl -d "hello world 1" "http://127.0.0.1:4151/pub?topic=test" OK H:\我的开发环境\消息队列\nsq-1.2.1.windows-amd64.go1.16.6\bin>
八、通过界面查看消息统计
浏览器打开:http://127.0.0.1:4171/
- 可以看到,刚刚发布了11条消息
九、Go 语言验证
- 消费者(Consumer)程序段
package main import ( "github.com/nsqio/go-nsq" "log" "os" "os/signal" "syscall" ) type myMessageHandler struct{} // HandleMessage implements the Handler interface func (h *myMessageHandler) HandleMessage(m *nsq.Message) error { if len(m.Body) == 0 { // Returning nil will automatically send a FIN command to NSQ to mark the message as processed. // In the case, a message with an empty body is simply ignored/discarded. return nil } // do whatever actual message processing is desired. //err := processMessage(m.Body) err := func(message []byte) error { log.Println("接收到消息: " + string(message)) return nil }(m.Body) // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message. return err } func main() { // Instantiate a consumer that will subscribe to the provided channel. config := nsq.NewConfig() consumer, err := nsq.NewConsumer("topic", "channel", config) if err != nil { log.Fatal(err) } // Set the Handler for messages received by this Consumer. Can be called multiple times. // See also AddConcurrentHandlers. consumer.AddHandler(&myMessageHandler{}) // Use nsqlookupd to discover nsqd instances. // See also ConnectToNSQD,ConnectToNSQDs, ConnectToNSQLookupds. err = consumer.ConnectToNSQLookupd("localhost:4161") if err != nil { log.Fatal(err) } // wait for signal to exit sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan // Gracefully stop the consumer consumer.Stop() }
生产者(Producer)程序段
package main import ( "github.com/nsqio/go-nsq" "log" ) func main() { config := nsq.NewConfig() producer, err := nsq.NewProducer("127.0.0.1:4150", config) if err != nil { log.Fatal(err) } messageBody := []byte("你好呀,七镜") topicName := "topic" // Synchronously publish a single message to the specified topic. // Messages can also be sent asynchronously and/or in batches. err = producer.Publish(topicName, messageBody) if err != nil { log.Fatal(err) } // Gracefully stop the producer when appropriate (e.g. before shutting down the service) producer.Stop() }
生产者程序段执行结果
消费者程序段执行结果
算是走了遍消息队列的入门程序, 后面还需要好好研究。