golang kafka 2

栏目: Go · 发布时间: 6年前

package main

import (
    "fmt"
    "math/rand"
    "os"
    "strconv"
    "strings"
    "time"

    "github.com/Shopify/sarama"
    "github.com/bsm/sarama-cluster" //support automatic consumer-group rebalancing and offset tracking
    "github.com/sdbaiguanghe/glog"
)

var (
    topics = "test0"
)

// consumer 消费者
func consumer() {
    groupID := "group-1"
    config := cluster.NewConfig()
    config.Group.Return.Notifications = true
    config.Consumer.Offsets.CommitInterval = 1 * time.Second
    config.Consumer.Offsets.Initial = sarama.OffsetNewest //初始从最新的offset开始

    c, err := cluster.NewConsumer(strings.Split("localhost:9092", ","), groupID, strings.Split(topics, ","), config)
    if err != nil {
        glog.Errorf("Failed open consumer: %v", err)
        return
    }
    defer c.Close()
    go func(c *cluster.Consumer) {
        errors := c.Errors()
        noti := c.Notifications()
        for {
            select {
            case err := <-errors:
                glog.Errorln(err)
            case <-noti:
            }
        }
    }(c)

    for msg := range c.Messages() {
        fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
        c.MarkOffset(msg, "") //MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset
    }
}

// syncProducer 同步生产者
// 并发量小时,可以用这种方式
func syncProducer() {
    config := sarama.NewConfig()
    //  config.Producer.RequiredAcks = sarama.WaitForAll
    //  config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    p, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)
    defer p.Close()
    if err != nil {
        glog.Errorln(err)
        return
    }

    v := "sync: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
    fmt.Fprintln(os.Stdout, v)
    msg := &sarama.ProducerMessage{
        Topic: topics,
        Value: sarama.ByteEncoder(v),
    }
    if _, _, err := p.SendMessage(msg); err != nil {
        glog.Errorln(err)
        return
    }
}

// asyncProducer 异步生产者
// 并发量大时,必须采用这种方式
func asyncProducer() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true //必须有这个选项
    config.Producer.Timeout = 5 * time.Second
    p, err := sarama.NewAsyncProducer(strings.Split("localhost:9092", ","), config)
    defer p.Close()
    if err != nil {
        return
    }

    //必须有这个匿名函数内容
    go func(p sarama.AsyncProducer) {
        errors := p.Errors()
        success := p.Successes()
        for {
            select {
            case err := <-errors:
                if err != nil {
                    glog.Errorln(err)
                }
            case <-success:
            }
        }
    }(p)
    for {
        v := "async: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
        fmt.Fprintln(os.Stdout, v)
        msg := &sarama.ProducerMessage{
            Topic: topics,
            Value: sarama.ByteEncoder(v),
        }
        p.Input() <- msg
        time.Sleep(time.Second * 1)
    }
}

func main() {
    go asyncProducer()
    go consumer()
    time.Sleep(time.Second * 10000)
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

复杂:信息时代的连接、机会与布局

复杂:信息时代的连接、机会与布局

罗家德 / 中信出版集团股份有限公司 / 2017-8-1 / 49.00 元

信息科技一方面创造了人们互联的需要,另一方面让人们在互联中抱团以寻找归属感,因此创造了大大小小各类群体的认同和圈子力量的兴起,即互联的同时又产生了聚群,甚至聚群间的相斥。要如何分析这张网?如何预测它的未来变化?如何在网中寻找机会,实现突围?本书提出了4个关键概念──关系、圈子、自组织与复杂系统: • 关系 关系是人与人的连接,又可以被分为强关系和弱关系。强关系就是和你拥有亲密关系的人,......一起来看看 《复杂:信息时代的连接、机会与布局》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具