内容简介:参考链接1、安装Golang 1.10.1 版本
参考链接
https://mshk.top/2016/07/ubuntu-rabbitmq-golang/
1、安装Golang 1.10.1 版本
1.1、创建 go 的工作环境
注意:我将go的工作环境放在了$HOME/gowork
root@rabbitmq-1:~# mkdir -p $HOME/gowork root@rabbitmq-1:~# cd $HOME/gowork root@rabbitmq-1:~/gowork# mkdir src pkg bin
1.2、下载安装包并解压至/usr/local/ 下
root@rabbitmq-1:~/gowork# wget [https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz](https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz) root@rabbitmq-1:~/gowork# tar -C /usr/local -xzf go1.10.1.linux-amd64.tar.gz
1.3、添加环境变量
在~/.bahsrc文件中添加下面内容:
root@rabbitmq-1:/usr/local/go# vim ~/.bashrc export GOROOT=/usr/local/go //go的安装目录。也就是刚才指定的路径 export GOPATH=$HOME/gowork //这里的路径是你go语言的工作环境,按照自己的路径配置。 export GOBIN=$GOPATH/bin //编译后的可执行文件存放的目录 export PATH=$GOPATH:$GOBIN:$GOROOT:$PATH //添加进PATH路径
注意 :这个地方一定要配置对,配置仔细,要不然在启动网络的时候会出现找不到文件的问题。
使环境变量生效
root@rabbitmq-1:/usr/local/go# source ~/.bashrc
查看是否安装成功
root@rabbitmq-1:/usr/local/go/bin# /usr/local/go/bin/go version go version go1.10.1 linux/amd64
2、Golang调用RabbitMQ的案例
我们先将包下载到本地,然后就可以直接使用了:
root@rabbitmq-1:~# /usr/local/go/bin/go get github.com/streadway/amqp
2.1、使用Golang来发送第一个hello idoall.org
在第一个教程中,我们写程序从一个命名的队列(test-idoall-queues)中发送和接收消息。
producer_hello.go(消息生产者):
root@rabbitmq-1:~# vim producer_hello.go
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
const (
//AMQP URI
uri = "amqp://guest:guest@rabbitmq-1:5672/" # rabbitmq-1为主机名
//Durable AMQP exchange name
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues"
//Body of message
bodyMsg string = "hello idoall.org"
)
//如果存在错误,则输出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
//调用发布消息函数
publish(uri, exchangeName, queueName, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
}
//发布者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
//建立连接
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
//创建一个Channel
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
//创建一个queue
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
// Producer只能发送到exchange,它是不能直接发送到queue的。
// 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
// routing_key就是指定的queue名字。
err = channel.Publish(
exchange, // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
}
consumer_hello.go(消息消费者):
root@rabbitmq-1:~# vim consumer_hello.go
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
const (
//AMQP URI
uri = "amqp://guest:guest@rabbitmq-1:5672/"
//Durable AMQP exchange nam
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues"
)
//如果存在错误,则输出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
//调用消息接收者
consumer(uri, exchangeName, queueName)
}
//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
//建立连接
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
//创建一个Channel
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
//创建一个queue
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
log.Printf("Queue bound to Exchange, starting Consume")
//订阅消息
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
//创建一个channel
forever := make(chan bool)
//调用gorountine
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
//没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出
<-forever
}
查看消息(开打两个控制台console)
Console1(运行producer):
root@rabbitmq-1:~# /usr/local/go/bin/go run producer_hello.go
2018/12/14 03:41:32 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 03:41:32 got Connection, getting Channel
2018/12/14 03:41:32 got queue, declaring "test-idoall-queues"
2018/12/14 03:41:32 declared queue, publishing 16B body ("hello idoall.org")
2018/12/14 03:41:32 published 16B OK
然后运行一下命令,可以看到我们刚才创建的queues在列表中
root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues Listing queues test-idoall-queues 1
Console2(运行consumer)打印消息到屏幕,可以看到刚才我们通过producer发送的消息hello idoall.org
root@rabbitmq-1:/usr/sbin# /usr/local/go/bin/go run /root/consumer_hello.go 2018/12/14 03:46:01 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 03:46:01 got Connection, getting Channel 2018/12/14 03:46:01 got queue, declaring "test-idoall-queues" 2018/12/14 03:46:01 Queue bound to Exchange, starting Consume 2018/12/14 03:46:01 [*] Waiting for messages. To exit press CTRL+C 2018/12/14 03:46:01 Received a message: hello idoall.org
此时,web界面上也出现了信息。
image.png
2.2、Rabbitmq的任务分发机制
在2.1章节中,我们写程序从一个命名的队列中发送和接收消息。在这个章节中,我们将创建一个工作队列,将用于分配在多个工人之间的耗时的任务。
RabbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的。如果任务队伍过多,那么只需要创建更多的Consumer来进行任务处理即可。
producer_task.go(消息生产者):
root@rabbitmq-1:~# vim producer_task.go
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
const (
//AMQP URI
uri = "amqp://guest:guest@rabbitmq-1:5672/"
//Durable AMQP exchange name
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-task"
)
//如果存在错误,则输出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
bodyMsg := bodyFrom(os.Args)
//调用发布消息函数
publish(uri, exchangeName, queueName, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello idoall.org"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
//发布者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
//建立连接
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
//创建一个Channel
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
//创建一个queue
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
// Producer只能发送到exchange,它是不能直接发送到queue的。
// 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
// routing_key就是指定的queue名字。
err = channel.Publish(
exchange, // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
}
consumer_task.go(消息消费者)
root@rabbitmq-1:~# vim consumer_task.go
package main
import (
"fmt"
"log"
"bytes"
"time"
"github.com/streadway/amqp"
)
const (
//AMQP URI
uri = "amqp://guest:guest@rabbitmq-1:5672/"
//Durable AMQP exchange nam
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-task"
)
//如果存在错误,则输出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
//调用消息接收者
consumer(uri, exchangeName, queueName)
}
//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
//建立连接
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
//创建一个Channel
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
//创建一个queue
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
log.Printf("Queue bound to Exchange, starting Consume")
//订阅消息
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
//创建一个channel
forever := make(chan bool)
//调用gorountine
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
//没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出
<-forever
}
查看结果
Console1(consumer):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go 2018/12/14 05:28:59 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 05:28:59 got Connection, getting Channel 2018/12/14 05:28:59 got queue, declaring "test-idoall-queues-task" 2018/12/14 05:28:59 Queue bound to Exchange, starting Consume 2018/12/14 05:28:59 [*] Waiting for messages. To exit press CTRL+C
Console2(consumer):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go 2018/12/14 05:29:04 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 05:29:04 got Connection, getting Channel 2018/12/14 05:29:04 got queue, declaring "test-idoall-queues-task" 2018/12/14 05:29:04 Queue bound to Exchange, starting Consume 2018/12/14 05:29:04 [*] Waiting for messages. To exit press CTRL+C
在第三个窗口,这个时候我们使用Producer 来 Publish Message:
root@rabbitmq-1:~# /usr/local/go/bin/go run producer_task.go First message. &&/usr/local/go/bin/go run producer_task.go Second message.. && /usr/local/go/bin/go run producer_task.go Third message... && /usr/local/go/bin/go run producer_task.go Fourth message.... && /usr/local/go/bin/go run producer_task.go Fifth message.....
2018/12/14 05:34:16 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:16 got Connection, getting Channel
2018/12/14 05:34:16 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:16 declared queue, publishing 14B body ("First message.")
2018/12/14 05:34:16 published 14B OK
2018/12/14 05:34:16 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:16 got Connection, getting Channel
2018/12/14 05:34:16 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:16 declared queue, publishing 16B body ("Second message..")
2018/12/14 05:34:16 published 16B OK
2018/12/14 05:34:17 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:17 got Connection, getting Channel
2018/12/14 05:34:17 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:17 declared queue, publishing 16B body ("Third message...")
2018/12/14 05:34:17 published 16B OK
2018/12/14 05:34:17 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:17 got Connection, getting Channel
2018/12/14 05:34:17 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:17 declared queue, publishing 18B body ("Fourth message....")
2018/12/14 05:34:17 published 18B OK
2018/12/14 05:34:18 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:18 got Connection, getting Channel
2018/12/14 05:34:18 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:18 declared queue, publishing 18B body ("Fifth message.....")
2018/12/14 05:34:18 published 18B OK
这时我们再看刚才打开的两个Consumer的结果:
Console1(consumer):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go 2018/12/14 05:32:36 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 05:32:36 got Connection, getting Channel 2018/12/14 05:32:36 got queue, declaring "test-idoall-queues-task" 2018/12/14 05:32:36 Queue bound to Exchange, starting Consume 2018/12/14 05:32:36 [*] Waiting for messages. To exit press CTRL+C 2018/12/14 05:34:16 Received a message: Second message.. 2018/12/14 05:34:18 Done 2018/12/14 05:34:18 Received a message: Fourth message.... 2018/12/14 05:34:22 Done
Console2(consumer):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go 2018/12/14 05:29:04 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 05:29:04 got Connection, getting Channel 2018/12/14 05:29:04 got queue, declaring "test-idoall-queues-task" 2018/12/14 05:29:04 Queue bound to Exchange, starting Consume 2018/12/14 05:29:04 [*] Waiting for messages. To exit press CTRL+C 2018/12/14 05:34:16 Received a message: First message. 2018/12/14 05:34:17 Done 2018/12/14 05:34:17 Received a message: Third message... 2018/12/14 05:34:20 Done 2018/12/14 05:34:20 Received a message: Fifth message..... 2018/12/14 05:34:25 Done
默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin,也叫消息轮询
Web页面情况
image.png
2.3、Message acknowledgment 消息确认
每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们的代码,一旦RabbitMQ Server发送给Consumer消息后,会立即把这个Message标记为完成,然后从queue中删除。我们将无法再操作这个尚未处理完成的消息。
实际场景中,如果一个Consumer异常退出了,我们希望它处理的数据能够被另外的Consumer处理,这样数据在这种情况下(通道关闭、连接关闭、TCP连接丢失等情况)就不会丢失了。
为了保证数据不被丢失,RabbitMQ支持消息确认机制,ack(nowledgments)是从Consumer消费后发送到一个特定的消息告诉RabbitMQ已经收到、处理结束,RabbitMQ可以去安全的删除它了。
如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message重新排进队列,发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。
这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。
消息确认默认是关闭的,我们需要通过,d.ACK(false)来告诉RabbitMQ我们已经完成任务。
producer_acknowledgments(消息生产者).go:
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
const (
//AMQP URI
uri = "amqp://guest:guest@rabbitmq-1:5672/"
//Durable AMQP exchange name
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-acknowledgments"
)
//如果存在错误,则输出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
bodyMsg := bodyFrom(os.Args)
//调用发布消息函数
publish(uri, exchangeName, queueName, bodyMsg)
log.Printf("published %dB OK", len(bodyMsg))
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello idoall.org"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
//发布者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
//建立连接
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
//创建一个Channel
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
//创建一个queue
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
// Producer只能发送到exchange,它是不能直接发送到queue的。
// 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
// routing_key就是指定的queue名字。
err = channel.Publish(
exchange, // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
}
consumer_acknowledgments(消息消费者).go
package main
import (
"fmt"
"log"
"bytes"
"time"
"github.com/streadway/amqp"
)
const (
//AMQP URI
uri = "amqp://guest:guest@rabbitmq-1:5672/"
//Durable AMQP exchange nam
exchangeName = ""
//Durable AMQP queue name
queueName = "test-idoall-queues-acknowledgments"
)
//如果存在错误,则输出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
//调用消息接收者
consumer(uri, exchangeName, queueName)
}
//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
//建立连接
log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
failOnError(err, "Failed to connect to RabbitMQ")
defer connection.Close()
//创建一个Channel
log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
failOnError(err, "Failed to open a channel")
defer channel.Close()
log.Printf("got queue, declaring %q", queue)
//创建一个queue
q, err := channel.QueueDeclare(
queueName, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
log.Printf("Queue bound to Exchange, starting Consume")
//订阅消息
msgs, err := channel.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
//创建一个channel
forever := make(chan bool)
//调用gorountine
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
//没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出
<-forever
}
查看结果
我们先使用Producer来发送一列消息:
root@rabbitmq-1:~# /usr/local/go/bin/go run producer_acknowledgments.go First message. &&/usr/local/go/bin/go run producer_acknowledgments.go Second message.. && /usr/local/go/bin/go run producer_acknowledgments.go Third message... && /usr/local/go/bin/go run producer_acknowledgments.go Fourth message.... && /usr/local/go/bin/go run producer_acknowledgments.go Fifth message.....
2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:10 got Connection, getting Channel
2018/12/14 07:30:10 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:10 declared queue, publishing 14B body ("First message.")
2018/12/14 07:30:10 published 14B OK
2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:10 got Connection, getting Channel
2018/12/14 07:30:10 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:10 declared queue, publishing 16B body ("Second message..")
2018/12/14 07:30:10 published 16B OK
2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:11 got Connection, getting Channel
2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:11 declared queue, publishing 16B body ("Third message...")
2018/12/14 07:30:11 published 16B OK
2018/12/14 07:30:11 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:11 got Connection, getting Channel
2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:11 declared queue, publishing 18B body ("Fourth message....")
2018/12/14 07:30:11 published 18B OK
2018/12/14 07:30:11 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:11 got Connection, getting Channel
2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:11 declared queue, publishing 18B body ("Fifth message.....")
2018/12/14 07:30:11 published 18B OK
通过rabbitmqctl命令,来看下messages_unacknowledged的情况:
root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues test-idoall-queues-task 0 0 test-idoall-queues-acknowledgments 5 0 test-idoall-queues 0 0
使用Consumer来订阅消息操作到第四条的时候,我们按CTRL+C退出,这个时候相当于消息已经被读取,但是未发送d.ACK(false):
root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_acknowledgments.go 2018/12/14 07:30:25 dialing "amqp://guest:guest@rabbitmq-1:5672/" 2018/12/14 07:30:25 got Connection, getting Channel 2018/12/14 07:30:25 got queue, declaring "test-idoall-queues-acknowledgments" 2018/12/14 07:30:25 Queue bound to Exchange, starting Consume 2018/12/14 07:30:25 [*] Waiting for messages. To exit press CTRL+C 2018/12/14 07:30:25 Received a message: First message. 2018/12/14 07:30:26 Done 2018/12/14 07:30:26 Received a message: Second message.. 2018/12/14 07:30:28 Done 2018/12/14 07:30:28 Received a message: Third message... 2018/12/14 07:30:31 Done 2018/12/14 07:30:31 Received a message: Fourth message.... ^Csignal: interrupt
再通过rabbitmqctl命令可以看到,还是有2条消息未处理
root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues test-idoall-queues-task 0 0 test-idoall-queues-acknowledgments 2 0 test-idoall-queues 0 0
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 直观讲解-RPC调用和HTTP调用的区别
- iOS混合开发库(GICXMLLayout)布局案例分析(1)今日头条案例
- 调用链系列一:解读UAVStack中的调用链技术
- 调用链系列二:解读UAVStack中的调用链技术
- 调用链系列三:解读UAVStack中的调用链技术
- dubbo源码解析(二十七)远程调用——injvm本地调用
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Mastering Regular Expressions, Second Edition
Jeffrey E F Friedl / O'Reilly Media / 2002-07-15 / USD 39.95
Regular expressions are an extremely powerful tool for manipulating text and data. They have spread like wildfire in recent years, now offered as standard features in Perl, Java, VB.NET and C# (and an......一起来看看 《Mastering Regular Expressions, Second Edition》 这本书的介绍吧!