Kafka学习笔记 -- Avro + Twitter Bijection

栏目: 后端 · 发布时间: 7年前

内容简介:路径:src/main/resources/user.json
  1. 编译Schema
  2. 依赖于Avro实现 自定义的序列化器和反序列化器

引入依赖

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.12</artifactId>
    <version>0.9.6</version>
</dependency>

Schema

路径:src/main/resources/user.json

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name",  "type": "string"},
    {"name": "age", "type": "int"}
  ]
}

发送消息

String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
Schema schema = new Schema.Parser().parse(new File(schemaFilePath));
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", ByteArraySerializer.class.getName());
Producer<String, byte[]> producer = new KafkaProducer<>(props);

for (int i = 0; i < 10; i++) {
    GenericData.Record record = new GenericData.Record(schema);
    record.put("id", i);
    record.put("name", "zhongmingmao" + i);
    record.put("age", i);
    byte[] bytes = recordInjection.apply(record);
    ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>("zhongmingmao", bytes);
    producer.send(producerRecord);
    TimeUnit.SECONDS.sleep(1);
}
producer.close();

消费消息

String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
Schema schema = new Schema.Parser().parse(new File(schemaFilePath));
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "zhongmingmao");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("zhongmingmao"));

try {
    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(100);
        for (ConsumerRecord<String, byte[]> record : records) {
            GenericRecord genericRecord = recordInjection.invert(record.value()).get();
            log.info("id={}, name={}, age={}, partition={}, offset={}",
                    genericRecord.get("id"), genericRecord.get("name"), genericRecord.get("age"),
                    record.partition(), record.offset());
        }
        TimeUnit.SECONDS.sleep(1);
    }
} finally {
    consumer.close();
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

颠覆者:周鸿祎自传

颠覆者:周鸿祎自传

周鸿祎、范海涛 / 北京联合出版公司 / 2017-11 / 49.80元

周鸿祎,一个在中国互联网历史上举足轻重的名字。他被认为是奠定当今中国互联网格局的人之一。 作为第一代互联网人,中国互联网行业最好的产品经理、创业者,他每时每刻都以自己的实践,为互联网的发展贡献自己的力量。 在很长一段时间内,他没有在公共场合发声,甚至有粉丝对当前死水一潭的互联网现状不满意,发出了“人民想念周鸿祎”的呼声。 但周鸿祎在小时候,却是一个踢天弄井,动不动就大闹天宫的超级......一起来看看 《颠覆者:周鸿祎自传》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试