内容简介:微服务之间是相互独立的,不像单个工程一样各个模块之间可以直接通过方法调用实现通信,相互独立的服务直接一般的通信方式是使用在这篇文章安装
微服务之间通过RabbitMQ通信
微服务之间是相互独立的,不像单个工程一样各个模块之间可以直接通过方法调用实现通信,相互独立的服务直接一般的通信方式是使用 HTTP协议
、 rpc协议
或者使用消息中间件如 RabbitMQ``Kafka
等
image
在这篇文章 使用Golang和 MongoDB 构建微服务 已经实现了一个微服务的应用,在文章中已经实现了各个服务直接的通信,是使用的 HTTP
的形式 ,那各个服务之间如何通过 RabbitMQ
进行消息通信呢,我们现在要实现一个功能,就是一个用户预订电影票的接口,需要服务 User Service(port 8000) 和 服务 Booking Service(port 8003) 之间通信,用户预订之后,把预订信息写入到 booking 的数据库中
安装 RabbitMQ
安装 RabbitMQ
之前需要先安装 Erlang 的环境 ,然后下载安装 RabbitMQ ,请选择对应的版本,安装完成之后,RabbitMQ在Windows上是作为一个服务在后台运行,关于 RabbitMQ
的接口如何使用,请参考官网的 教程 ,有各个主流语言的实现我们使用的是 Go
版本,请下载对应的实现接口 go get github.com/streadway/amqp
对 RabbitMQ
的接口做一下简单的封装
- 定义一个接口
messaging/message.go
type IMessageClient interface { ConnectToBroker(connectionStr string) error PublishToQueue(data []byte, queueName string) error SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error Close() } type MessageClient struct { conn *amqp.Connection }
- 连接接口
func (m *MessageClient) ConnectToBroker(connectionStr string) error { if connectionStr == "" { panic("the connection str mustnt be null") } var err error m.conn, err = amqp.Dial(connectionStr) return err }
- 发布消息接口
func (m *MessageClient) PublishToQueue(body []byte, queueName string) error { if m.conn == nil { panic("before publish you must connect the RabbitMQ first") } ch, err := m.conn.Channel() defer ch.Close() failOnError(err, "Failed to open a channel") q, err := ch.QueueDeclare( queueName, false, false, false, false, nil, ) failOnError(err, "Failed to declare a queue") err = ch.Publish( "", q.Name, false, false, amqp.Publishing{ ContentType: "application/json", Body: body, }, ) failOnError(err, "Failed to publish a message") return nil }
- 订阅消息接口
func (m *MessageClient) SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error { ch, err := m.conn.Channel() //defer ch.Close() failOnError(err, "Failed to open a channel") q, err := ch.QueueDeclare( queueName, false, false, false, false, nil, ) failOnError(err, "Failed to declare a queue") msgs, err := ch.Consume( q.Name, "", true, false, false, false, nil, ) failOnError(err, "Failed to register a consumer") go consumeLoop(msgs, handlerFunc) return nil }
实现通信
在 User Service 中定义一个新的 POST
接口 /user/{name}/booking
,实现用户的预订功能,预订之后,通过 RabbitMQ
发布一个消息给
Booking Service, Booking Service 接收到消息之后,做相应的处理(写入数据库)
User Service
- 初始化
MessageClient
users/controllers/user.go
var client messaging.IMessageClient func init() { client = &messaging.MessageClient{} err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/") if err != nil { fmt.Println("connect to rabbitmq error", err) } }
- 添加新的路由和实现
routes.go
register("POST", "/user/{name}/booking", controllers.NewBooking, nil)
users/controllers/user.go
func NewBooking(w http.ResponseWriter, r *http.Request) { params := mux.Vars(r) user_name := params["name"] defer r.Body.Close() var bookings models.Booking body, _ := ioutil.ReadAll(r.Body) err := json.Unmarshal(body, &bookings) if err != nil { fmt.Println("the format body error ", err) } fmt.Println("user name:", user_name, bookings) go notifyMsg(body) }
- 用一个协程实现消息的发布
func notifyMsg(body []byte) { err := client.PublishToQueue(body, "new_booking") if err != nil { fmt.Println("Failed to publis message", err) } }
Booking Service
- 初始化MessageClient
var client messaging.IMessageClient func initMessage() { client = &messaging.MessageClient{} err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/") if err != nil { fmt.Println("Failed to connect to RabbitMQ", err) } err = client.SubscribeToQueue("new_booking", getBooking) if err != nil { fmt.Println("Failed to comsuer the msg", err) } }
在 web服务之前启动
func main() { initMessage() r := routes.NewRouter() http.ListenAndServe(":8003", r) }
- 接收后的消息处理
func getBooking(delivery amqp.Delivery) { var booking models.Booking json.Unmarshal(delivery.Body, &booking) booking.Id = bson.NewObjectId().Hex() dao.Insert("Booking", "BookModel", booking) fmt.Println("the booking msg", booking) }
验证,需要启动 User Service 和 Booking Service
使用 Postman
发送对应的数据
post 127.0.0.1:8000/user/kevin_woo/booking { "name":"kevin_woo", "books":[ { "date":"20180727", "movies":["5b4c45d49d5e3e33c4a5b97a"] }, { "date":"20180810", "movies":["5b4c45ea9d5e3e33c4a5b97b"] } ] }
可以看到数据库已经有了一条新的预订信息
说明,我这里POST的数据就是booking数据库中的结构,实际情况需要对数据进行封装处理,在POST数据时,没有对数据进行验证,
在实际开发过程中需要对各个数据做相应的验证,这里主要是看一下 RabbitMQ的消息传递处理的过程
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 通过$listeners、inheritAttrs、$attr实现组件之间的数据通信
- 经验分享:Plaid如何通过机器学习实现商家和银行之间的交易对账结算? - Kevin Hu
- 通过 Sqoop1.4.7 将 Mysql5.7、Hive2.3.4、Hbase1.4.9 之间的数据导入导出
- 黑白之间,烦请适当宽松
- 算法与运营之间的战争
- 组件之间的通讯LiveDataBus
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。