内容简介:微服务之间是相互独立的,不像单个工程一样各个模块之间可以直接通过方法调用实现通信,相互独立的服务直接一般的通信方式是使用在这篇文章安装
微服务之间通过RabbitMQ通信
微服务之间是相互独立的,不像单个工程一样各个模块之间可以直接通过方法调用实现通信,相互独立的服务直接一般的通信方式是使用 HTTP协议 、 rpc协议 或者使用消息中间件如 RabbitMQ``Kafka 等
image
在这篇文章 使用Golang和 MongoDB 构建微服务 已经实现了一个微服务的应用,在文章中已经实现了各个服务直接的通信,是使用的 HTTP 的形式 ,那各个服务之间如何通过 RabbitMQ 进行消息通信呢,我们现在要实现一个功能,就是一个用户预订电影票的接口,需要服务 User Service(port 8000) 和 服务 Booking Service(port 8003) 之间通信,用户预订之后,把预订信息写入到 booking 的数据库中
安装 RabbitMQ
安装 RabbitMQ 之前需要先安装 Erlang 的环境 ,然后下载安装 RabbitMQ ,请选择对应的版本,安装完成之后,RabbitMQ在Windows上是作为一个服务在后台运行,关于 RabbitMQ 的接口如何使用,请参考官网的 教程 ,有各个主流语言的实现我们使用的是 Go 版本,请下载对应的实现接口 go get github.com/streadway/amqp
对 RabbitMQ 的接口做一下简单的封装
- 定义一个接口
messaging/message.go
type IMessageClient interface {
ConnectToBroker(connectionStr string) error
PublishToQueue(data []byte, queueName string) error
SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error
Close()
}
type MessageClient struct {
conn *amqp.Connection
}
- 连接接口
func (m *MessageClient) ConnectToBroker(connectionStr string) error {
if connectionStr == "" {
panic("the connection str mustnt be null")
}
var err error
m.conn, err = amqp.Dial(connectionStr)
return err
}
- 发布消息接口
func (m *MessageClient) PublishToQueue(body []byte, queueName string) error {
if m.conn == nil {
panic("before publish you must connect the RabbitMQ first")
}
ch, err := m.conn.Channel()
defer ch.Close()
failOnError(err, "Failed to open a channel")
q, err := ch.QueueDeclare(
queueName,
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
},
)
failOnError(err, "Failed to publish a message")
return nil
}
- 订阅消息接口
func (m *MessageClient) SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error {
ch, err := m.conn.Channel()
//defer ch.Close()
failOnError(err, "Failed to open a channel")
q, err := ch.QueueDeclare(
queueName,
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
go consumeLoop(msgs, handlerFunc)
return nil
}
实现通信
在 User Service 中定义一个新的 POST 接口 /user/{name}/booking ,实现用户的预订功能,预订之后,通过 RabbitMQ 发布一个消息给
Booking Service, Booking Service 接收到消息之后,做相应的处理(写入数据库)
User Service
- 初始化
MessageClient
users/controllers/user.go
var client messaging.IMessageClient
func init() {
client = &messaging.MessageClient{}
err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println("connect to rabbitmq error", err)
}
}
- 添加新的路由和实现
routes.go
register("POST", "/user/{name}/booking", controllers.NewBooking, nil)
users/controllers/user.go
func NewBooking(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
user_name := params["name"]
defer r.Body.Close()
var bookings models.Booking
body, _ := ioutil.ReadAll(r.Body)
err := json.Unmarshal(body, &bookings)
if err != nil {
fmt.Println("the format body error ", err)
}
fmt.Println("user name:", user_name, bookings)
go notifyMsg(body)
}
- 用一个协程实现消息的发布
func notifyMsg(body []byte) {
err := client.PublishToQueue(body, "new_booking")
if err != nil {
fmt.Println("Failed to publis message", err)
}
}
Booking Service
- 初始化MessageClient
var client messaging.IMessageClient
func initMessage() {
client = &messaging.MessageClient{}
err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println("Failed to connect to RabbitMQ", err)
}
err = client.SubscribeToQueue("new_booking", getBooking)
if err != nil {
fmt.Println("Failed to comsuer the msg", err)
}
}
在 web服务之前启动
func main() {
initMessage()
r := routes.NewRouter()
http.ListenAndServe(":8003", r)
}
- 接收后的消息处理
func getBooking(delivery amqp.Delivery) {
var booking models.Booking
json.Unmarshal(delivery.Body, &booking)
booking.Id = bson.NewObjectId().Hex()
dao.Insert("Booking", "BookModel", booking)
fmt.Println("the booking msg", booking)
}
验证,需要启动 User Service 和 Booking Service
使用 Postman 发送对应的数据
post 127.0.0.1:8000/user/kevin_woo/booking
{
"name":"kevin_woo",
"books":[
{
"date":"20180727",
"movies":["5b4c45d49d5e3e33c4a5b97a"]
},
{
"date":"20180810",
"movies":["5b4c45ea9d5e3e33c4a5b97b"]
}
]
}
可以看到数据库已经有了一条新的预订信息
说明,我这里POST的数据就是booking数据库中的结构,实际情况需要对数据进行封装处理,在POST数据时,没有对数据进行验证,
在实际开发过程中需要对各个数据做相应的验证,这里主要是看一下 RabbitMQ的消息传递处理的过程
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 通过$listeners、inheritAttrs、$attr实现组件之间的数据通信
- 经验分享:Plaid如何通过机器学习实现商家和银行之间的交易对账结算? - Kevin Hu
- 通过 Sqoop1.4.7 将 Mysql5.7、Hive2.3.4、Hbase1.4.9 之间的数据导入导出
- 黑白之间,烦请适当宽松
- 算法与运营之间的战争
- 组件之间的通讯LiveDataBus
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Code Reading
Diomidis Spinellis / Addison-Wesley Professional / 2003-06-06 / USD 64.99
This book is a unique and essential reference that focuses upon the reading and comprehension of existing software code. While code reading is an important task faced by the vast majority of students,......一起来看看 《Code Reading》 这本书的介绍吧!