Golang调用RabbitMQ的案例

栏目: Go · 发布时间: 7年前

内容简介:参考链接1、安装Golang 1.10.1 版本

参考链接

https://mshk.top/2016/07/ubuntu-rabbitmq-golang/

1、安装Golang 1.10.1 版本

1.1、创建 go 的工作环境

注意:我将go的工作环境放在了$HOME/gowork

root@rabbitmq-1:~# mkdir -p $HOME/gowork

root@rabbitmq-1:~# cd $HOME/gowork

root@rabbitmq-1:~/gowork# mkdir src pkg bin

1.2、下载安装包并解压至/usr/local/ 下

root@rabbitmq-1:~/gowork# wget [https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz](https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz)

root@rabbitmq-1:~/gowork# tar -C /usr/local -xzf go1.10.1.linux-amd64.tar.gz

1.3、添加环境变量

在~/.bahsrc文件中添加下面内容:

root@rabbitmq-1:/usr/local/go# vim ~/.bashrc
export GOROOT=/usr/local/go //go的安装目录。也就是刚才指定的路径
export GOPATH=$HOME/gowork //这里的路径是你go语言的工作环境,按照自己的路径配置。
export GOBIN=$GOPATH/bin //编译后的可执行文件存放的目录
export PATH=$GOPATH:$GOBIN:$GOROOT:$PATH //添加进PATH路径

注意 :这个地方一定要配置对,配置仔细,要不然在启动网络的时候会出现找不到文件的问题。

使环境变量生效

root@rabbitmq-1:/usr/local/go# source ~/.bashrc

查看是否安装成功

root@rabbitmq-1:/usr/local/go/bin# /usr/local/go/bin/go version
go version go1.10.1 linux/amd64

2、Golang调用RabbitMQ的案例

我们先将包下载到本地,然后就可以直接使用了:

root@rabbitmq-1:~# /usr/local/go/bin/go get github.com/streadway/amqp

2.1、使用Golang来发送第一个hello idoall.org

在第一个教程中,我们写程序从一个命名的队列(test-idoall-queues)中发送和接收消息。

producer_hello.go(消息生产者):

root@rabbitmq-1:~# vim producer_hello.go

package main

import (

 "fmt"

 "log"

 "github.com/streadway/amqp"

)

const (

 //AMQP URI

 uri = "amqp://guest:guest@rabbitmq-1:5672/" # rabbitmq-1为主机名

 //Durable AMQP exchange name

 exchangeName = ""

 //Durable AMQP queue name

 queueName = "test-idoall-queues"

 //Body of message

 bodyMsg string = "hello idoall.org"

)

//如果存在错误,则输出

func failOnError(err error, msg string) {

 if err != nil {

 log.Fatalf("%s: %s", msg, err)

 panic(fmt.Sprintf("%s: %s", msg, err))

 }

}

func main(){

 //调用发布消息函数

 publish(uri, exchangeName, queueName, bodyMsg)

 log.Printf("published %dB OK", len(bodyMsg))

}

//发布者的方法

//

//@amqpURI, amqp的地址

//@exchange, exchange的名称

//@queue, queue的名称

//@body, 主体内容

func publish(amqpURI string, exchange string, queue string, body string){

 //建立连接

 log.Printf("dialing %q", amqpURI)

 connection, err := amqp.Dial(amqpURI)

 failOnError(err, "Failed to connect to RabbitMQ")

 defer connection.Close()

 //创建一个Channel

 log.Printf("got Connection, getting Channel")

 channel, err := connection.Channel()

 failOnError(err, "Failed to open a channel")

 defer channel.Close()

 log.Printf("got queue, declaring %q", queue)

 //创建一个queue

 q, err := channel.QueueDeclare(

 queueName, // name

 false, // durable

 false, // delete when unused

 false, // exclusive

 false, // no-wait

 nil, // arguments

 )

 failOnError(err, "Failed to declare a queue")

 log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

 // Producer只能发送到exchange,它是不能直接发送到queue的。

 // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。

 // routing_key就是指定的queue名字。

 err = channel.Publish(

 exchange, // exchange

 q.Name, // routing key

 false, // mandatory

 false, // immediate

 amqp.Publishing {

 Headers: amqp.Table{},

 ContentType: "text/plain",

 ContentEncoding: "",

 Body: []byte(body),

 })

 failOnError(err, "Failed to publish a message")

}

consumer_hello.go(消息消费者):

root@rabbitmq-1:~# vim consumer_hello.go

package main

import (

 "fmt"

 "log"

 "github.com/streadway/amqp"

)

const (

 //AMQP URI

 uri = "amqp://guest:guest@rabbitmq-1:5672/"

 //Durable AMQP exchange nam

 exchangeName = ""

 //Durable AMQP queue name

 queueName = "test-idoall-queues"

)

//如果存在错误,则输出

func failOnError(err error, msg string) {

 if err != nil {

 log.Fatalf("%s: %s", msg, err)

 panic(fmt.Sprintf("%s: %s", msg, err))

 }

}

func main(){

 //调用消息接收者

 consumer(uri, exchangeName, queueName)

}

//接收者方法

//

//@amqpURI, amqp的地址

//@exchange, exchange的名称

//@queue, queue的名称

func consumer(amqpURI string, exchange string, queue string){

 //建立连接

 log.Printf("dialing %q", amqpURI)

 connection, err := amqp.Dial(amqpURI)

  failOnError(err, "Failed to connect to RabbitMQ")

 defer connection.Close()

 //创建一个Channel

 log.Printf("got Connection, getting Channel")

 channel, err := connection.Channel()

 failOnError(err, "Failed to open a channel")

 defer channel.Close()

 log.Printf("got queue, declaring %q", queue)

 //创建一个queue

 q, err := channel.QueueDeclare(

 queueName, // name

 false, // durable

 false, // delete when unused

 false, // exclusive

 false, // no-wait

 nil, // arguments

 )

 failOnError(err, "Failed to declare a queue")

 log.Printf("Queue bound to Exchange, starting Consume")

 //订阅消息

 msgs, err := channel.Consume(

 q.Name, // queue

 "", // consumer

 true, // auto-ack

 false, // exclusive

 false, // no-local

 false, // no-wait

 nil, // args

 )

 failOnError(err, "Failed to register a consumer")

 //创建一个channel

 forever := make(chan bool)

 //调用gorountine

 go func() {

 for d := range msgs {

 log.Printf("Received a message: %s", d.Body)

 }

 }()

 log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

 //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出

 <-forever

}

查看消息(开打两个控制台console)

Console1(运行producer):

root@rabbitmq-1:~# /usr/local/go/bin/go run producer_hello.go

2018/12/14 03:41:32 dialing "amqp://guest:guest@rabbitmq-1:5672/"

2018/12/14 03:41:32 got Connection, getting Channel

2018/12/14 03:41:32 got queue, declaring "test-idoall-queues"

2018/12/14 03:41:32 declared queue, publishing 16B body ("hello idoall.org")

2018/12/14 03:41:32 published 16B OK

然后运行一下命令,可以看到我们刚才创建的queues在列表中

root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues

Listing queues

test-idoall-queues  1

Console2(运行consumer)打印消息到屏幕,可以看到刚才我们通过producer发送的消息hello idoall.org

root@rabbitmq-1:/usr/sbin# /usr/local/go/bin/go run /root/consumer_hello.go

2018/12/14 03:46:01 dialing "amqp://guest:guest@rabbitmq-1:5672/"

2018/12/14 03:46:01 got Connection, getting Channel

2018/12/14 03:46:01 got queue, declaring "test-idoall-queues"

2018/12/14 03:46:01 Queue bound to Exchange, starting Consume

2018/12/14 03:46:01 [*] Waiting for messages. To exit press CTRL+C

2018/12/14 03:46:01 Received a message: hello idoall.org

此时,web界面上也出现了信息。

Golang调用RabbitMQ的案例

image.png

2.2、Rabbitmq的任务分发机制

在2.1章节中,我们写程序从一个命名的队列中发送和接收消息。在这个章节中,我们将创建一个工作队列,将用于分配在多个工人之间的耗时的任务。

RabbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的。如果任务队伍过多,那么只需要创建更多的Consumer来进行任务处理即可。

producer_task.go(消息生产者):

root@rabbitmq-1:~# vim producer_task.go
package main

import (
  "fmt"
  "log"
  "os"
  "strings"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri          =  "amqp://guest:guest@rabbitmq-1:5672/"
  //Durable AMQP exchange name
  exchangeName =  ""
  //Durable AMQP queue name
  queueName    = "test-idoall-queues-task"
)

//如果存在错误,则输出
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main(){
  bodyMsg := bodyFrom(os.Args)
  //调用发布消息函数
  publish(uri, exchangeName, queueName, bodyMsg)
  log.Printf("published %dB OK", len(bodyMsg))
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello idoall.org"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

//发布者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
  //建立连接
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  //创建一个Channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  //创建一个queue
  q, err := channel.QueueDeclare(
    queueName, // name
    false,   // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

  // Producer只能发送到exchange,它是不能直接发送到queue的。
  // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
  // routing_key就是指定的queue名字。
  err = channel.Publish(
    exchange,     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing {
      Headers:         amqp.Table{},
      ContentType: "text/plain",
      ContentEncoding: "",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")
}

consumer_task.go(消息消费者)

root@rabbitmq-1:~# vim consumer_task.go
package main

import (
  "fmt"
  "log"
  "bytes"
  "time"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri           =  "amqp://guest:guest@rabbitmq-1:5672/"
  //Durable AMQP exchange nam
  exchangeName  = ""
  //Durable AMQP queue name
  queueName     = "test-idoall-queues-task"
)

//如果存在错误,则输出
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main(){
    //调用消息接收者
    consumer(uri, exchangeName, queueName)
}

//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
  //建立连接
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  //创建一个Channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  //创建一个queue
  q, err := channel.QueueDeclare(
      queueName, // name
      false,   // durable
      false,   // delete when unused
      false,   // exclusive
      false,   // no-wait
      nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("Queue bound to Exchange, starting Consume")
  //订阅消息
  msgs, err := channel.Consume(
      q.Name, // queue
      "",     // consumer
      false,   // auto-ack
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  //创建一个channel
  forever := make(chan bool)

  //调用gorountine
  go func() {
      for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
        dot_count := bytes.Count(d.Body, []byte("."))
        t := time.Duration(dot_count)
        time.Sleep(t * time.Second)
        log.Printf("Done")
      }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

  //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出
  <-forever
}

查看结果

Console1(consumer):

root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go
2018/12/14 05:28:59 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:28:59 got Connection, getting Channel
2018/12/14 05:28:59 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:28:59 Queue bound to Exchange, starting Consume
2018/12/14 05:28:59  [*] Waiting for messages. To exit press CTRL+C

Console2(consumer):

root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go
2018/12/14 05:29:04 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:29:04 got Connection, getting Channel
2018/12/14 05:29:04 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:29:04 Queue bound to Exchange, starting Consume
2018/12/14 05:29:04  [*] Waiting for messages. To exit press CTRL+C

在第三个窗口,这个时候我们使用Producer 来 Publish Message:

root@rabbitmq-1:~# /usr/local/go/bin/go run producer_task.go First message. &&/usr/local/go/bin/go run producer_task.go Second message.. && /usr/local/go/bin/go run producer_task.go Third message... && /usr/local/go/bin/go run producer_task.go Fourth message.... && /usr/local/go/bin/go run producer_task.go Fifth message.....
2018/12/14 05:34:16 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:16 got Connection, getting Channel
2018/12/14 05:34:16 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:16 declared queue, publishing 14B body ("First message.")
2018/12/14 05:34:16 published 14B OK
2018/12/14 05:34:16 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:16 got Connection, getting Channel
2018/12/14 05:34:16 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:16 declared queue, publishing 16B body ("Second message..")
2018/12/14 05:34:16 published 16B OK
2018/12/14 05:34:17 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:17 got Connection, getting Channel
2018/12/14 05:34:17 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:17 declared queue, publishing 16B body ("Third message...")
2018/12/14 05:34:17 published 16B OK
2018/12/14 05:34:17 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:17 got Connection, getting Channel
2018/12/14 05:34:17 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:17 declared queue, publishing 18B body ("Fourth message....")
2018/12/14 05:34:17 published 18B OK
2018/12/14 05:34:18 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:18 got Connection, getting Channel
2018/12/14 05:34:18 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:18 declared queue, publishing 18B body ("Fifth message.....")
2018/12/14 05:34:18 published 18B OK

这时我们再看刚才打开的两个Consumer的结果:

Console1(consumer):

root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go
2018/12/14 05:32:36 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:32:36 got Connection, getting Channel
2018/12/14 05:32:36 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:32:36 Queue bound to Exchange, starting Consume
2018/12/14 05:32:36  [*] Waiting for messages. To exit press CTRL+C
2018/12/14 05:34:16 Received a message: Second message..
2018/12/14 05:34:18 Done
2018/12/14 05:34:18 Received a message: Fourth message....
2018/12/14 05:34:22 Done

Console2(consumer):

root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go
2018/12/14 05:29:04 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:29:04 got Connection, getting Channel
2018/12/14 05:29:04 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:29:04 Queue bound to Exchange, starting Consume
2018/12/14 05:29:04  [*] Waiting for messages. To exit press CTRL+C
2018/12/14 05:34:16 Received a message: First message.
2018/12/14 05:34:17 Done
2018/12/14 05:34:17 Received a message: Third message...
2018/12/14 05:34:20 Done
2018/12/14 05:34:20 Received a message: Fifth message.....
2018/12/14 05:34:25 Done

默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin,也叫消息轮询

Web页面情况

Golang调用RabbitMQ的案例

image.png

2.3、Message acknowledgment 消息确认

每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们的代码,一旦RabbitMQ Server发送给Consumer消息后,会立即把这个Message标记为完成,然后从queue中删除。我们将无法再操作这个尚未处理完成的消息。

实际场景中,如果一个Consumer异常退出了,我们希望它处理的数据能够被另外的Consumer处理,这样数据在这种情况下(通道关闭、连接关闭、TCP连接丢失等情况)就不会丢失了。

为了保证数据不被丢失,RabbitMQ支持消息确认机制,ack(nowledgments)是从Consumer消费后发送到一个特定的消息告诉RabbitMQ已经收到、处理结束,RabbitMQ可以去安全的删除它了。

如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message重新排进队列,发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。

这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。

消息确认默认是关闭的,我们需要通过,d.ACK(false)来告诉RabbitMQ我们已经完成任务。

producer_acknowledgments(消息生产者).go:

package main

import (
  "fmt"
  "log"
  "os"
  "strings"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri          =  "amqp://guest:guest@rabbitmq-1:5672/"
  //Durable AMQP exchange name
  exchangeName =  ""
  //Durable AMQP queue name
  queueName    = "test-idoall-queues-acknowledgments"
)

//如果存在错误,则输出
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main(){
  bodyMsg := bodyFrom(os.Args)
  //调用发布消息函数
  publish(uri, exchangeName, queueName, bodyMsg)
  log.Printf("published %dB OK", len(bodyMsg))
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello idoall.org"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

//发布者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
  //建立连接
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  //创建一个Channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  //创建一个queue
  q, err := channel.QueueDeclare(
    queueName, // name
    false,   // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

  // Producer只能发送到exchange,它是不能直接发送到queue的。
  // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
  // routing_key就是指定的queue名字。
  err = channel.Publish(
    exchange,     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing {
      Headers:         amqp.Table{},
      ContentType: "text/plain",
      ContentEncoding: "",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")
}
  

consumer_acknowledgments(消息消费者).go

package main

import (
  "fmt"
  "log"
  "bytes"
  "time"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri           =  "amqp://guest:guest@rabbitmq-1:5672/"
  //Durable AMQP exchange nam
  exchangeName  = ""
  //Durable AMQP queue name
  queueName     = "test-idoall-queues-acknowledgments"
)

//如果存在错误,则输出
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main(){
    //调用消息接收者
    consumer(uri, exchangeName, queueName)
}

//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
  //建立连接
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  //创建一个Channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  //创建一个queue
  q, err := channel.QueueDeclare(
      queueName, // name
      false,   // durable
      false,   // delete when unused
      false,   // exclusive
      false,   // no-wait
      nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("Queue bound to Exchange, starting Consume")
  //订阅消息
  msgs, err := channel.Consume(
      q.Name, // queue
      "",     // consumer
      false,   // auto-ack
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  //创建一个channel
  forever := make(chan bool)

  //调用gorountine
  go func() {
      for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
        dot_count := bytes.Count(d.Body, []byte("."))
        t := time.Duration(dot_count)
        time.Sleep(t * time.Second)
        log.Printf("Done")
        d.Ack(false)
      }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

  //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出
  <-forever
}

查看结果

我们先使用Producer来发送一列消息:

root@rabbitmq-1:~# /usr/local/go/bin/go run producer_acknowledgments.go First message. &&/usr/local/go/bin/go run producer_acknowledgments.go Second message.. && /usr/local/go/bin/go run producer_acknowledgments.go Third message... && /usr/local/go/bin/go run producer_acknowledgments.go Fourth message.... && /usr/local/go/bin/go run producer_acknowledgments.go Fifth message.....
2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:10 got Connection, getting Channel
2018/12/14 07:30:10 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:10 declared queue, publishing 14B body ("First message.")
2018/12/14 07:30:10 published 14B OK
2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:10 got Connection, getting Channel
2018/12/14 07:30:10 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:10 declared queue, publishing 16B body ("Second message..")
2018/12/14 07:30:10 published 16B OK
2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:11 got Connection, getting Channel
2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:11 declared queue, publishing 16B body ("Third message...")
2018/12/14 07:30:11 published 16B OK
2018/12/14 07:30:11 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:11 got Connection, getting Channel
2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:11 declared queue, publishing 18B body ("Fourth message....")
2018/12/14 07:30:11 published 18B OK
2018/12/14 07:30:11 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:11 got Connection, getting Channel
2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:11 declared queue, publishing 18B body ("Fifth message.....")
2018/12/14 07:30:11 published 18B OK

通过rabbitmqctl命令,来看下messages_unacknowledged的情况:

root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
test-idoall-queues-task 0   0
test-idoall-queues-acknowledgments  5   0
test-idoall-queues  0   0

使用Consumer来订阅消息操作到第四条的时候,我们按CTRL+C退出,这个时候相当于消息已经被读取,但是未发送d.ACK(false):

root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_acknowledgments.go 
2018/12/14 07:30:25 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:25 got Connection, getting Channel
2018/12/14 07:30:25 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:25 Queue bound to Exchange, starting Consume
2018/12/14 07:30:25  [*] Waiting for messages. To exit press CTRL+C
2018/12/14 07:30:25 Received a message: First message.
2018/12/14 07:30:26 Done
2018/12/14 07:30:26 Received a message: Second message..
2018/12/14 07:30:28 Done
2018/12/14 07:30:28 Received a message: Third message...
2018/12/14 07:30:31 Done
2018/12/14 07:30:31 Received a message: Fourth message....
^Csignal: interrupt

再通过rabbitmqctl命令可以看到,还是有2条消息未处理

root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
test-idoall-queues-task 0   0
test-idoall-queues-acknowledgments  2   0
test-idoall-queues  0   0

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Mastering Regular Expressions, Second Edition

Mastering Regular Expressions, Second Edition

Jeffrey E F Friedl / O'Reilly Media / 2002-07-15 / USD 39.95

Regular expressions are an extremely powerful tool for manipulating text and data. They have spread like wildfire in recent years, now offered as standard features in Perl, Java, VB.NET and C# (and an......一起来看看 《Mastering Regular Expressions, Second Edition》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具