使用 Kafka 和 MongoDB 进行 Go 异步处理

栏目: 数据库 · 发布时间: 6年前

内容简介:图片描述(最多50字)$ cd /<download path>/kafka_2.11-1.1.0
在这个示例中,我将数据的保存和  MongoDB  分离,并创建另一个微服务去处理它。我还添加了 Kafka 为消息层服务,这样微服务就可以异步处理它自己关心的东西了。

下面是这个使用了两个微服务的简单的异步处理示例的上层架构图。

使用 Kafka 和 MongoDB 进行  <a href='https://www.codercto.com/topics/6127.html'>Go</a>  异步处理

图片描述(最多50字)

微服务 1 —— 是一个 REST 式微服务,它从一个 /POST http 调用中接收数据。接收到请求之后,它从 http 请求中检索数据,并将它保存到 Kafka。保存之后,它通过 /POST 发送相同的数据去响应调用者。

微服务 2 —— 是一个订阅了 Kafka 中的一个主题的微服务,微服务 1 的数据保存在该主题。一旦消息被微服务消费之后,它接着保存数据到 MongoDB 中。

我们开始吧!

首先,启动 Kafka,在你运行 Kafka 服务器之前,你需要运行 Zookeeper。下面是示例:

$ cd /<download path>/kafka_2.11-1.1.0

$ bin/zookeeper-server-start.sh config/zookeeper.properties

接着运行 Kafka —— 我使用 9092 端口连接到 Kafka。如果你需要改变端口,只需要在 config/server.properties 中配置即可。如果你像我一样是个新手,我建议你现在还是使用默认端口。

$ bin/kafka-server-start.sh config/server.properties

Kafka 跑起来之后,我们需要 MongoDB。它很简单,只需要使用这个 docker-compose.yml 即可。

version: '3'

services:

mongodb:

image: mongo

ports:

  • "27017:27017"
    volumes:
  • "mongodata:/data/db"
    networks:
  • network1
    volumes:
    mongodata:
    networks:
    network1:
    使用 Docker Compose 去运行 MongoDB docker 容器。

docker-compose up

这里是微服务 1 的相关代码。我只是修改了我前面的示例去保存到 Kafka 而不是 MongoDB:

rest-to-kafka/rest-kafka-sample.go

func jobsPostHandler(w http.ResponseWriter, r *http.Request) {

//Retrieve body from http request

b, err := ioutil.ReadAll(r.Body)

defer r.Body.Close()

if err != nil {

panic(err)

}

//Save data into Job struct

var _job Job

err = json.Unmarshal(b, &_job)

if err != nil {

http.Error(w, err.Error(), 500)

return

}

saveJobToKafka(_job)

//Convert job struct into json

jsonString, err := json.Marshal(

job)

if err != nil {

http.Error(w, err.Error(), 500)

return

}

//Set content-type http header

w.Header().Set("content-type", "application/json")

//Send back data as response

w.Write(jsonString)

}

func saveJobToKafka(job Job) {

fmt.Println("save to kafka")

jsonString, err := json.Marshal(job)

jobString := string(jsonString)

fmt.Print(jobString)

p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})

if err != nil {

panic(err)

}

// Produce messages to topic (asynchronously)

topic := "jobs-topic1"

, word := range []string{string(jobString)} {

p.Produce(&kafka.Message{

TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},

Value: []byte(word),

}, nil)

}

}

这里是微服务 2 的代码。在这个代码中最重要的东西是从 Kafka 中消费数据,保存部分我已经在前面的博客文章中讨论过了。这里代码的重点部分是从 Kafka 中消费数据:

kafka-to-mongo/kafka-mongo-sample.go

func main() {

//Create MongoDB session

session := initialiseMongo()

mongoStore.session = session

receiveFromKafka()

}

func receiveFromKafka() {

fmt.Println("Start receiving from Kafka")

c, err := kafka.NewConsumer(&kafka.ConfigMap{

"bootstrap.servers": "localhost:9092",

"group.id": "group-id-1",

"auto.offset.reset": "earliest",

})

if err != nil {

panic(err)

}

c.SubscribeTopics([]string{"jobs-topic1"}, nil)

for {

msg, err := c.ReadMessage(-1)

if err == nil {

fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))

job := string(msg.Value)

saveJobToMongo(job)

} else {

fmt.Printf("Consumer error: %v (%v)\n", err, msg)

break

}

}

c.Close()

}

func saveJobToMongo(jobString string) {

fmt.Println("Save to MongoDB")

col := mongoStore.session.DB(database).C(collection)

//Save data into Job struct

var _job Job

b := []byte(jobString)

err := json.Unmarshal(b, &_job)

if err != nil {

panic(err)

}

//Insert job into MongoDB

errMongo := col.Insert(_job)

if errMongo != nil {

panic(errMongo)

}

fmt.Printf("Saved to MongoDB : %s", jobString)

}

我们来演示一下,运行微服务 1。确保 Kafka 已经运行了。

$ go run rest-kafka-sample.go

我使用 Postman 向微服务 1 发送数据。

使用 Kafka 和 MongoDB 进行 Go 异步处理

图片描述(最多50字)

这里是日志,你可以在微服务 1 中看到。当你看到这些的时候,说明已经接收到了来自 Postman 发送的数据,并且已经保存到了 Kafka。

使用 Kafka 和 MongoDB 进行 Go 异步处理

图片描述(最多50字)

因为我们尚未运行微服务 2,数据被微服务 1 只保存在了 Kafka。我们来消费它并通过运行的微服务 2 来将它保存到 MongoDB。

$ go run kafka-mongo-sample.go

现在,你将在微服务 2 上看到消费的数据,并将它保存到了 MongoDB。

使用 Kafka 和 MongoDB 进行 Go 异步处理

图片描述(最多50字)

检查一下数据是否保存到了 MongoDB。如果有数据,我们成功了!

使用 Kafka 和 MongoDB 进行 Go 异步处理

图片描述(最多50字)

欢迎工作一到五年的 Java 工程师朋友们加入Java架构开发: 855835163

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

若为自由故

若为自由故

[美] Sam Williams / 邓楠、李凡希 / 人民邮电出版社 / 2015-4 / 49

理查德·马修·斯托曼(Richard Matthew Stallman,简称RMS)是自由软件之父,他是自由软件运动的精神领袖、GNU计划以及自由软件基金会的创立者。作为一个著名的黑客,他的主要成就包括Emacs及后来的GNU Emacs、GNU C 编译器及GDB 调试器。他编写的GNU通用公共许可证(GNU GPL)是世上最广为采用的自由软件许可证,为copyleft观念开拓出一条崭新的道路。......一起来看看 《若为自由故》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换