开发者社区 > 云原生 > 云消息队列 > 正文

RocketMQ用go sdk怎么发送定时的消息?

RocketMQ用go sdk怎么发送定时的消息?

展开
收起
嘟嘟嘟嘟嘟嘟 2023-10-17 23:43:23 154 0
4 条回答
写回答
取消 提交回答
  • 要使用Go SDK发送定时消息,您需要按照以下步骤操作:

    1 首先,确保已经安装了RocketMQ的Go SDK。如果没有安装,可以通过以下命令安装:

    go get -u github.com/apache/rocketmq-client-go
    

    2 然后,编写代码以发送定时消息。以下是一个简单的示例:

    package main
    
    import (
        "fmt"
        "time"
    
        "github.com/apache/rocketmq-client-go/core"
        "github.com/apache/rocketmq-client-go/producer"
    )
    
    func main() {
        // 创建一个生产者实例,指定NameServer地址和生产者组名
        p, err := producer.NewSyncProducer([]string{"127.0.0.1:9876"}, "my-producer-group")
        if err != nil {
            fmt.Println("创建生产者失败:", err)
            return
        }
        defer p.Close()
    
        // 创建一个消息实例,设置主题、标签和消息体
        msg := core.Message{
            Topic: "my-topic", // 主题名
            Tags:  []string{"tagA", "tagB"}, // 标签列表
            Body:  []byte("Hello, RocketMQ!"), // 消息体
        }
    
        // 设置定时消息的延迟时间(单位:毫秒)和定时级别(定时级别越高,优先级越高)
        delayTime := int64(1000 * time.Millisecond) // 延迟1秒发送
        msg.DelayTimeLevel = delayTime % 5 + 1 // 定时级别为1-5,这里设置为2(即延迟1秒发送)
    
        // 发送消息并检查是否成功
        err = p.SendSync(msg)
        if err != nil {
            fmt.Println("发送消息失败:", err)
        } else {
            fmt.Println("发送消息成功:", msg)
        }
    }
    

    在这个示例中,我们创建了一个生产者实例,然后创建了一个消息实例,设置了主题、标签和消息体。接着,我们设置了定时消息的延迟时间和定时级别,最后发送了消息。如果发送成功,将输出“发送消息成功”的信息。

    2023-10-31 20:53:06
    赞同 展开评论 打赏
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    对于 RocketMQ 的 Go SDK,目前版本(V1.1.0)还不支持直接发送定时消息。不过,你可以通过扩展 Go SDK 来实现定时消息的发送。

    以下是一个简单的示例代码,演示如何在 Go SDK 中发送定时消息:

    package main
    
    import (
        "fmt"
        "github.com/apache/rocketmq-client-go/v2"
        "github.com/apache/rocketmq-client-go/v2/primitive"
        "time"
    )
    
    func main() {
        p, _ := rocketmq.NewProducer(
            rocketmq.WithNameServer([]string{"localhost:9876"}),
            // 设置自定义消息队列选择器,用于发送延迟消息
            rocketmq.WithCustomizedSelector(func(mqs []*primitive.MessageQueue, msg *primitive.Message, arg interface{}) *primitive.MessageQueue {
                delayTime := arg.(int64) // 延迟时间,单位为毫秒
                now := time.Now().UnixNano() / 1000000
                index := (now + delayTime - now%10000) / 10000 % int64(len(mqs))
                return mqs[index]
            }, time.Minute.Nanoseconds()), // 在此设置默认延迟时间为 1 分钟
        )
    
        err := p.Start()
        if err != nil {
            fmt.Printf("Start producer error: %s", err.Error())
            return
        }
    
        topic := "your_topic"
        message := &primitive.Message{
            Topic: topic,
            Body:  []byte("Hello RocketMQ"),
        }
    
        delayTime := int64(5 * 60 * 1000) // 5 分钟的延迟时间,单位为毫秒
        // 设置消息的延迟级别(可选)
        message.DelayTimeLevel = 1
    
        // 发送定时消息
        sendResult, err := p.SendSync(context.Background(), message, delayTime)
        if err != nil {
            fmt.Printf("Send message error: %s", err.Error())
            return
        }
    
        fmt.Printf("Send message success. result=%v", sendResult)
    
        p.Shutdown()
    }
    

    在上述示例代码中,我们通过自定义消息队列选择器来实现定时消息的发送。通过计算当前时间和延迟时间,将消息发送到相应的消息队列中,从而实现定时发送的效果。

    该示例代码仅用于说明原理,并没有考虑多线程安全性等问题。在实际生产环境中,你可能需要进行更多的细节处理,例如错误处理、消息队列选择算法的优化等。

    另外,RocketMQ 也提供了官方支持的 Java SDK,如果你的业务允许,可以考虑使用 Java SDK 来发送定时消息,因为官方 Java SDK 已经内置了定时消息的支持。

    2023-10-30 19:19:45
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    根据提供的信息,如果你想要使用Go SDK在RocketMQ中发送定时消息,可以使用以下步骤:

    1. 创建一个消息实例:你可以使用NewMessage函数创建一个消息实例。消息实例包含主题、消息体和消息属性等信息。
    2. 设置消息属性:你可以使用SetProperty函数设置消息属性。消息属性用于控制消息的行为,例如消息的优先级、时间戳等。
    3. 设置消息的属性:你可以使用SetProperty函数设置消息的属性。消息属性用于控制消息的行为,例如消息的优先级、时间戳等。
    4. 设置消息的属性:你可以使用SetProperty函数设置消息的属性。消息属性用于控制消息的行为,例如消息的优先级、时间戳等。
    5. 设置消息的属性:你可以使用SetProperty函数设置消息的属性。消息属性用于控制消息的行为,例如消息的优先级、时间戳等。
    6. 发送消息:你可以使用Send函数发送消息。在发送消息时,你可以选择异步发送或同步发送。
    2023-10-18 14:17:18
    赞同 展开评论 打赏
  • 本文提供使用HTTP协议下的Go SDK收发定时消息和延时消息的示例代码。https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/send-and-subscribe-to-scheduled-messages-and-delayed-messages-1?spm=a2c4g.11186623.0.i80

    发送定时消息或延时消息
    发送定时消息或延时消息的示例代码如下。
    ```package main

    import (
    "fmt"
    "time"
    "strconv"

    "github.com/aliyunmq/mq-http-go-sdk"
    

    )

    func main() {
    // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
    endpoint := "${HTTP_ENDPOINT}"
    // 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
    // AccessKey ID,阿里云身份验证标识。
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    // AccessKey Secret,阿里云身份验证密钥。
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // 消息所属的Topic,在消息队列RocketMQ版控制台创建。
    topic := "${TOPIC}"
    // Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
    // 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
    instanceId := "${INSTANCE_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
    
    mqProducer := client.GetProducer(instanceId, topic)
    // 循环发送4条消息。
    for i := 0; i < 4; i++ {
        var msg mq_http_sdk.PublishMessageRequest
    
            msg = mq_http_sdk.PublishMessageRequest{
            MessageBody: "hello mq!",         // 消息内容。
            MessageTag:  "",                  // 消息标签。
            Properties:  map[string]string{}, // 消息属性。
            }
    // 设置消息的Key。
            msg.MessageKey = "MessageKey"
            // 设置消息自定义属性。
            msg.Properties["a"] = strconv.Itoa(i)
            // 延时消息,发送时间为10s后。该参数格式为毫秒级别的时间戳。
            // 若发送定时消息,设置该参数时需要计算定时时间与当前时间的时间差。
            msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
    
        ret, err := mqProducer.PublishMessage(msg)
    
        if err != nil {
            fmt.Println(err)
            return
        } else {
            fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
        }
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
    

    }
    ```

    2023-10-18 10:01:06
    赞同 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关产品

  • 云消息队列 MQ
  • 热门讨论

    热门文章

    相关电子书

    更多
    RocketMQ Client-GO 介绍 立即下载
    RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
    基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载