golang中使用kafka

栏目: Go · 发布时间: 6年前

内容简介:golang中比较好用的kafka client有其中 sarama的使用者应该是最多的, 然后还有一个sarama的cluster版本本文简单描述下sarama的一些简单使用

golang中比较好用的kafka client有

其中 sarama的使用者应该是最多的, 然后还有一个sarama的cluster版本 sarama-cluster

本文简单描述下sarama的一些简单使用

生产者接口

func producer_test() {
    fmt.Printf("producer_test\n")
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    config.Version = sarama.V0_11_0_2

    producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        fmt.Printf("producer_test create producer error :%s\n", err.Error())
        return
    }

    defer producer.AsyncClose()

    // send message
    msg := &sarama.ProducerMessage{
        Topic: "kafka_go_test",
        Key:   sarama.StringEncoder("go_test"),
    }

    value := "this is message"
    for {
        fmt.Scanln(&value)
        msg.Value = sarama.ByteEncoder(value)
        fmt.Printf("input [%s]\n", value)

        // send to chain
        producer.Input() <- msg

        select {
        case suc := <-producer.Successes():
            fmt.Printf("offset: %d,  timestamp: %s", suc.Offset, suc.Timestamp.String())
        case fail := <-producer.Errors():
            fmt.Printf("err: %s\n", fail.Err.Error())
        }
    }
}

消费者接口

func consumer_test() {
    fmt.Printf("consumer_test")

    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    config.Version = sarama.V0_11_0_2

    // consumer
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        fmt.Printf("consumer_test create consumer error %s\n", err.Error())
        return
    }

    defer consumer.Close()

    partition_consumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)
    if err != nil {
        fmt.Printf("try create partition_consumer error %s\n", err.Error())
        return
    }
    defer partition_consumer.Close()

    for {
        select {
        case msg := <-partition_consumer.Messages():
            fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
                msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
        case err := <-partition_consumer.Errors():
            fmt.Printf("err :%s\n", err.Error())
        }
    }

}

元数据接口

func metadata_test() {
    fmt.Printf("metadata test\n")

    config := sarama.NewConfig()
    config.Version = sarama.V0_11_0_2

    client, err := sarama.NewClient([]string{"localhost:9092"}, config)
    if err != nil {
        fmt.Printf("metadata_test try create client err :%s\n", err.Error())
        return
    }

    defer client.Close()

    // get topic set
    topics, err := client.Topics()
    if err != nil {
        fmt.Printf("try get topics err %s\n", err.Error())
        return
    }

    fmt.Printf("topics(%d):\n", len(topics))

    for _, topic := range topics {
        fmt.Println(topic)
    }

    // get broker set
    brokers := client.Brokers()
    fmt.Printf("broker set(%d):\n", len(brokers))
    for _, broker := range brokers {
        fmt.Printf("%s\n", broker.Addr())
    }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

CSS世界

CSS世界

张鑫旭 / 人民邮电出版社 / 2017-12 / CNY 69.00

本书从前端开发人员的需求出发,以“流”为线索,从结构、内容到美化装饰等方面,全面且深入地讲解前端开发人员必须了解和掌握的大量的CSS知识点。同时,作者结合多年的从业经验,通过大量的实战案例,详尽解析CSS的相关知识与常见问题。作者还为本书开发了专门的配套网站,进行实例展示、问题答疑。 作为一本CSS深度学习的书,书中介绍大量许多前端开发人员都不知道的CSS知识点。通过阅读本书,读者会对CSS......一起来看看 《CSS世界》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具