内容简介:golang如何使用sarama访问kafka。下面是一个访问kafka服务器的客户端代码例子,用来发送和接收消息。使用方式……
golang如何使用sarama访问kafka
下面是一个访问kafka服务器的客户端代码例子,用来发送和接收消息。
使用方式
- 命令行参数
$ ./kafkaclient -h
Usage of ./kafkaclient:
-ca string
CA Certificate (default "ca.pem")
-cert string
Client Certificate (default "cert.pem")
-command string
consumer|producer (default "consumer")
-host string
Common separated kafka hosts (default "localhost:9093")
-key string
Client Key (default "key.pem")
-partition int
Kafka topic partition
-tls
TLS enable
-topic string
Kafka topic (default "test--topic")
- 作为producer启动
$ ./kafkaclient -command producer \
    -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command producer \
    -tls -cert client.pem -key client.key -ca ca.pem \
    -host kafka1:9093,kafka2:9093
producer发送消息给kafka:
> aaa 2018/12/15 07:11:21 Produced message: [aaa] > bbb 2018/12/15 07:11:30 Produced message: [bbb] > quit
- 作为consumer启动
$ ./kafkaclient -command consumer \
    -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command consumer \
    -tls -cert client.pem -key client.key -ca ca.pem \
    -host kafka1:9093,kafka2:9093
consumer从kafka接收消息:
2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]
完整源代码如下
这个代码使用到了Shopify/sarama库,请自行下载使用。
$ cat kafkaclient.go
package main
import (
"flag"
"fmt"
"log"
"os"
"io/ioutil"
"bufio"
"strings"
"crypto/tls"
"crypto/x509"
"github.com/Shopify/sarama"
)
// Command-line settings. main binds each variable to the flag named in its
// comment and fills it in when flag.Parse runs.
var (
// command selects the mode to run in: "consumer" or "producer" (-command).
command string
// tlsEnable turns on TLS for the broker connection (-tls).
tlsEnable bool
// hosts is the comma-separated broker address list (-host).
hosts string
// topic is the Kafka topic to use (-topic).
topic string
// partition is the Kafka topic partition (-partition).
partition int
// clientcert is the client certificate file path (-cert).
clientcert string
// clientkey is the client private key file path (-key).
clientkey string
// cacert is the CA certificate file path (-ca).
cacert string
)
// main registers and parses the command-line flags, then hands over to run.
// Any error returned by run is logged and terminates the process with a
// non-zero exit status.
func main() {
	flag.StringVar(&command, "command", "consumer", "consumer|producer")
	flag.BoolVar(&tlsEnable, "tls", false, "TLS enable")
	flag.StringVar(&hosts, "host", "localhost:9093", "Common separated kafka hosts")
	flag.StringVar(&topic, "topic", "test--topic", "Kafka topic")
	flag.IntVar(&partition, "partition", 0, "Kafka topic partition")
	flag.StringVar(&clientcert, "cert", "cert.pem", "Client Certificate")
	flag.StringVar(&clientkey, "key", "key.pem", "Client Key")
	flag.StringVar(&cacert, "ca", "ca.pem", "CA Certificate")
	flag.Parse()

	// All work happens in run so that its deferred Close calls execute before
	// log.Fatal exits the process.
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run connects to the brokers listed in hosts (optionally over TLS) and runs
// the consumer or producer loop selected by command until that loop returns.
// It returns an error if command is not recognised, if the TLS material cannot
// be loaded, or if the client, consumer or producer cannot be created.
func run() error {
	// Reject unknown modes before connecting; previously any value other than
	// "consumer" silently started a producer.
	if command != "consumer" && command != "producer" {
		return fmt.Errorf("unknown command %q: expected consumer or producer", command)
	}

	config := sarama.NewConfig()
	if tlsEnable {
		//sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
		tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
		if err != nil {
			return err
		}
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}

	client, err := sarama.NewClient(strings.Split(hosts, ","), config)
	if err != nil {
		return fmt.Errorf("unable to create kafka client: %q", err)
	}
	// A consumer/producer created from an existing client does not own it, so
	// the client has to be closed here. Deferred first so it runs last.
	defer client.Close()

	if command == "consumer" {
		consumer, err := sarama.NewConsumerFromClient(client)
		if err != nil {
			return err
		}
		defer consumer.Close()
		loopConsumer(consumer, topic, partition)
		return nil
	}

	producer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		return err
	}
	defer producer.Close()
	loopProducer(producer, topic, partition)
	return nil
}
// genTLSConfig builds a client-side TLS configuration.
//
// clientcertfile and clientkeyfile are the PEM files holding the client
// certificate and its private key; cacertfile is a PEM file holding the CA
// certificate(s) used as the root pool for verifying the server.
//
// It returns an error if the key pair cannot be loaded, if the CA file cannot
// be read, or if the CA file contains no certificate that could be parsed.
func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
	// Load the client certificate and key.
	clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
	if err != nil {
		return nil, err
	}

	// Load the CA certificate pool.
	cacert, err := ioutil.ReadFile(cacertfile)
	if err != nil {
		return nil, err
	}
	cacertpool := x509.NewCertPool()
	// AppendCertsFromPEM reports whether any certificate was added. Fail here
	// rather than return a config whose root pool is empty.
	if !cacertpool.AppendCertsFromPEM(cacert) {
		return nil, fmt.Errorf("no valid PEM certificate found in %q", cacertfile)
	}

	// Assemble the TLS config.
	tlsConfig := &tls.Config{
		RootCAs:      cacertpool,
		Certificates: []tls.Certificate{clientcert},
		// InsecureSkipVerify: true, // This can be used on test server if domain does not match cert.
	}
	return tlsConfig, nil
}
// loopProducer reads lines from standard input and queues each non-empty line
// as a message on topic through producer. Typing "exit" or "quit", or reaching
// end of input, ends the loop.
//
// The partition argument is accepted for symmetry with loopConsumer but is not
// applied to the messages built here.
//
// Closing producer is left to the caller.
func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
	// Per the sarama AsyncProducer documentation, Errors() must be read while
	// error reporting is enabled (the default), otherwise the producer
	// deadlocks. Drain it in the background; the goroutine ends once the
	// channel is closed (presumably when the caller closes the producer).
	go func() {
		for err := range producer.Errors() {
			log.Printf("Failed to produce message: %v\n", err)
		}
	}()

	scanner := bufio.NewScanner(os.Stdin)
	fmt.Print("> ")
	for scanner.Scan() {
		text := scanner.Text()
		if text == "exit" || text == "quit" {
			break
		}
		if text != "" {
			producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
			// Logged when the message is handed to the producer, not when the
			// broker acknowledges it.
			log.Printf("Produced message: [%s]\n", text)
		}
		fmt.Print("> ")
	}
	// Scan returns false for both EOF and read failures; report the latter.
	if err := scanner.Err(); err != nil {
		log.Printf("reading standard input: %v\n", err)
	}
}
// loopConsumer subscribes to the given partition of topic, starting at the
// newest offset, and logs the value and offset of every message received.
//
// If the partition consumer cannot be created the error is logged and the
// function returns. Otherwise it runs until the message channel is closed,
// and closes the partition consumer on the way out.
func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
	partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
	if err != nil {
		log.Println(err)
		return
	}
	defer partitionConsumer.Close()

	// Ranging over the channel ends the loop when it is closed. A plain
	// receive would yield a nil message there and panic on msg.Value.
	for msg := range partitionConsumer.Messages() {
		log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset)
	}
}
编译:
$ go build kafkaclient.go
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 如何使用不同语言访问bitcoinj
- 使用Fluentd + Elasticsearch收集访问日志
- [译] 使用Laravel访问前端Cookie
- python使用锁访问共享变量实例解析
- 使用.net通过odbc访问Hive
- PHP CURL方式使用代理访问网站
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Algorithms on Strings, Trees and Sequences
Dan Gusfield / Cambridge University Press / 1997-5-28 / USD 99.99
String algorithms are a traditional area of study in computer science. In recent years their importance has grown dramatically with the huge increase of electronically stored text and of molecular seq......一起来看看 《Algorithms on Strings, Trees and Sequences》 这本书的介绍吧!