内容简介:程序封装使用方法
程序封装
package rabbitmq import ( "fmt" "github.com/streadway/amqp" "time" ) // 定义全局变量,指针类型 var mqConn *amqp.Connection var mqChan *amqp.Channel // 定义生产者接口 type Producer interface { MsgContent() string } // 定义接收者接口 type Receiver interface { Consumer([]byte) error } // 定义RabbitMQ对象 type RabbitMQ struct { connection *amqp.Connection channel *amqp.Channel queueName string // 队列名称 routingKey string // key名称 exchangeName string // 交换机名称 exchangeType string // 交换机类型 producerList []Producer receiverList []Receiver } // 定义队列交换机对象 type QueueExchange struct { QuName string // 队列名称 RtKey string // key值 ExName string // 交换机名称 ExType string // 交换机类型 } // 链接rabbitMQ func (r *RabbitMQ)mqConnect() { var err error RabbitUrl := fmt.Sprintf("amqp://%s:%s@%s:%d/", "guest", "guest", "******", 5673) mqConn, err = amqp.Dial(RabbitUrl) r.connection = mqConn // 赋值给RabbitMQ对象 if err != nil { fmt.Printf("MQ打开链接失败:%s \n", err) } mqChan, err = mqConn.Channel() r.channel = mqChan // 赋值给RabbitMQ对象 if err != nil { fmt.Printf("MQ打开管道失败:%s \n", err) } } // 关闭RabbitMQ连接 func (r *RabbitMQ)mqClose() { // 先关闭管道,再关闭链接 err := r.channel.Close() if err != nil { fmt.Printf("MQ管道关闭失败:%s \n", err) } err = r.connection.Close() if err != nil { fmt.Printf("MQ链接关闭失败:%s \n", err) } } // 创建一个新的操作对象 func New(q *QueueExchange) *RabbitMQ { return &RabbitMQ{ queueName:q.QuName, routingKey:q.RtKey, exchangeName: q.ExName, exchangeType: q.ExType, } } // 启动RabbitMQ客户端,并初始化 func (r *RabbitMQ) Start() { // 开启监听生产者发送任务 for _, producer := range r.producerList { go r.listenProducer(producer) } // 开启监听接收者接收任务 for _, receiver := range r.receiverList { go r.listenReceiver(receiver) } time.Sleep(1*time.Second) } // 注册发送指定队列指定路由的生产者 func (r *RabbitMQ) RegisterProducer(producer Producer) { r.producerList = append(r.producerList, producer) } // 发送任务 func (r *RabbitMQ) listenProducer(producer Producer) { // 验证链接是否正常,否则重新链接 if r.channel == nil { r.mqConnect() } // 用于检查队列是否存在,已经存在不需要重复声明 _, err := r.channel.QueueDeclarePassive(r.queueName, true,false,false,true,nil) if err != nil{ // 队列不存在,声明队列 // name:队列名称;durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;autoDelete:是否自动删除;noWait:是否非阻塞, // true为是,不等待RMQ返回信息;args:参数,传nil即可;exclusive:是否设置排他 _, err = r.channel.QueueDeclare(r.queueName, true, false, false, true, nil) if err != nil { fmt.Printf("MQ注册队列失败:%s \n", err) return } } // 队列绑定 err = r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, true,nil) if err != nil { fmt.Printf("MQ绑定队列失败:%s \n", err) return } // 用于检查交换机是否存在,已经存在不需要重复声明 err = r.channel.ExchangeDeclarePassive(r.exchangeName, r.exchangeType, true, false, false, true, nil) if err != nil{ // 注册交换机 // name:交换机名称,kind:交换机类型,durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;autoDelete:是否自动删除; // noWait:是否非阻塞, true为是,不等待RMQ返回信息;args:参数,传nil即可; internal:是否为内部 err = r.channel.ExchangeDeclare(r.exchangeName, r.exchangeType, true, false, false, true, nil) if err != nil { fmt.Printf("MQ注册交换机失败:%s \n", err) return } } // 发送任务消息 err = r.channel.Publish(r.exchangeName, r.routingKey, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(producer.MsgContent()), }) if err != nil { fmt.Printf("MQ任务发送失败:%s \n", err) return } } // 注册接收指定队列指定路由的数据接收者 func (r *RabbitMQ) RegisterReceiver(receiver Receiver) { r.receiverList = append(r.receiverList, receiver) } // 监听接收者接收任务 func (r *RabbitMQ) listenReceiver(receiver Receiver) { // 处理结束关闭链接 defer r.mqClose() // 验证链接是否正常 if r.channel == nil { r.mqConnect() } // 用于检查队列是否存在,已经存在不需要重复声明 _, err := r.channel.QueueDeclarePassive(r.queueName, true,false,false,true,nil) if err != nil{ // 队列不存在,声明队列 // name:队列名称;durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;autoDelete:是否自动删除;noWait:是否非阻塞, // true为是,不等待RMQ返回信息;args:参数,传nil即可;exclusive:是否设置排他 _, err = r.channel.QueueDeclare(r.queueName, true, false, false, true, nil) if err != nil { fmt.Printf("MQ注册队列失败:%s \n", err) return } } // 绑定任务 err = r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, true, nil) if err != nil { fmt.Printf("绑定队列失败:%s \n", err) return } // 获取消费通道,确保rabbitMQ一个一个发送消息 err = r.channel.Qos(1, 0, true) msgList, err := r.channel.Consume(r.queueName, "", false, false, false, false, nil) if err != nil { fmt.Printf("获取消费通道异常:%s \n", err) return } for msg := range msgList { // 处理数据 err := receiver.Consumer(msg.Body) if err!=nil { err = msg.Ack(true) if err != nil { fmt.Printf("确认消息未完成异常:%s \n", err) return } }else { // 确认消息,必须为false err = msg.Ack(false) if err != nil { fmt.Printf("确认消息完成异常:%s \n", err) return } return } } }
使用方法
package main import ( "fmt" "test/rabbitmq" ) type TestPro struct { msgContent string } // 实现发送者 func (t *TestPro) MsgContent() string { return t.msgContent } // 实现接收者 func (t *TestPro) Consumer(dataByte []byte) error { fmt.Println(string(dataByte)) return nil } func main() { msg := fmt.Sprintf("这是测试任务") t := &TestPro{ msg, } queueExchange := &rabbitmq.QueueExchange{ "test.rabbit", "rabbit.key", "test.rabbit.mq", "direct", } mq := rabbitmq.New(queueExchange) mq.RegisterProducer(t) mq.RegisterReceiver(t) mq.Start() }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 封装JDBC—非框架开发必备的封装类
- SpringBlade 2.3.2 发布,增加 OSS 封装及单元测试封装
- SpringBlade 2.3.2 发布,增加 OSS 封装及单元测试封装
- docker 封装 alinode
- 封装Apk签名工具
- axios封装笔记
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。