内容简介:阅读文章,希望能解决以下问题:序列化主要是用来解决数据在网络中传输的问题. 在网络中传输的数据必须全是字节,也称为字节流. 而文本数据到字节数据的这一步就是序列化(将非字节数据 -> 字节数组).Kafka中的序列化主要是将发送的消息序列化成字节数组. 在Java中,有八大基本数据类型和引用类型. Kafka预先内置了一些相应的序列化和反序列化
Kafka消息序列化
阅读文章,希望能解决以下问题:
- 序列化主要解决的问题
- 不同的序列化对消息大小的影响
- 可以用序列化来解决消息太大的问题吗
概括
序列化主要是用来解决数据在网络中传输的问题. 在网络中传输的数据必须全是字节,也称为字节流. 而文本数据到字节数据的这一步就是序列化(将非字节数据 -> 字节数组).
Kafka中序列化
Kafka中的序列化主要是将发送的消息序列化成字节数组. 在 Java 中,有八大基本数据类型和引用类型. Kafka预先内置了一些相应的序列化和反序列化
Java类型 | 序列化 | 反序列化 |
---|---|---|
int | IntegerSerializer | IntegerDeserializer |
long | LongSerializer | LongDeserializer |
double | DoubleSerializer | DoubleDeserializer |
byte | BytesSerializer | BytesDeserializer |
byte | ByteArraySerializer | ByteArrayDeserializer |
byte | ByteBufferSerializer | ByteBufferDeserializer |
String | StringSerializer | StringDeserializer |
通过上面表格可以看出,Kafka并不是为所有的基本类型内置了对应的序列化器和反序列化器. 而且Kafka为对byte提供方便,内置了三个不同的序列化器和反序列化器. 同时,Kafka为一个引用类型-String,提供了序列化器和反序列化器,因为String太常用了.
// StringSerializer序列化代码 public class StringSerializer implements Serializer<String> { private String encoding = "UTF8"; @Override public byte[] serialize(String topic, String data) { try { if (data == null) return null; else return data.getBytes(encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding); } } }
从代码可以看出,默认情况下会把字符串编码成UTF-8格式,然后在网络中传输.
自定义序列化器
Kafka自带的序列化器并不能满足所有的需求,假如我有一个用户对象,里面包含用户姓名,用户年龄... 但是Kafka中没有提供相对应的序列化器,需要自己实现一个. 实现一个序列化器很简单,只需要实现一个接口.
public interface Serializer<T> extends Closeable { // 配置该类 void configure(Map<String, ?> configs, boolean isKey); // 将数据转变为字节数组 byte[] serialize(String topic, T data); // 默认方法 default byte[] serialize(String topic, Headers headers, T data) { return serialize(topic, data); } // 关闭序列化器 @Override void close(); }
接下来,自己实现一个序列化器. 下面序列化器是商店顾客序列化器. 这里采用硬编码的方式,将该对象序列化成字节数组.
public class CustomerSerializer implements Serializer<Customer> { @Override public byte[] serialize(String topic, Customer data) { try { byte[] serializedName; int stringSize; if (data == null) { return null; } else { if (data.getName() != null) { serializedName = data.getName().getBytes(StandardCharsets.UTF_8); stringSize = serializedName.length; } else { serializedName = new byte[0]; stringSize = 0; } } final ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize); buffer.putInt(data.getCustomerId()); buffer.putInt(stringSize); buffer.put(serializedName); return buffer.array(); } catch (Exception e) { throw new SerializationException("Error when serializing Customer to byte[] " + e); } } }
自定义序列化器的劣势:
- 需要考虑向前兼容和向后兼容的问题,假如更新的反序列化能否对以前的消息进行支持.
- 需要将序列化和反序列化成匹配的出现
用第三方jar包实现自定义序列化
用JSON,ProtoBuf,Protostuff,Thrift...实现通过的序列化工具.
public class JsonSerializer implements Serializer<Customer> { private final Logger log = LoggerFactory.getLogger(JsonSerializer.class); @Override public byte[] serialize(String topic, Customer data) { byte[] result = null; try { // 关键代码,把对象序列化为字节数组. result = JSON.toJSONBytes(data); log.info("{} is serialize after the size is {}", data, result.length); } catch (Exception e) { e.printStackTrace(); } return result; } }
总结
序列化只是用来将非字节数据变为字节数组,最终实现数据在网络传输的目的. 然而想要通过序列化提升传输的性能(例如把序列化后的字节变少)是比较难实现的. 因为最终的字节数组要在消费端反序列化,因此消费者需要和生产者约定好(例如 1 代表 K, 2 代表 A ...).
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
ANSI Common Lisp
Paul Graham / Prentice Hall / 1995-11-12 / USD 116.40
For use as a core text supplement in any course covering common LISP such as Artificial Intelligence or Concepts of Programming Languages. Teaching students new and more powerful ways of thinking abo......一起来看看 《ANSI Common Lisp》 这本书的介绍吧!