内容简介:路径:src/main/resources/user.json
- 编译Schema
- 依赖于Avro实现 自定义的序列化器和反序列化器
引入依赖
<dependency> <groupId>com.twitter</groupId> <artifactId>bijection-avro_2.12</artifactId> <version>0.9.6</version> </dependency>
Schema
路径:src/main/resources/user.json
{ "type": "record", "name": "User", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] }
发送消息
String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath(); Schema schema = new Schema.Parser().parse(new File(schemaFilePath)); Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", ByteArraySerializer.class.getName()); Producer<String, byte[]> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { GenericData.Record record = new GenericData.Record(schema); record.put("id", i); record.put("name", "zhongmingmao" + i); record.put("age", i); byte[] bytes = recordInjection.apply(record); ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>("zhongmingmao", bytes); producer.send(producerRecord); TimeUnit.SECONDS.sleep(1); } producer.close();
消费消息
String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath(); Schema schema = new Schema.Parser().parse(new File(schemaFilePath)); Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "zhongmingmao"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("zhongmingmao")); try { while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(100); for (ConsumerRecord<String, byte[]> record : records) { GenericRecord genericRecord = recordInjection.invert(record.value()).get(); log.info("id={}, name={}, age={}, partition={}, offset={}", genericRecord.get("id"), genericRecord.get("name"), genericRecord.get("age"), record.partition(), record.offset()); } TimeUnit.SECONDS.sleep(1); } } finally { consumer.close(); }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 【每日笔记】【Go学习笔记】2019-01-04 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-02 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-07 Codis笔记
- Golang学习笔记-调度器学习
- Vue学习笔记(二)------axios学习
- 算法/NLP/深度学习/机器学习面试笔记
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Tango with Django
David Maxwell、Leif Azzopardi / Leanpub / 2016-11-12 / USD 19.00
Tango with Django is a beginner's guide to web development using the Python programming language and the popular Django web framework. The book is written in a clear and friendly style teaching you th......一起来看看 《Tango with Django》 这本书的介绍吧!