RabbitMQ搭建
docker
docker run -d --hostname rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_MANAGEMENT_PLUGIN=rabbitmq_management_agent -p 15672:15672 -p 5672:5672 rabbitmq:management
代码
golang
生产者
package main import ( "flag" "fmt" amqp "github.com/rabbitmq/amqp091-go" "log" "strconv" "time" ) func main() { var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL") var exchange = flag.String("exchange", "logs", "Exchange name") var key = flag.String("key", "log", "Routing key") flag.Parse() // 连接到RabbitMQ服务器 conn, err := amqp.Dial(*url) if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() // 创建一个通道 ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() // 声明一个交换机 err = ch.ExchangeDeclare( *exchange, // name: 交换机名称 "fanout", // kind: 交换机类型 true, // durable: 是否持久化 false, // autoDelete: 没有队列绑定时是否自动删除 false, // internal: 是否是内部交换机 false, // noWait: 是否需要等待服务器响应 nil, // args: 其他参数 ) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } // 发送消息 body := "Hello World!" + fmt.Sprintf(time.Now().String()) for i := 0; i < 20; i++ { body = strconv.Itoa(i) + body err = ch.Publish( *exchange, // 交换机名称 *key, // 路由键 false, // 强制发布 false, // 立即发布 amqp.Publishing{ ContentType: "text/plain", DeliveryMode: amqp.Persistent, Body: []byte(body), Expiration: "10000", // 3000 3秒 }) } if err != nil { log.Fatalf("Failed to publish a message: %v", err) } fmt.Printf(" [x] Sent %s", body) }
消费者
package main import ( "flag" "fmt" "log" amqp "github.com/rabbitmq/amqp091-go" ) func main() { var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL") var exchange = flag.String("exchange", "logs", "Exchange name") var key = flag.String("key", "log", "Routing key") flag.Parse() // 连接到RabbitMQ服务器 conn, err := amqp.Dial(*url) if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() // 创建一个通道 ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() // 声明一个交换机 err = ch.ExchangeDeclare( *exchange, // name: 交换机名称 "fanout", // kind: 交换机类型 true, // durable: 是否持久化 false, // autoDelete: 没有队列绑定时是否自动删除 false, // internal: 是否是内部交换机 false, // noWait: 是否需要等待服务器响应 nil, // args: 其他参数 ) if err != nil { log.Fatalf("Failed to declare an exchange: %v", err) } // 声明一个队列 q, err := ch.QueueDeclare( "queue01", // 随机生成队列名称 true, // 持久化 false, // 删除 false, // 独占 false, // 不等消息 nil, // 其他参数 ) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } // 绑定队列到交换机 err = ch.QueueBind( q.Name, // 队列名称 *key, // 路由键 *exchange, // 交换机名称 false, // 现在绑定 nil, // 其他参数 ) if err != nil { log.Fatalf("Failed to bind a queue: %v", err) } // 接收消息 msgs, err := ch.Consume( q.Name, // 队列名称 "consumer01", // 消费者标签 false, // 自动ack false, // 不独占 false, // 不等消息 false, // 不从服务器获取消息 nil, // 其他参数 ) if err != nil { log.Fatalf("Failed to register a consumer: %v", err) } fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n") for d := range msgs { // 输出接收到的消息 fmt.Printf(" [x] Received %s\n", d.Body) err = ch.Ack(d.DeliveryTag, true) if err != nil { log.Fatalf("Failed to ack message: %v", err) } } }
可视化
看板
账户密码
admin admin
消费进度
http://localhost:15672/#/queues