发送端代码
package main import ( "fmt" "log" "time" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s : %s", msg, err) } } func connectMQ() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to opem a channel") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Fail open a chennel") defer ch.Close() q, err := ch.QueueDeclare( "hello", false, false, false, false, nil, ) failOnError(err, "failed to declare a quene") forerver := make(chan bool) go func() { count := 0 for { count += 1 body := fmt.Sprintf("%s , %d", "hello world", count) err = ch.Publish( "", q.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") time.Sleep(100 * time.Millisecond) } }() <-forerver } func main() { connectMQ() }
自动应答 接收端代码
package main import ( "log" "time" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s - %s ", msg, err) } } func reviceMQ() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "failed to connect to rabbitmq") defer conn.Close() ch, err := conn.Channel() failOnError(err, " Failede to open achannel") defer ch.Close() ch.Qos(1, 0, true) q, err := ch.QueueDeclare( "hello", false, false, false, false, nil, ) failOnError(err, "failed to declare a queue ")
自动应答, 这样会造成 工人 死亡时 详细丢失
msgs, err := ch.Consume( q.Name, "", true, // 自动应答 标识 false, false, false, nil, ) forever := make(chan bool) go func() { for { d := <-msgs log.Printf("Receied a message : %s \n len msgs : %d \n", d.Body, len(msgs)) time.Sleep(400 * time.Millisecond) d.Ack(true) } }() log.Printf(" waiting for messages , to exit press ctrl+c") <-forever } func main() { reviceMQ() }
使用说明
sender 与 reciver serve 都可以启动多份,来提高队列效率
什么是自动应答?
自动应答就是,接到任务就认为完成,
PrefetchCount
PrefetchSize