{ "namespace": "me.zhongmingmao.avro", "type": "record", "name": "Stock", "fields": [ {"name": "stockCode", "type": "string"}, {"name": "stockName", "type": "string"}, {"name": "tradeTime", "type": "long"}, {"name": "preClosePrice", "type": "float"}, {"name": "openPrice", "type": "float"}, {"name": "currentPrice", "type": "float"}, {"name": "highPrice", "type": "float"}, {"name": "lowPrice", "type": "float"} ] }
编译Schema
mvn clean compile
自定义序列化器
public class StockSerializer implements Serializer<Stock> { @Override public void configure(Map<String, ?> configs, boolean isKey) { } @Override public byte[] serialize(String topic, Stock data) { if (null == data) { return null; } DatumWriter<Stock> datumWriter = new SpecificDatumWriter<>(data.getSchema()); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(outputStream, null); try { datumWriter.write(data, encoder); } catch (IOException e) { throw new SerializationException(e); } return outputStream.toByteArray(); } @Override public void close() { } }
自定义反序列化器
public class StockDeserializer implements Deserializer<Stock> { @Override public void configure(Map<String, ?> configs, boolean isKey) { } @Override public Stock deserialize(String topic, byte[] data) { if (null == data) { return null; } Stock stock = new Stock(); DatumReader<Stock> datumReader = new SpecificDatumReader<>(stock.getSchema()); ByteArrayInputStream in = new ByteArrayInputStream(data); BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null); try { stock = datumReader.read(null, decoder); } catch (IOException e) { throw new SerializationException(e); } return stock; } @Override public void close() { } }
发送消息
List<Stock> stocks = Lists.newArrayList(); for (int i = 0; i < 10; i++) { Stock stock = Stock.newBuilder() .setStockCode(String.valueOf(i)) .setStockName("stock" + i) .setTradeTime(System.currentTimeMillis()) .setPreClosePrice(100).setOpenPrice(200) .setCurrentPrice(300).setHighPrice(400).setLowPrice(0).build(); stocks.add(stock); } Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StockSerializer.class.getName()); Producer<String, Stock> producer = new KafkaProducer<>(props); for (Stock stock : stocks) { ProducerRecord<String, Stock> record = new ProducerRecord<>("zhongmingmao", stock); RecordMetadata metadata = producer.send(record).get(); log.info("stock={}, partition={}, offset={}", stock, metadata.partition(), metadata.offset()); TimeUnit.SECONDS.sleep(1); }
消费消息
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "zhongmingmao"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StockDeserializer.class.getName()); KafkaConsumer<String, Stock> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("zhongmingmao")); try { while (true) { ConsumerRecords<String, Stock> records = consumer.poll(100); for (ConsumerRecord<String, Stock> record : records) { Stock stock = record.value(); log.info("stock={}", stock); } } } finally { consumer.close(); }
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 【每日笔记】【Go学习笔记】2019-01-04 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-02 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-07 Codis笔记
- Golang学习笔记-调度器学习
- Vue学习笔记(二)------axios学习
- 算法/NLP/深度学习/机器学习面试笔记
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。