内容简介:参考链接1、安装Golang 1.10.1 版本
参考链接
https://mshk.top/2016/07/ubuntu-rabbitmq-golang/
1、安装Golang 1.10.1 版本
1.1、创建 go 的工作环境
注意:我将go的工作环境放在了$HOME/gowork
root@rabbitmq-1:~# mkdir -p $HOME/gowork root@rabbitmq-1:~# cd $HOME/gowork root@rabbitmq-1:~/gowork# mkdir src pkg bin
1.2、下载安装包并解压至/usr/local/ 下
root@rabbitmq-1:~/gowork# wget [https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz](https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz) root@rabbitmq-1:~/gowork# tar -C /usr/local -xzf go1.10.1.linux-amd64.tar.gz
1.3、添加环境变量
在~/.bahsrc文件中添加下面内容:
root@rabbitmq-1:/usr/local/go# vim ~/.bashrc export GOROOT=/usr/local/go //go的安装目录。也就是刚才指定的路径 export GOPATH=$HOME/gowork //这里的路径是你go语言的工作环境,按照自己的路径配置。 export GOBIN=$GOPATH/bin //编译后的可执行文件存放的目录 export PATH=$GOPATH:$GOBIN:$GOROOT:$PATH //添加进PATH路径
注意 :这个地方一定要配置对,配置仔细,要不然在启动网络的时候会出现找不到文件的问题。
使环境变量生效
root@rabbitmq-1:/usr/local/go# source ~/.bashrc
查看是否安装成功
root@rabbitmq-1:/usr/local/go/bin# /usr/local/go/bin/go version go version go1.10.1 linux/amd64
2、Golang调用RabbitMQ的案例
我们先将包下载到本地,然后就可以直接使用了:
root@rabbitmq-1:~# /usr/local/go/bin/go get github.com/streadway/amqp
2.1、使用Golang来发送第一个hello idoall.org
在第一个教程中,我们写程序从一个命名的队列(test-idoall-queues)中发送和接收消息。
producer_hello.go(消息生产者):
root@rabbitmq-1:~# vim producer_hello.go package main import ( "fmt" "log" "github.com/streadway/amqp" ) const ( //AMQP URI uri = "amqp://guest:guest@rabbitmq-1:5672/" # rabbitmq-1为主机名 //Durable AMQP exchange name exchangeName = "" //Durable AMQP queue name queueName = "test-idoall-queues" //Body of message bodyMsg string = "hello idoall.org" ) //如果存在错误,则输出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ //调用发布消息函数 publish(uri, exchangeName, queueName, bodyMsg) log.Printf("published %dB OK", len(bodyMsg)) } //发布者的方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名称 //@queue, queue的名称 //@body, 主体内容 func publish(amqpURI string, exchange string, queue string, body string){ //建立连接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //创建一个Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //创建一个queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("declared queue, publishing %dB body (%q)", len(body), body) // Producer只能发送到exchange,它是不能直接发送到queue的。 // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。 // routing_key就是指定的queue名字。 err = channel.Publish( exchange, // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing { Headers: amqp.Table{}, ContentType: "text/plain", ContentEncoding: "", Body: []byte(body), }) failOnError(err, "Failed to publish a message") }
consumer_hello.go(消息消费者):
root@rabbitmq-1:~# vim consumer_hello.go package main import ( "fmt" "log" "github.com/streadway/amqp" ) const ( //AMQP URI uri = "amqp://guest:guest@rabbitmq-1:5672/" //Durable AMQP exchange nam exchangeName = "" //Durable AMQP queue name queueName = "test-idoall-queues" ) //如果存在错误,则输出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ //调用消息接收者 consumer(uri, exchangeName, queueName) } //接收者方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名称 //@queue, queue的名称 func consumer(amqpURI string, exchange string, queue string){ //建立连接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //创建一个Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //创建一个queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("Queue bound to Exchange, starting Consume") //订阅消息 msgs, err := channel.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") //创建一个channel forever := make(chan bool) //调用gorountine go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出 <-forever }
查看消息(开打两个控制台console)
Console1(运行producer):
root@rabbitmq-1:~# /usr/local/go/bin/go run producer_hello.go 2018/12/14 03:41:32 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 03:41:32 got Connection, getting Channel 2018/12/14 03:41:32 got queue, declaring "test-idoall-queues" 2018/12/14 03:41:32 declared queue, publishing 16B body ("hello idoall.org") 2018/12/14 03:41:32 published 16B OK
然后运行一下命令,可以看到我们刚才创建的queues在列表中
root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues Listing queues test-idoall-queues 1
Console2(运行consumer)打印消息到屏幕,可以看到刚才我们通过producer发送的消息hello idoall.org
root@rabbitmq-1:/usr/sbin# /usr/local/go/bin/go run /root/consumer_hello.go 2018/12/14 03:46:01 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 03:46:01 got Connection, getting Channel 2018/12/14 03:46:01 got queue, declaring "test-idoall-queues" 2018/12/14 03:46:01 Queue bound to Exchange, starting Consume 2018/12/14 03:46:01 [*] Waiting for messages. To exit press CTRL+C 2018/12/14 03:46:01 Received a message: hello idoall.org
此时,web界面上也出现了信息。
image.png
2.2、Rabbitmq的任务分发机制
在2.1章节中,我们写程序从一个命名的队列中发送和接收消息。在这个章节中,我们将创建一个工作队列,将用于分配在多个工人之间的耗时的任务。
RabbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的。如果任务队伍过多,那么只需要创建更多的Consumer来进行任务处理即可。
producer_task.go(消息生产者):
root@rabbitmq-1:~# vim producer_task.go package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) const ( //AMQP URI uri = "amqp://guest:guest@rabbitmq-1:5672/" //Durable AMQP exchange name exchangeName = "" //Durable AMQP queue name queueName = "test-idoall-queues-task" ) //如果存在错误,则输出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ bodyMsg := bodyFrom(os.Args) //调用发布消息函数 publish(uri, exchangeName, queueName, bodyMsg) log.Printf("published %dB OK", len(bodyMsg)) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello idoall.org" } else { s = strings.Join(args[1:], " ") } return s } //发布者的方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名称 //@queue, queue的名称 //@body, 主体内容 func publish(amqpURI string, exchange string, queue string, body string){ //建立连接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //创建一个Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //创建一个queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("declared queue, publishing %dB body (%q)", len(body), body) // Producer只能发送到exchange,它是不能直接发送到queue的。 // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。 // routing_key就是指定的queue名字。 err = channel.Publish( exchange, // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing { Headers: amqp.Table{}, ContentType: "text/plain", ContentEncoding: "", Body: []byte(body), }) failOnError(err, "Failed to publish a message") }
consumer_task.go(消息消费者)
root@rabbitmq-1:~# vim consumer_task.go package main import ( "fmt" "log" "bytes" "time" "github.com/streadway/amqp" ) const ( //AMQP URI uri = "amqp://guest:guest@rabbitmq-1:5672/" //Durable AMQP exchange nam exchangeName = "" //Durable AMQP queue name queueName = "test-idoall-queues-task" ) //如果存在错误,则输出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ //调用消息接收者 consumer(uri, exchangeName, queueName) } //接收者方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名称 //@queue, queue的名称 func consumer(amqpURI string, exchange string, queue string){ //建立连接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //创建一个Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //创建一个queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("Queue bound to Exchange, starting Consume") //订阅消息 msgs, err := channel.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") //创建一个channel forever := make(chan bool) //调用gorountine go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出 <-forever }
查看结果
Console1(consumer):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go 2018/12/14 05:28:59 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 05:28:59 got Connection, getting Channel 2018/12/14 05:28:59 got queue, declaring "test-idoall-queues-task" 2018/12/14 05:28:59 Queue bound to Exchange, starting Consume 2018/12/14 05:28:59 [*] Waiting for messages. To exit press CTRL+C
Console2(consumer):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go 2018/12/14 05:29:04 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 05:29:04 got Connection, getting Channel 2018/12/14 05:29:04 got queue, declaring "test-idoall-queues-task" 2018/12/14 05:29:04 Queue bound to Exchange, starting Consume 2018/12/14 05:29:04 [*] Waiting for messages. To exit press CTRL+C
在第三个窗口,这个时候我们使用Producer 来 Publish Message:
root@rabbitmq-1:~# /usr/local/go/bin/go run producer_task.go First message. &&/usr/local/go/bin/go run producer_task.go Second message.. && /usr/local/go/bin/go run producer_task.go Third message... && /usr/local/go/bin/go run producer_task.go Fourth message.... && /usr/local/go/bin/go run producer_task.go Fifth message..... 2018/12/14 05:34:16 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 05:34:16 got Connection, getting Channel 2018/12/14 05:34:16 got queue, declaring "test-idoall-queues-task" 2018/12/14 05:34:16 declared queue, publishing 14B body ("First message.") 2018/12/14 05:34:16 published 14B OK 2018/12/14 05:34:16 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 05:34:16 got Connection, getting Channel 2018/12/14 05:34:16 got queue, declaring "test-idoall-queues-task" 2018/12/14 05:34:16 declared queue, publishing 16B body ("Second message..") 2018/12/14 05:34:16 published 16B OK 2018/12/14 05:34:17 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 05:34:17 got Connection, getting Channel 2018/12/14 05:34:17 got queue, declaring "test-idoall-queues-task" 2018/12/14 05:34:17 declared queue, publishing 16B body ("Third message...") 2018/12/14 05:34:17 published 16B OK 2018/12/14 05:34:17 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 05:34:17 got Connection, getting Channel 2018/12/14 05:34:17 got queue, declaring "test-idoall-queues-task" 2018/12/14 05:34:17 declared queue, publishing 18B body ("Fourth message....") 2018/12/14 05:34:17 published 18B OK 2018/12/14 05:34:18 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 05:34:18 got Connection, getting Channel 2018/12/14 05:34:18 got queue, declaring "test-idoall-queues-task" 2018/12/14 05:34:18 declared queue, publishing 18B body ("Fifth message.....") 2018/12/14 05:34:18 published 18B OK
这时我们再看刚才打开的两个Consumer的结果:
Console1(consumer):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go 2018/12/14 05:32:36 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 05:32:36 got Connection, getting Channel 2018/12/14 05:32:36 got queue, declaring "test-idoall-queues-task" 2018/12/14 05:32:36 Queue bound to Exchange, starting Consume 2018/12/14 05:32:36 [*] Waiting for messages. To exit press CTRL+C 2018/12/14 05:34:16 Received a message: Second message.. 2018/12/14 05:34:18 Done 2018/12/14 05:34:18 Received a message: Fourth message.... 2018/12/14 05:34:22 Done
Console2(consumer):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go 2018/12/14 05:29:04 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 05:29:04 got Connection, getting Channel 2018/12/14 05:29:04 got queue, declaring "test-idoall-queues-task" 2018/12/14 05:29:04 Queue bound to Exchange, starting Consume 2018/12/14 05:29:04 [*] Waiting for messages. To exit press CTRL+C 2018/12/14 05:34:16 Received a message: First message. 2018/12/14 05:34:17 Done 2018/12/14 05:34:17 Received a message: Third message... 2018/12/14 05:34:20 Done 2018/12/14 05:34:20 Received a message: Fifth message..... 2018/12/14 05:34:25 Done
默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin,也叫消息轮询
Web页面情况
image.png
2.3、Message acknowledgment 消息确认
每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们的代码,一旦RabbitMQ Server发送给Consumer消息后,会立即把这个Message标记为完成,然后从queue中删除。我们将无法再操作这个尚未处理完成的消息。
实际场景中,如果一个Consumer异常退出了,我们希望它处理的数据能够被另外的Consumer处理,这样数据在这种情况下(通道关闭、连接关闭、TCP连接丢失等情况)就不会丢失了。
为了保证数据不被丢失,RabbitMQ支持消息确认机制,ack(nowledgments)是从Consumer消费后发送到一个特定的消息告诉RabbitMQ已经收到、处理结束,RabbitMQ可以去安全的删除它了。
如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message重新排进队列,发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。
这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。
消息确认默认是关闭的,我们需要通过,d.ACK(false)来告诉RabbitMQ我们已经完成任务。
producer_acknowledgments(消息生产者).go:
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) const ( //AMQP URI uri = "amqp://guest:guest@rabbitmq-1:5672/" //Durable AMQP exchange name exchangeName = "" //Durable AMQP queue name queueName = "test-idoall-queues-acknowledgments" ) //如果存在错误,则输出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ bodyMsg := bodyFrom(os.Args) //调用发布消息函数 publish(uri, exchangeName, queueName, bodyMsg) log.Printf("published %dB OK", len(bodyMsg)) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello idoall.org" } else { s = strings.Join(args[1:], " ") } return s } //发布者的方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名称 //@queue, queue的名称 //@body, 主体内容 func publish(amqpURI string, exchange string, queue string, body string){ //建立连接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //创建一个Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //创建一个queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("declared queue, publishing %dB body (%q)", len(body), body) // Producer只能发送到exchange,它是不能直接发送到queue的。 // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。 // routing_key就是指定的queue名字。 err = channel.Publish( exchange, // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing { Headers: amqp.Table{}, ContentType: "text/plain", ContentEncoding: "", Body: []byte(body), }) failOnError(err, "Failed to publish a message") }
consumer_acknowledgments(消息消费者).go
package main import ( "fmt" "log" "bytes" "time" "github.com/streadway/amqp" ) const ( //AMQP URI uri = "amqp://guest:guest@rabbitmq-1:5672/" //Durable AMQP exchange nam exchangeName = "" //Durable AMQP queue name queueName = "test-idoall-queues-acknowledgments" ) //如果存在错误,则输出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main(){ //调用消息接收者 consumer(uri, exchangeName, queueName) } //接收者方法 // //@amqpURI, amqp的地址 //@exchange, exchange的名称 //@queue, queue的名称 func consumer(amqpURI string, exchange string, queue string){ //建立连接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //创建一个Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //创建一个queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("Queue bound to Exchange, starting Consume") //订阅消息 msgs, err := channel.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") //创建一个channel forever := make(chan bool) //调用gorountine go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出 <-forever }
查看结果
我们先使用Producer来发送一列消息:
root@rabbitmq-1:~# /usr/local/go/bin/go run producer_acknowledgments.go First message. &&/usr/local/go/bin/go run producer_acknowledgments.go Second message.. && /usr/local/go/bin/go run producer_acknowledgments.go Third message... && /usr/local/go/bin/go run producer_acknowledgments.go Fourth message.... && /usr/local/go/bin/go run producer_acknowledgments.go Fifth message..... 2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 07:30:10 got Connection, getting Channel 2018/12/14 07:30:10 got queue, declaring "test-idoall-queues-acknowledgments" 2018/12/14 07:30:10 declared queue, publishing 14B body ("First message.") 2018/12/14 07:30:10 published 14B OK 2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 07:30:10 got Connection, getting Channel 2018/12/14 07:30:10 got queue, declaring "test-idoall-queues-acknowledgments" 2018/12/14 07:30:10 declared queue, publishing 16B body ("Second message..") 2018/12/14 07:30:10 published 16B OK 2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 07:30:11 got Connection, getting Channel 2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments" 2018/12/14 07:30:11 declared queue, publishing 16B body ("Third message...") 2018/12/14 07:30:11 published 16B OK 2018/12/14 07:30:11 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 07:30:11 got Connection, getting Channel 2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments" 2018/12/14 07:30:11 declared queue, publishing 18B body ("Fourth message....") 2018/12/14 07:30:11 published 18B OK 2018/12/14 07:30:11 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 07:30:11 got Connection, getting Channel 2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments" 2018/12/14 07:30:11 declared queue, publishing 18B body ("Fifth message.....") 2018/12/14 07:30:11 published 18B OK
通过rabbitmqctl命令,来看下messages_unacknowledged的情况:
root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues test-idoall-queues-task 0 0 test-idoall-queues-acknowledgments 5 0 test-idoall-queues 0 0
使用Consumer来订阅消息操作到第四条的时候,我们按CTRL+C退出,这个时候相当于消息已经被读取,但是未发送d.ACK(false):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_acknowledgments.go 2018/12/14 07:30:25 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 07:30:25 got Connection, getting Channel 2018/12/14 07:30:25 got queue, declaring "test-idoall-queues-acknowledgments" 2018/12/14 07:30:25 Queue bound to Exchange, starting Consume 2018/12/14 07:30:25 [*] Waiting for messages. To exit press CTRL+C 2018/12/14 07:30:25 Received a message: First message. 2018/12/14 07:30:26 Done 2018/12/14 07:30:26 Received a message: Second message.. 2018/12/14 07:30:28 Done 2018/12/14 07:30:28 Received a message: Third message... 2018/12/14 07:30:31 Done 2018/12/14 07:30:31 Received a message: Fourth message.... ^Csignal: interrupt
再通过rabbitmqctl命令可以看到,还是有2条消息未处理
root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues test-idoall-queues-task 0 0 test-idoall-queues-acknowledgments 2 0 test-idoall-queues 0 0
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 直观讲解-RPC调用和HTTP调用的区别
- iOS混合开发库(GICXMLLayout)布局案例分析(1)今日头条案例
- 调用链系列一:解读UAVStack中的调用链技术
- 调用链系列二:解读UAVStack中的调用链技术
- 调用链系列三:解读UAVStack中的调用链技术
- dubbo源码解析(二十七)远程调用——injvm本地调用
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Designing with Web Standards (2nd Edition)
Jeffrey Zeldman / Peachpit Press / 2006-07-06 / USD 44.99
Best-selling author, designer, and web standards evangelist Jeffrey Zeldman has updated his classic, industry-shaking guidebook. This new edition--now in full color--covers improvements in best prac......一起来看看 《Designing with Web Standards (2nd Edition)》 这本书的介绍吧!