golang如何使用sarama访问kafka

栏目: Go · 发布时间: 5年前

https://www.jianshu.com/p/3102418e5a7d

golang如何使用sarama访问kafka
golang连接kafka有三种client认证方式:

无认证
TLS认证
SASL/PLAIN认证, (其他SASL/SCRAM, SASL/GSSAPI都不支持)

下面一个客户端代码例子访问kafka服务器,来发送和接受消息。

使用方式


命令行参数

$ ./kafkaclient -h
Usage of ./client:
  -ca string
        CA Certificate (default "ca.pem")
  -cert string
        Client Certificate (default "cert.pem")
  -command string
        consumer|producer (default "consumer")
  -host string
        Common separated kafka hosts (default "localhost:9093")
  -key string
        Client Key (default "key.pem")
  -partition int
        Kafka topic partition
  -password string
        SASL password
  -sasl
        SASL enable
  -tls
        TLS enable
  -topic string
        Kafka topic (default "test--topic")
  -username string
        SASL username


作为producer启动

$ ./kafkaclient -command producer \
  -host kafka1:9092,kafka2:9092

## SASL/PLAIN enable
$ ./kafkaclient -command producer \
  -sasl -username kafkaclient -password kafkapassword \
  -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command producer \
  -tls -cert client.pem -key client.key -ca ca.pem \
  -host kafka1:9093,kafka2:9093

producer发送消息给kafka:
> aaa
2018/12/15 07:11:21 Produced message: [aaa]
> bbb
2018/12/15 07:11:30 Produced message: [bbb]
> quit


作为consumer启动

$ ./kafkaclient -command consumer \
  -host kafka1:9092,kafka2:9092

## SASL/PLAIN enabled
$ ./kafkaclient -command consumer \
  -sasl -username kafkaclient -password kafkapassword \
  -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command consumer \
  -tls -cert client.pem -key client.key -ca ca.pem \
  -host kafka1:9093,kafka2:9093

consumer从kafka接受消息:
2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]


完整源代码如下

这个代码使用到了Shopify/sarama库,请自行下载使用。
$ cat kafkaclient.go
package main

import (
    "flag"
    "fmt"
    "log"
    "os"
    "io/ioutil"
    "bufio"
    "strings"

    "crypto/tls"
    "crypto/x509"

    "github.com/Shopify/sarama"
)

var (
    command     string
    hosts       string
    topic       string
    partition   int
    saslEnable  bool
    username    string
    password    string
    tlsEnable   bool
    clientcert  string
    clientkey   string
    cacert      string
)

func main() {
    flag.StringVar(&command,    "command",      "consumer",         "consumer|producer")
    flag.StringVar(&hosts,      "host",         "localhost:9093",   "Common separated kafka hosts")
    flag.StringVar(&topic,      "topic",        "test--topic",      "Kafka topic")
    flag.IntVar(&partition,     "partition",    0,                  "Kafka topic partition")

    flag.BoolVar(&saslEnable,   "sasl",          false,             "SASL enable")
    flag.StringVar(&username,   "username",      "",                "SASL Username")
    flag.StringVar(&password,   "password",      "",                "SASL Password")

    flag.BoolVar(&tlsEnable,    "tls",          false,              "TLS enable")
    flag.StringVar(&clientcert, "cert",         "cert.pem",         "Client Certificate")
    flag.StringVar(&clientkey,  "key",          "key.pem",          "Client Key")
    flag.StringVar(&cacert,     "ca",           "ca.pem",           "CA Certificate")
    flag.Parse()

    config := sarama.NewConfig()
    if saslEnable {
        config.Net.SASL.Enable   = true
        config.Net.SASL.User     = username
        config.Net.SASL.Password = password
    }

    if tlsEnable {
        //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
        tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
        if err != nil {
            log.Fatal(err)
        }

        config.Net.TLS.Enable = true
        config.Net.TLS.Config = tlsConfig
    }

    client, err := sarama.NewClient(strings.Split(hosts, ","), config)
    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }

    if command == "consumer" {
        consumer, err := sarama.NewConsumerFromClient(client)
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Close()
        loopConsumer(consumer, topic, partition)
    } else {
        producer, err := sarama.NewAsyncProducerFromClient(client)
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()
        loopProducer(producer, topic, partition)
    }
}

func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
    // load client cert
    clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
    if err != nil {
        return nil, err
    }

    // load ca cert pool
    cacert, err := ioutil.ReadFile(cacertfile)
    if err != nil {
        return nil, err
    }
    cacertpool := x509.NewCertPool()
    cacertpool.AppendCertsFromPEM(cacert)

    // generate tlcconfig
    tlsConfig := tls.Config{}
    tlsConfig.RootCAs = cacertpool
    tlsConfig.Certificates = []tls.Certificate{clientcert}
    tlsConfig.BuildNameToCertificate()
 // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert:
    return &tlsConfig, err
}

func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
    scanner := bufio.NewScanner(os.Stdin)
    fmt.Print("> ")
    for scanner.Scan() {
        text := scanner.Text()
        if text == "" {
        } else if text == "exit" || text == "quit" {
            break
        } else {
            producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
            log.Printf("Produced message: [%s]\n",text)
        }
        fmt.Print("> ")
    }
}

func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
    partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
    if err != nil {
        log.Println(err)
        return
    }
    defer partitionConsumer.Close()

    for {
        msg := <-partitionConsumer.Messages()
        log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset)
    }
}

编译:
$ go build kafkaclient.go

以上所述就是小编给大家介绍的《golang如何使用sarama访问kafka》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

编程之道

编程之道

杰弗雷﹒詹姆斯 / 清华大学出版社 / 1999-05 / 18.00元

本书出自美国一位善于进行哲学思考、有十多年工作经验的程序设计师——杰弗雷·詹姆斯之手,他以一种敏锐的眼光审视着发生在程序设计室里的各种各样的小故事,并利用古老的道家思想对其进行分析。简单的故事蕴含深奥的道理,是本书的最大特色。本书语言优美,比喻生动,可读性极强。一起来看看 《编程之道》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

URL 编码/解码
URL 编码/解码

URL 编码/解码

html转js在线工具
html转js在线工具

html转js在线工具