连接rabbitmq的代码 消费者断线重连: package main import ( "log" "github.com/streadway/amqp" "database/sql" _"github.com/go-sql-driver/mysql" //"time" ) func insert(db *sql.DB, a []byte) { stmt, err := db.Prepare("INSERT INTO rabbit (name) VALUES(?)") stmt.Exec(a) defer stmt.Close() if err != nil { log.Println(err) return } } func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { //打开mysql db, err := sql.Open("mysql", "root:0022....hh@tcp(127.0.0.1:3306)/rabbitmq?charset=utf8") if err != nil { log.Fatalf("Open database error: %s\n", err) } defer db.Close() err = db.Ping() if err != nil { log.Fatal(err) } //forever := make(chan bool) //连接rabbitmq for { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { continue } failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "hello", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 创建消费者 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") // 协程获取消息队列处理结果 go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) insert(db, d.Body) } }() if conn != nil { //time.Sleep(50*time.Second) continue } log.Printf(" [*] Waiting for messages. To exit press CTRL+C") //<-forever } }
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 不使用第三方框架编写的多线程断线续传功能
- 不使用第三方框架编写的多线程断线续传功能 原 荐
- HashMap为何从头插入改为尾插入
- C++拾趣——STL容器的插入、删除、遍历和查找操作性能对比(Windows VirtualStudio)——插入
- HashMap之元素插入
- 插入排序
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。