golang 中操作nsq队列数据库

简介: 首先先在本地将服务跑起来,我用的是docker-compose ,一句话6666 先新建一个docker-compose.yml version: '2' services: nsqlookupd: image: nsqio/nsq command: /nsqlookupd ports: - "192.

首先先在本地将服务跑起来,我用的是docker-compose ,一句话6666

先新建一个docker-compose.yml
version: '2'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "192.168.9.111:4160:4160"
      - "192.168.9.111:4161:4161"
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "192.168.9.111:4150:4150"
      - "192.168.9.111:4151:4151"
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd
    ports:
      - "192.168.9.111:4171:4171"

  然后整个数据就跑起来了

写个生产消息的

tproducter.go

package main
import(
    "log"
    "github.com/nsqio/go-nsq"
    "encoding/json"
    "strconv"
)

type Person struct {
    Id int
    Name string
    Age int
    NickName string
}

func main() {
    config :=nsq.NewConfig()
    w,err :=nsq.NewProducer("192.168.9.111:4150",config)
    if err !=nil {
        log.Panic("Could not create producer.")
    }
    defer w.Stop()
    for i :=0;i<100;i++{
        p :=&Person{}
        p.Id = i
        p.Name = "Jack"+strconv.Itoa(i)
        p.NickName="Luo"+strconv.Itoa(i)
        p.Age = i
        info,jerr :=json.Marshal(p)
        err :=w.Publish("write_test",info)
        if err !=nil || jerr !=nil {
            log.Panic("Could not connect.")
        }
    }
    w.Stop()
}

再写个消费的

tconsumer.go

package main

import (
    "log"
    "github.com/nsqio/go-nsq"
    "time"
)

func main() {
    config :=nsq.NewConfig()
    q,err := nsq.NewConsumer("write_test","ch",config)
    if err !=nil{
        log.Panic("Could not create consumer.")
    }
    defer q.Stop()
    q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error{
        log.Printf("Got a message: %v",string(message.Body))
        time.Sleep(5*time.Second)
        return nil
    }))
    //err = q.ConnectToNSQD("192.168.9.111:32771");
    err = q.ConnectToNSQD("192.168.9.111:4150");
    if err !=nil {
        log.Panic("Could not connect")
    }
    time.Sleep(3600*time.Second)
}

然后就要以6起来了

/usr/local/go/bin/go run /Users/jackluo/Works/golang/src/nsq/tconsumer.go
2017/08/29 15:29:45 INF    1 [write_test/ch] (192.168.9.111:4150) connecting to nsqd
2017/08/29 15:29:45 Got a message: {"Id":0,"Name":"Jack0","Age":0,"NickName":"Luo0"}
2017/08/29 15:29:50 Got a message: {"Id":1,"Name":"Jack1","Age":1,"NickName":"Luo1"}
2017/08/29 15:29:55 Got a message: {"Id":2,"Name":"Jack2","Age":2,"NickName":"Luo2"}
2017/08/29 15:30:00 Got a message: {"Id":3,"Name":"Jack3","Age":3,"NickName":"Luo3"}
2017/08/29 15:30:05 Got a message: {"Id":4,"Name":"Jack4","Age":4,"NickName":"Luo4"}
2017/08/29 15:30:10 Got a message: {"Id":5,"Name":"Jack5","Age":5,"NickName":"Luo5"}
2017/08/29 15:30:15 Got a message: {"Id":6,"Name":"Jack6","Age":6,"NickName":"Luo6"}
2017/08/29 15:30:20 Got a message: {"Id":7,"Name":"Jack7","Age":7,"NickName":"Luo7"}
2017/08/29 15:30:25 Got a message: {"Id":8,"Name":"Jack8","Age":8,"NickName":"Luo8"}
2017/08/29 15:30:30 Got a message: {"Id":9,"Name":"Jack9","Age":9,"NickName":"Luo9"}
2017/08/29 15:30:35 Got a message: {"Id":10,"Name":"Jack10","Age":10,"NickName":"Luo10"}
2017/08/29 15:30:40 Got a message: {"Id":11,"Name":"Jack11","Age":11,"NickName":"Luo11"}
2017/08/29 15:30:45 Got a message: {"Id":12,"Name":"Jack12","Age":12,"NickName":"Luo12"}

可以通过这个地址看得到界面http://192.168.9.111:4171/counter

目录
相关文章
|
17小时前
|
存储 关系型数据库 MySQL
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
10 1
|
2月前
|
存储 关系型数据库 MySQL
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
106 5
|
4月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之cdc postgres数据库,当表行记录修改后报错,该如何修改
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL 监控 关系型数据库
实时计算 Flink版操作报错合集之在设置监控PostgreSQL数据库时,将wal_level设置为logical,出现一些表更新和删除操作报错,怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
关系型数据库 Java 数据库
实时计算 Flink版操作报错合集之flinksql采PG数据库时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之在处理PostgreSQL数据库遇到报错。该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
消息中间件 关系型数据库 数据库
实时计算 Flink版操作报错合集之在使用RDS数据库作为源端,遇到只能同步21个任务,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL 数据库 Python
Django框架数据库ORM查询操作(6)
【7月更文挑战第6天】```markdown Django ORM常用数据库操作:1) 查询所有数据2) 根据ID查询 3) 精确查询 4) 分页排序
61 1
|
5月前
|
Java Devops API
阿里云云效操作报错合集之云效页面提示数据库保存不进去,该怎么办
本合集将整理呈现用户在使用过程中遇到的报错及其对应的解决办法,包括但不限于账户权限设置错误、项目配置不正确、代码提交冲突、构建任务执行失败、测试环境异常、需求流转阻塞等问题。阿里云云效是一站式企业级研发协同和DevOps平台,为企业提供从需求规划、开发、测试、发布到运维、运营的全流程端到端服务和工具支撑,致力于提升企业的研发效能和创新能力。
|
4月前
|
存储 监控 安全
安全规范问题之跟数据库交互涉及的敏感数据操作需要有哪些措施
安全规范问题之跟数据库交互涉及的敏感数据操作需要有哪些措施