Kafka学习笔记 -- Avro + Twitter Bijection

栏目: 后端 · 发布时间: 6年前

内容简介:路径:src/main/resources/user.json
  1. 编译Schema
  2. 依赖于Avro实现 自定义的序列化器和反序列化器

引入依赖

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.12</artifactId>
    <version>0.9.6</version>
</dependency>

Schema

路径:src/main/resources/user.json

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name",  "type": "string"},
    {"name": "age", "type": "int"}
  ]
}

发送消息

String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
Schema schema = new Schema.Parser().parse(new File(schemaFilePath));
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", ByteArraySerializer.class.getName());
Producer<String, byte[]> producer = new KafkaProducer<>(props);

for (int i = 0; i < 10; i++) {
    GenericData.Record record = new GenericData.Record(schema);
    record.put("id", i);
    record.put("name", "zhongmingmao" + i);
    record.put("age", i);
    byte[] bytes = recordInjection.apply(record);
    ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>("zhongmingmao", bytes);
    producer.send(producerRecord);
    TimeUnit.SECONDS.sleep(1);
}
producer.close();

消费消息

String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
Schema schema = new Schema.Parser().parse(new File(schemaFilePath));
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "zhongmingmao");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("zhongmingmao"));

try {
    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(100);
        for (ConsumerRecord<String, byte[]> record : records) {
            GenericRecord genericRecord = recordInjection.invert(record.value()).get();
            log.info("id={}, name={}, age={}, partition={}, offset={}",
                    genericRecord.get("id"), genericRecord.get("name"), genericRecord.get("age"),
                    record.partition(), record.offset());
        }
        TimeUnit.SECONDS.sleep(1);
    }
} finally {
    consumer.close();
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Dojo权威指南

Dojo权威指南

拉塞尔 / 李松峰、李丽 / 机械工业出版社 / 2009-4 / 79.00元

通过使用Dojo这个工业强度的JavaScript工具箱,我们可以比使用其他任何Ajax框架更高效、更容易地创建JavaScript或Ajax驱动的应用程序和站点。 《Dojo权威指南》向读者展示了如何充分利用Dojo工具箱中包含的大量实用特性,以前所未有的效率开发出功能丰富、响应敏捷的Web应用程序。读者通过《Dojo权威指南》能够学习到创建复杂布局和表单控件(常见于高级桌面应用程序)的技......一起来看看 《Dojo权威指南》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具