内容简介:golang中比较好用的kafka client有其中 sarama的使用者应该是最多的, 然后还有一个sarama的cluster版本本文简单描述下sarama的一些简单使用
golang中比较好用的kafka client有
其中 sarama的使用者应该是最多的, 然后还有一个sarama的cluster版本 sarama-cluster
本文简单描述下sarama的一些简单使用
生产者接口
func producer_test() {
fmt.Printf("producer_test\n")
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Version = sarama.V0_11_0_2
producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Printf("producer_test create producer error :%s\n", err.Error())
return
}
defer producer.AsyncClose()
// send message
msg := &sarama.ProducerMessage{
Topic: "kafka_go_test",
Key: sarama.StringEncoder("go_test"),
}
value := "this is message"
for {
fmt.Scanln(&value)
msg.Value = sarama.ByteEncoder(value)
fmt.Printf("input [%s]\n", value)
// send to chain
producer.Input() <- msg
select {
case suc := <-producer.Successes():
fmt.Printf("offset: %d, timestamp: %s", suc.Offset, suc.Timestamp.String())
case fail := <-producer.Errors():
fmt.Printf("err: %s\n", fail.Err.Error())
}
}
}
消费者接口
func consumer_test() {
fmt.Printf("consumer_test")
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V0_11_0_2
// consumer
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Printf("consumer_test create consumer error %s\n", err.Error())
return
}
defer consumer.Close()
partition_consumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)
if err != nil {
fmt.Printf("try create partition_consumer error %s\n", err.Error())
return
}
defer partition_consumer.Close()
for {
select {
case msg := <-partition_consumer.Messages():
fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
case err := <-partition_consumer.Errors():
fmt.Printf("err :%s\n", err.Error())
}
}
}
元数据接口
func metadata_test() {
fmt.Printf("metadata test\n")
config := sarama.NewConfig()
config.Version = sarama.V0_11_0_2
client, err := sarama.NewClient([]string{"localhost:9092"}, config)
if err != nil {
fmt.Printf("metadata_test try create client err :%s\n", err.Error())
return
}
defer client.Close()
// get topic set
topics, err := client.Topics()
if err != nil {
fmt.Printf("try get topics err %s\n", err.Error())
return
}
fmt.Printf("topics(%d):\n", len(topics))
for _, topic := range topics {
fmt.Println(topic)
}
// get broker set
brokers := client.Brokers()
fmt.Printf("broker set(%d):\n", len(brokers))
for _, broker := range brokers {
fmt.Printf("%s\n", broker.Addr())
}
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- RecyclerView使用指南(一)—— 基本使用
- 如何使用Meteorjs使用URL参数
- 使用 defer 还是不使用 defer?
- 使用 Typescript 加强 Vuex 使用体验
- [译] 何时使用 Rust?何时使用 Go?
- UDP协议的正确使用场合(谨慎使用)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
网站入侵与脚本技术快速防杀
2011-1 / 56.00元
《反黑风暴·网站入侵与脚本技术快速防杀》由浅入深、图文并茂地再现了网站入侵与脚本技术快速防杀的全过程,内容涵盖:Windows系统编程基础、黑客程序的配置和数据包嗅探、Web脚本攻击与防御、基于Web的DDoS攻击与防御、流行的黑客编程技术、XSS跨站脚本攻击技术与防范、Cookie欺骗与防御技术剖析、数据库入侵与防范技术、SQL注入攻击与防范、网络上传漏洞的攻击与防范、系统后门编程技术、编程攻击......一起来看看 《网站入侵与脚本技术快速防杀》 这本书的介绍吧!