Kafka学习笔记 -- Avro + Twitter Bijection

栏目: 后端 · 发布时间: 7年前

内容简介:路径:src/main/resources/user.json
  1. 编译Schema
  2. 依赖于Avro实现 自定义的序列化器和反序列化器

引入依赖

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.12</artifactId>
    <version>0.9.6</version>
</dependency>

Schema

路径:src/main/resources/user.json

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name",  "type": "string"},
    {"name": "age", "type": "int"}
  ]
}

发送消息

String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
Schema schema = new Schema.Parser().parse(new File(schemaFilePath));
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", ByteArraySerializer.class.getName());
Producer<String, byte[]> producer = new KafkaProducer<>(props);

for (int i = 0; i < 10; i++) {
    GenericData.Record record = new GenericData.Record(schema);
    record.put("id", i);
    record.put("name", "zhongmingmao" + i);
    record.put("age", i);
    byte[] bytes = recordInjection.apply(record);
    ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>("zhongmingmao", bytes);
    producer.send(producerRecord);
    TimeUnit.SECONDS.sleep(1);
}
producer.close();

消费消息

String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
Schema schema = new Schema.Parser().parse(new File(schemaFilePath));
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "zhongmingmao");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("zhongmingmao"));

try {
    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(100);
        for (ConsumerRecord<String, byte[]> record : records) {
            GenericRecord genericRecord = recordInjection.invert(record.value()).get();
            log.info("id={}, name={}, age={}, partition={}, offset={}",
                    genericRecord.get("id"), genericRecord.get("name"), genericRecord.get("age"),
                    record.partition(), record.offset());
        }
        TimeUnit.SECONDS.sleep(1);
    }
} finally {
    consumer.close();
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

汇编语言(第2版)

汇编语言(第2版)

王爽 / 清华大学出版社 / 2008-4 / 33.00元

《汇编语言(第2版)》是各种CPU提供的机器指令的助记符的集合,人们可以用汇编语言直接控制硬件系统进行工作。汇编语言是很多相关课程(如数据结构、操作系统、微机原理等)的重要基础。为了更好地引导、帮助读者学习汇编语言,作者以循序渐进的思想精心创作了《汇编语言(第2版)》。《汇编语言(第2版)》具有如下特点:采用了全新的结构对课程的内容进行组织,对知识进行最小化分割,为读者构造了循序渐进的学习线索;在......一起来看看 《汇编语言(第2版)》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

随机密码生成器
随机密码生成器

多种字符组合密码