Kafka学习笔记 -- Avro + Kafka Native API

栏目: 后端 · 发布时间: 6年前

{
    "namespace": "me.zhongmingmao.avro",
    "type": "record",
    "name": "Stock",
    "fields": [
        {"name": "stockCode", "type": "string"},
        {"name": "stockName",  "type": "string"},
        {"name": "tradeTime", "type": "long"},
        {"name": "preClosePrice", "type": "float"},
        {"name": "openPrice", "type": "float"},
        {"name": "currentPrice", "type": "float"},
        {"name": "highPrice", "type": "float"},
        {"name": "lowPrice", "type": "float"}
    ]
}

编译Schema

mvn clean compile

自定义序列化器

public class StockSerializer implements Serializer<Stock> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, Stock data) {
        if (null == data) {
            return null;
        }

        DatumWriter<Stock> datumWriter = new SpecificDatumWriter<>(data.getSchema());
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
        try {
            datumWriter.write(data, encoder);
        } catch (IOException e) {
            throw new SerializationException(e);
        }

        return outputStream.toByteArray();
    }

    @Override
    public void close() {
    }
}

自定义反序列化器

public class StockDeserializer implements Deserializer<Stock> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public Stock deserialize(String topic, byte[] data) {
        if (null == data) {
            return null;
        }

        Stock stock = new Stock();
        DatumReader<Stock> datumReader = new SpecificDatumReader<>(stock.getSchema());
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
        try {
            stock = datumReader.read(null, decoder);
        } catch (IOException e) {
            throw new SerializationException(e);
        }
        return stock;
    }

    @Override
    public void close() {
    }
}

发送消息

List<Stock> stocks = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
    Stock stock = Stock.newBuilder()
            .setStockCode(String.valueOf(i))
            .setStockName("stock" + i)
            .setTradeTime(System.currentTimeMillis())
            .setPreClosePrice(100).setOpenPrice(200)
            .setCurrentPrice(300).setHighPrice(400).setLowPrice(0).build();
    stocks.add(stock);
}

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StockSerializer.class.getName());

Producer<String, Stock> producer = new KafkaProducer<>(props);

for (Stock stock : stocks) {
    ProducerRecord<String, Stock> record = new ProducerRecord<>("zhongmingmao", stock);
    RecordMetadata metadata = producer.send(record).get();
    log.info("stock={}, partition={}, offset={}", stock, metadata.partition(), metadata.offset());
    TimeUnit.SECONDS.sleep(1);
}

消费消息

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "zhongmingmao");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StockDeserializer.class.getName());

KafkaConsumer<String, Stock> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("zhongmingmao"));

try {
    while (true) {
        ConsumerRecords<String, Stock> records = consumer.poll(100);
        for (ConsumerRecord<String, Stock> record : records) {
            Stock stock = record.value();
            log.info("stock={}", stock);
        }
    }
} finally {
    consumer.close();
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

HTML5秘籍(第2版)

HTML5秘籍(第2版)

[美] Matthew MacDonald / 李松峰、朱巍、刘帅 / 人民邮电出版社 / 2015-4 / 89.00元

不依赖插件添加音频和视频,构建适用于所有浏览器的播放页面。 用Canvas创建吸引人的视觉效果,绘制图形、图像、文本,播放动画,运行交互游戏。 用CSS3将页面变活泼,比如添加新奇的字体,利用变换和动画添加吸引人的效果。 设计更出色的Web表单,利用HTML5新增的表单元素更加高效地收集访客信息。 一次开发,多平台运行,实现响应式设计,创建适配桌面计算机、平板电脑和智能手机......一起来看看 《HTML5秘籍(第2版)》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具