rabbitmq消费者插入mysql数据并实现断线自动重连

栏目: Redis · 发布时间: 7年前

连接rabbitmq的代码 消费者断线重连: package main import ( "log" "github.com/streadway/amqp" "database/sql" _"github.com/go-sql-driver/mysql" //"time" ) func insert(db *sql.DB, a []byte) { stmt, err := db.Prepare("INSERT INTO rabbit (name) VALUES(?)") stmt.Exec(a) defer stmt.Close() if err != nil { log.Println(err) return } } func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { //打开mysql db, err := sql.Open("mysql", "root:0022....hh@tcp(127.0.0.1:3306)/rabbitmq?charset=utf8") if err != nil { log.Fatalf("Open database error: %s\n", err) } defer db.Close() err = db.Ping() if err != nil { log.Fatal(err) } //forever := make(chan bool) //连接rabbitmq for { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { continue } failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "hello", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 创建消费者 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") // 协程获取消息队列处理结果 go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) insert(db, d.Body) } }() if conn != nil { //time.Sleep(50*time.Second) continue } log.Printf(" [*] Waiting for messages. To exit press CTRL+C") //<-forever } }


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

别具光芒

别具光芒

李烨 / 2008-10 / 59.00元

《别具光芒CSS属性、浏览器兼容与网页布局》结合大量范例与实际应用的实例,详细介绍了W3C发布的层叠样式表CSS2.1规范,浏览器对于CSS2.1规范解释的异同,以及使用XHTML和层叠样式表对网页进行结构化与美化的实际制作方法。《别具光芒CSS属性、浏览器兼容与网页布局》内容由浅入深,不仅介绍了Web标准和层叠样式表的各个属性,还结合实例对属性的实际应用进行讲解,同时配合在不同浏览器内的效果展示......一起来看看 《别具光芒》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换