开发者社区 > 云原生 > 消息队列 > 正文

RocketMQ用go sdk怎么发送定时的消息?

RocketMQ用go sdk怎么发送定时的消息?

展开
收起
-Feng、冯冯 2023-10-17 23:43:23 64 0
4 条回答
写回答
取消 提交回答
  • 要使用Go SDK发送定时消息,您需要按照以下步骤操作:

    1 首先,确保已经安装了RocketMQ的Go SDK。如果没有安装,可以通过以下命令安装:

    go get -u github.com/apache/rocketmq-client-go
    

    2 然后,编写代码以发送定时消息。以下是一个简单的示例:

    package main
    
    import (
        "fmt"
        "time"
    
        "github.com/apache/rocketmq-client-go/core"
        "github.com/apache/rocketmq-client-go/producer"
    )
    
    func main() {
        // 创建一个生产者实例,指定NameServer地址和生产者组名
        p, err := producer.NewSyncProducer([]string{"127.0.0.1:9876"}, "my-producer-group")
        if err != nil {
            fmt.Println("创建生产者失败:", err)
            return
        }
        defer p.Close()
    
        // 创建一个消息实例,设置主题、标签和消息体
        msg := core.Message{
            Topic: "my-topic", // 主题名
            Tags:  []string{"tagA", "tagB"}, // 标签列表
            Body:  []byte("Hello, RocketMQ!"), // 消息体
        }
    
        // 设置定时消息的延迟时间(单位:毫秒)和定时级别(定时级别越高,优先级越高)
        delayTime := int64(1000 * time.Millisecond) // 延迟1秒发送
        msg.DelayTimeLevel = delayTime % 5 + 1 // 定时级别为1-5,这里设置为2(即延迟1秒发送)
    
        // 发送消息并检查是否成功
        err = p.SendSync(msg)
        if err != nil {
            fmt.Println("发送消息失败:", err)
        } else {
            fmt.Println("发送消息成功:", msg)
        }
    }
    

    在这个示例中,我们创建了一个生产者实例,然后创建了一个消息实例,设置了主题、标签和消息体。接着,我们设置了定时消息的延迟时间和定时级别,最后发送了消息。如果发送成功,将输出“发送消息成功”的信息。

    2023-10-31 20:53:06
    赞同 展开评论 打赏
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    对于 RocketMQ 的 Go SDK,目前版本(V1.1.0)还不支持直接发送定时消息。不过,你可以通过扩展 Go SDK 来实现定时消息的发送。

    以下是一个简单的示例代码,演示如何在 Go SDK 中发送定时消息:

    package main
    
    import (
        "fmt"
        "github.com/apache/rocketmq-client-go/v2"
        "github.com/apache/rocketmq-client-go/v2/primitive"
        "time"
    )
    
    func main() {
        p, _ := rocketmq.NewProducer(
            rocketmq.WithNameServer([]string{"localhost:9876"}),
            // 设置自定义消息队列选择器,用于发送延迟消息
            rocketmq.WithCustomizedSelector(func(mqs []*primitive.MessageQueue, msg *primitive.Message, arg interface{}) *primitive.MessageQueue {
                delayTime := arg.(int64) // 延迟时间,单位为毫秒
                now := time.Now().UnixNano() / 1000000
                index := (now + delayTime - now%10000) / 10000 % int64(len(mqs))
                return mqs[index]
            }, time.Minute.Nanoseconds()), // 在此设置默认延迟时间为 1 分钟
        )
    
        err := p.Start()
        if err != nil {
            fmt.Printf("Start producer error: %s", err.Error())
            return
        }
    
        topic := "your_topic"
        message := &primitive.Message{
            Topic: topic,
            Body:  []byte("Hello RocketMQ"),
        }
    
        delayTime := int64(5 * 60 * 1000) // 5 分钟的延迟时间,单位为毫秒
        // 设置消息的延迟级别(可选)
        message.DelayTimeLevel = 1
    
        // 发送定时消息
        sendResult, err := p.SendSync(context.Background(), message, delayTime)
        if err != nil {
            fmt.Printf("Send message error: %s", err.Error())
            return
        }
    
        fmt.Printf("Send message success. result=%v", sendResult)
    
        p.Shutdown()
    }
    

    在上述示例代码中,我们通过自定义消息队列选择器来实现定时消息的发送。通过计算当前时间和延迟时间,将消息发送到相应的消息队列中,从而实现定时发送的效果。

    该示例代码仅用于说明原理,并没有考虑多线程安全性等问题。在实际生产环境中,你可能需要进行更多的细节处理,例如错误处理、消息队列选择算法的优化等。

    另外,RocketMQ 也提供了官方支持的 Java SDK,如果你的业务允许,可以考虑使用 Java SDK 来发送定时消息,因为官方 Java SDK 已经内置了定时消息的支持。

    2023-10-30 19:19:45
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    根据提供的信息,如果你想要使用Go SDK在RocketMQ中发送定时消息,可以使用以下步骤:

    1. 创建一个消息实例:你可以使用NewMessage函数创建一个消息实例。消息实例包含主题、消息体和消息属性等信息。
    2. 设置消息属性:你可以使用SetProperty函数设置消息属性。消息属性用于控制消息的行为,例如消息的优先级、时间戳等。
    3. 设置消息的属性:你可以使用SetProperty函数设置消息的属性。消息属性用于控制消息的行为,例如消息的优先级、时间戳等。
    4. 设置消息的属性:你可以使用SetProperty函数设置消息的属性。消息属性用于控制消息的行为,例如消息的优先级、时间戳等。
    5. 设置消息的属性:你可以使用SetProperty函数设置消息的属性。消息属性用于控制消息的行为,例如消息的优先级、时间戳等。
    6. 发送消息:你可以使用Send函数发送消息。在发送消息时,你可以选择异步发送或同步发送。
    2023-10-18 14:17:18
    赞同 展开评论 打赏
  • 本文提供使用HTTP协议下的Go SDK收发定时消息和延时消息的示例代码。https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/send-and-subscribe-to-scheduled-messages-and-delayed-messages-1?spm=a2c4g.11186623.0.i80

    发送定时消息或延时消息
    发送定时消息或延时消息的示例代码如下。
    ```package main

    import (
    "fmt"
    "time"
    "strconv"

    "github.com/aliyunmq/mq-http-go-sdk"
    

    )

    func main() {
    // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
    endpoint := "${HTTP_ENDPOINT}"
    // 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
    // AccessKey ID,阿里云身份验证标识。
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    // AccessKey Secret,阿里云身份验证密钥。
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // 消息所属的Topic,在消息队列RocketMQ版控制台创建。
    topic := "${TOPIC}"
    // Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
    // 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
    instanceId := "${INSTANCE_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
    
    mqProducer := client.GetProducer(instanceId, topic)
    // 循环发送4条消息。
    for i := 0; i < 4; i++ {
        var msg mq_http_sdk.PublishMessageRequest
    
            msg = mq_http_sdk.PublishMessageRequest{
            MessageBody: "hello mq!",         // 消息内容。
            MessageTag:  "",                  // 消息标签。
            Properties:  map[string]string{}, // 消息属性。
            }
    // 设置消息的Key。
            msg.MessageKey = "MessageKey"
            // 设置消息自定义属性。
            msg.Properties["a"] = strconv.Itoa(i)
            // 延时消息,发送时间为10s后。该参数格式为毫秒级别的时间戳。
            // 若发送定时消息,设置该参数时需要计算定时时间与当前时间的时间差。
            msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
    
        ret, err := mqProducer.PublishMessage(msg)
    
        if err != nil {
            fmt.Println(err)
            return
        } else {
            fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
        }
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
    

    }
    ```

    2023-10-18 10:01:06
    赞同 展开评论 打赏

多个子产品线联合打造金融级高可用消息服务以及对物联网的原生支持,覆盖多行业。

相关产品

  • 云消息队列 MQ
  • 相关电子书

    更多
    RocketMQ Client-GO 介绍 立即下载
    RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
    基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载