[译] 为 Go 语言开发者介绍 NATS

栏目: 后端 · 发布时间: 6年前

内容简介:编辑:立尧

2018上海KubeCon

[译] 为  <a href='https://www.codercto.com/topics/6127.html'>Go</a>  语言开发者介绍 NATS

Kubernetes的全球盛会KubeCon将于11月13日~11月15日在中国上海隆重举行,此论坛汇集了众多在开源和云原生领域有卓越贡献的应用人员和技术专家。大会吸引了超过5000名行业精英前来参会,大家齐聚一堂相互分享经验,聚焦创新,并讨论云原生计算的未来。KubeCon + CloudNativeCon中国论坛将召开100多个分组会议,包括技术会议、深度学习、案例研究等。现在通过容器时代专属报名通道报名可以享受超大折扣哦,详情请戳此处链接: 【容器时代粉丝专属福利】KubeCon + CloudNativeCon门票惊喜折扣

写在前面

本文将为准备构建分布式系统和微服务的 Go 语言开发者介绍 NATS 消息系统。当你构建分布式应用时,消息系统对应用间通信而言非常关键,尤其是事件驱动架构的异步通信方式。为了构建现代分布式系统而诞生了很多分布式队列和消息系统。像 Kafka, NATS, NSQ, RabbitMQ, ActiveMQ 这类的开源技术,   以及像 Google Cloud Pub/Sub, Amazon SQS, Amazon SNS Topic, Azure Service Bus PaaS 云平台,上面这些都为消息中间件和分布式系统提供了不同的能力和模式。前面提及的技术中的 NATS NSQ 均由 Go 语言编写,借助于像微服务这样的现代方式,大大简化了构建分布式系统的过程。因为构建分布式系统本身就很复杂,如果使用复杂的消息系统则会使你的应用更加复杂。现代的消息系统应该在各种环境规模如内部服务器、云平台、容器上都能够轻松运行和扩展。

NATS介绍

[译] 为 Go 语言开发者介绍 NATS

NATS是一个开源、轻量级、高性能的云原生消息系统。它是实现了具有更高级别扩展性的发布-订阅消息系统。即使NATS是基于发布-订阅分发模型,你同样可以通过订阅服务器队列组实现分布式系统。NATS创建于2010年,原是服务于Cloud Foundary平台的消息系统。NATS最开始是由 Ruby 语言实现的,但随后NATS团队使用Go语言进行了重写。

NATS在两个互相操作的模块中使用:核心NATS平台-NATS服务器(其可执行文件的名字为gnatsd)简称为NATS;NATS流(其可执行文件的名字为nats-streaming-server)是一个事件流服务,用于NATS添加事件流、发布保障及再现历史数据。NATS服务器是面向高性能现代分布式系统架构而设计的,并不能进行消息持久化。因此,如果你的系统是离线状态,将不会接收到消息。如果你想要实现持续消息传递和发布保障,可以使用NATS流代替核心NATS平台,NATS流建立在核心NATS平台基础之上。本文我将专注于基础的NATS 服务器的介绍,至于NATS流我将在后续的文章中介绍。

NATS服务器(gnatsd)是最高性能的分布式消息系统,可以达到每秒钟发送1.5千万-1.8千万条消息。NATS平台易于使用和扩展,NATS的简洁性和高性能性质使得它对于构建现代云原生分布式系统及微服务而言是个不错的选择。我过去使用过许多消息系统,因为NATS的性能和简洁性,所以我强烈推荐它。

[译] 为 Go 语言开发者介绍 NATS ( 来源: bravenewgeek.com/dissecting-message-queues )

消息模式

当NATS作为发布-订阅引擎时,它提供了三种消息传递模式:

  • 发布-订阅

  • 队列

  • 请求-响应

消息架构组件

NATS消息基础结构的主要构成有:

  • 消息:消息是数据交换单元,用于应用间交换数据的有效载荷。

  • 主体:主体明确消息的目的。

  • 生产者:生产者向NATS服务器发送消息。

  • 消费者:消费者从NATS服务器中接收消息。

  • 消息服务器:NATS服务器从生产者到消费者间分配消息。

安装服务器和客户端

以下是下载 NATS 服务器的各种发行版:

http://nats.io/download/nats-io/gnatsd/

同样可以使用Go语言 工具 安装NATS服务器:

go get github.com/nats-io/gnatsd

通过运行可执行文件gnatsd来启动NATS服务器:

gnatsd

你也可以使用下面的Go语言工具安装NATS客户端:

go get github.com/nats-io/go-nats

在Go语言中使用NATS

让我们通过Go语言编写的一个分布式应用的例子探索下NATS。在这个例子中,我们使用请求-应答和发布-订阅消息传递模式。在发布-订阅模式中,将使用订阅者队列组进行排队。这个例子NATS将使用Protocol Buffers来发送和接收消息。

清单1:缓冲协议中的消息类型

message ServiceDiscovery {    
    string order_service_uri = 1;
}

message EventStore {  
    string aggregate_id = 1;  
    string aggregate_type = 2;  
    string event_id = 3;  
    string event_type = 4;  
    string event_data = 5;
}

消息类型ServiceDiscovery使用请求-应答通信,类型EventStore使用的是发布-订阅模式。

NATS的请求应答示例

请求-应答消息模式的工作方式类似于正常的请求-响应通信,其中发布请求操作发布带有回复主题的消息,同时等等对该回复主题的响应。这里我们使用这个格式用于找出服务端点的简单发现。值得注意的是我此处使用NATS的请求-应答进行消息传递仅仅是为了举个例子。NATS的请求-应答模式与gRPC的简单RPC非常类似,因此我一贯的使用gRPC的API接口代替NATS请求-应答消息传递。但对于以事件驱动的Go语言微服务结构,我通常使用NATS用于pub/sub通信。

这是一段代码块,在项目“Discovery.OrderService”中发送一个请求以得到服务端点。

清单2:作用于NATS请求-应答消息传递的请求:

func main() {	
    // Create NATS server connection
    natsConnection, _ := nats.Connect(nats.DefaultURL)
    log.Println("Connected to " + nats.DefaultURL)

    msg, err := natsConnection.Request("Discovery.OrderService", nil, 1000*time.Millisecond)	
    if err == nil && msg != nil {
        orderServiceDiscovery := pb.ServiceDiscovery{}
	err := proto.Unmarshal(msg.Data, &orderServiceDiscovery)		if err != nil {
	    log.Fatalf("Error on unmarshal: %v", err)
	}
	address := orderServiceDiscovery.OrderServiceUri
	log.Println("OrderService endpoint found at:", address)		
        //Set up a connection to the gRPC server.
	conn, err := grpc.Dial(address, grpc.WithInsecure())
    }
}

Go语言NATS客户端的库被导入的项目中:

import "github.com/nats-io/go-nats"

函数nats.Connect尝试连接NATS系统,默认的NATS服务器运在“nats://localhost:4222”。这里我们使用默认的URL连接服务器。

natsConnection, _ := nats.Connect(nats.DefaultURL)

NATS通常在项目中发送一个名为“Discovery.OrderService”的请求以得到应答。

msg, err := natsConnection.Request("Discovery.OrderService", nil, 1000*time.Millisecond)

当对某个项目发送请求时,你可以传递请求数据和超时。这里我们不提供任何数据,向这个项目发送请求只是为了接收应答。我们采用Protocol Buffers进行发送和接收消息,响应数据被编码在Go语言结构值中。

orderServiceDiscovery := pb.ServiceDiscovery{}
err := proto.Unmarshal(msg.Data, &orderServiceDiscovery
)if err != nil {
       log.Fatalf("Error on unmarshal: %v", err)
}
address := orderServiceDiscovery.OrderServiceUri
log.Println("OrderService endpoint found at:", address)

这是另一个应用的代码块,订阅项目 “Discovery.OrderService” 提供了对应请求的应答。

清单3:NATS的请求-应答消息传递的响应

var orderServiceUri string
orderServiceUri = viper.GetString("discovery.orderservice")

func main() {	
    // Create server connection
    natsConnection, _ := nats.Connect(nats.DefaultURL)
    log.Println("Connected to " + nats.DefaultURL)

    natsConnection.Subscribe("Discovery.OrderService", func(m *nats.Msg) {
	orderServiceDiscovery := pb.ServiceDiscovery{OrderServiceUri: orderServiceUri}
	data, err := proto.Marshal(&orderServiceDiscovery)		
        if err == nil {
	    natsConnection.Publish(m.Reply, data)
	}	
     })	
     // Keep the connection alive
    runtime.Goexit()
}

项目 “Discovery.OrderService”参与发送响应,这里通过编码到Protocol Buers以发送响应数据。

NATS发布订阅示例

我极力推荐NATS用于发布订阅引擎的pub/sub消息模型,构建企业级消息队列及Go语言搭建的分布式系统。NATS发布订阅是一对多通信系统,项目中一个发布者发送一个消息,项目中所有活跃的订阅者接收这个消息。此通信模型是典型的异步方式,发布的消息被分发到订阅消息的处理者。如果没有处理者,订阅以异步模型工作,客户端有可能被阻塞直到消息被处理。大多数真实情况下,你可能不需要通信的异步方式用于pub/sub通信。

[译] 为 Go 语言开发者介绍 NATS

当创建订阅者时,你可以同时为它注册一个队列名称。所有具有相同的队列名称的订阅者构成一个队列组。随着消息在被注册的项目中发布,队列组中被随机选择的一个订阅者用于接收消息。虽然队列组中有多个订阅者,但每个消息只能被一个订阅者接收,且只能接收一次。当创建订阅者时,你可以选择是否注册队列名。在队列组中的订阅者们,其中一个订阅者接收消息,而那些没有队列组的订阅者们,所有订阅者共同接受这条消息。有意思的是NATS本身提供队列甚至它本身也是基于消息发布-订阅模式。

在本文的消息发布-订阅模式,我们创建一个无队列的订阅者及一个名“Order.OrdersCreatedQueue”含有多个订阅者的订阅组。因此队列组中的一个订阅者及其他的订阅者(无队列组的)可以接收消息。发布者的客户端是一个gRPC服务器,当项目“Order.OrderCreated”中创建一个命令时,发布者将发布一个消息,详见下面的代码块:

清单4:用于NATS发布订阅消息传递的发布者客户端

const (
	aggregate = "Order"
	event     = "OrderCreated")
        // publishOrderCreated publish an event via NATS server
        
func publishOrderCreated(order *pb.Order) {	
        // Connect to NATS server
	natsConnection, _ := nats.Connect(nats.DefaultURL)
	log.Println("Connected to " + nats.DefaultURL)	
        defer natsConnection.Close()
	eventData, _ := json.Marshal(order)	
        event := pb.EventStore{
	    AggregateId:   order.OrderId,
	    AggregateType: aggregate,
	    EventId:       uuid.NewV4().String(),
	    EventType:     event,
	    EventData:     string(eventData),
	}
	subject := "Order.OrderCreated"
	data, _ := proto.Marshal(&event)	
        // Publish message on subject
	natsConnection.Publish(subject, data)
	log.Println("Published message on subject " + subject)
}

消息模式

NATS客户端的Publish功能,向给定的项目中发布一个消息。这里消息被整理到Protocol Buffers中,当信息从发布者客户端发布到项目中时,我们创建一个订阅者接收消息。

这段代码块来自于订阅者客户端用于订阅消息。

清单5:用于NATS发布订阅消息传递的订阅者客户端

const subject = "Order.>"

func main() {	
    // Create server connection
    natsConnection, _ := nats.Connect(nats.DefaultURL)
    log.Println("Connected to " + nats.DefaultURL)	
    // Subscribe to subject
    natsConnection.Subscribe(subject, func(msg *nats.Msg) {
	eventStore := pb.EventStore{}
	err := proto.Unmarshal(msg.Data, &eventStore)		
        if err == nil {			
            // Handle the message
	    log.Printf("Received message in EventStore service: %+v\n", eventStore)
	    store := store.EventStore{}
	    store.CreateEvent(&eventStore)
	    log.Println("Inserted event into Event Store")
	}
    })	
    // Keep the connection alive
    runtime.Goexit()
}

使用通配符项目“Order.>”订阅消息,NATS支持在项目订阅中使用通配符,支持星号字符(*)和大于号(>),也被认为所有的通配符被用于通配项目订阅。通配符Order.> will 被匹配为Order.Created,Order.Shipped, Order.Delivered, Order.Delivered.Returned等等。通配符Order.* 将被匹配为 Order.Created, Order.Shipped, Order.Delivered等等,而不是 Order.Delivered.Returned。

NATS客户端的Subscribe功能,当消息被给定的项目发布时,订阅消息处理异步接收消息。 由于消息由Protocol Buffers编码所发布,接收消息则是通过proto.Unmarshal解码到Go语言结构体值中。

让我们添加订阅者到一个队列组中,这是订阅者客户端从项目中订阅消息的代码块:

清单6:订阅者客户端队列组的NATS发布-订阅消息传递

const (
    queue   = "Order.OrdersCreatedQueue"
    subject = "Order.OrderCreated"
)
func main() {	
    // Create server connection
    natsConnection, _ := nats.Connect(nats.DefaultURL)
    log.Println("Connected to " + nats.DefaultURL)	
    // Subscribe to subject
    natsConnection.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
    	eventStore := pb.EventStore{}
	err := proto.Unmarshal(msg.Data, &eventStore)		
        if err == nil {			
        // Handle the message
	    log.Printf("Subscribed message in Worker 1: %+v\n", eventStore)
	}
    })	
    // Keep the connection alive
    runtime.Goexit()
}

主题“Order.OrderCreated”的消息使用名为“Order.OrdersCreatedQueue"的队列通过QueueSubscribe功能被订阅。当我们使用同一个队列名创建多个订阅者,它们被创建在一个队列组中,随机选择一个订阅者用来接收消息。如果只是想通过NATS仅仅用于排队,可以只通过一个队列组创建订阅者。

本文我仅展示了核心NATS平台的基础能力,后续我将写另一篇文章讲述NATS流服务器。

本文中的源代码可见:https://github.com/shijuvar/gokit/tree/master/examples/grpc-nats

你也可以在twitte r ( @shijucv)  上关注我

原文链接

原文作者:Shiju Varghese

原文链接: https://medium.com/@shijuvar/introducing-nats-to-go-developers-3cfcb98c21d0

容器时代志愿者招募

[译] 为 Go 语言开发者介绍 NATS

如果你对技术懵懵懂懂,想要入门却不知从何下手;

如果你求知若渴,想要学习更多技术、思想;

如果你对于技术有着一种狂热的喜爱并且热爱开源,以其为信仰。

快来加入我们吧

加入我们可以接触前沿技术

加入我们可以兼顾学习与分享

加入我们可以与大牛一对一交流

志愿者计划 JOIN US

容器时代志愿编辑

志愿内容

  1. 公众号运营 —— 比如晨读文章推荐、周推荐等; ( 特别欢迎在校大学生)

  2. 翻译 —— 容器生态圈相关教程、文章、资讯等的翻译;

[译] 为 Go 语言开发者介绍 NATS

点击 阅读原文 即可加入,加入之后还有 神秘福利 等着你呦~

[译] 为 Go 语言开发者介绍 NATS

编辑:立尧


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Node.js硬实战:115个核心技巧

Node.js硬实战:115个核心技巧

【美】Alex R. Young、【美】Marc Harter / 承竹、慕陶、邱娟、达峰 / 电子工业出版社 / 2017-1 / 109.9

《Node.js 硬实战:115 个核心技巧》是一本面向实战的Node.js 开发进阶指南。作为资深专家,《Node.js 硬实战:115 个核心技巧》作者独辟蹊径,将着眼点放在Node.js 的核心模块和网络应用,通过精心组织的丰富实例,向读者充分展示了Node.js 强大的并发处理能力,读者从中可真正掌握Node 的核心基础与高级技巧。《Node.js 硬实战:115 个核心技巧》总共有三部分......一起来看看 《Node.js硬实战:115个核心技巧》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具