Kafka消息序列化

栏目: 后端 · 发布时间: 6年前

内容简介:阅读文章,希望能解决以下问题:序列化主要是用来解决数据在网络中传输的问题. 在网络中传输的数据必须全是字节,也称为字节流. 而文本数据到字节数据的这一步就是序列化(将非字节数据 -> 字节数组).Kafka中的序列化主要是将发送的消息序列化成字节数组. 在Java中,有八大基本数据类型和引用类型. Kafka预先内置了一些相应的序列化和反序列化

Kafka消息序列化

阅读文章,希望能解决以下问题:

  • 序列化主要解决的问题
  • 不同的序列化对消息大小的影响
  • 可以用序列化来解决消息太大的问题吗

概括

序列化主要是用来解决数据在网络中传输的问题. 在网络中传输的数据必须全是字节,也称为字节流. 而文本数据到字节数据的这一步就是序列化(将非字节数据 -> 字节数组).

Kafka中序列化

Kafka中的序列化主要是将发送的消息序列化成字节数组. 在 Java 中,有八大基本数据类型和引用类型. Kafka预先内置了一些相应的序列化和反序列化

Java类型 序列化 反序列化
int IntegerSerializer IntegerDeserializer
long LongSerializer LongDeserializer
double DoubleSerializer DoubleDeserializer
byte BytesSerializer BytesDeserializer
byte ByteArraySerializer ByteArrayDeserializer
byte ByteBufferSerializer ByteBufferDeserializer
String StringSerializer StringDeserializer

通过上面表格可以看出,Kafka并不是为所有的基本类型内置了对应的序列化器和反序列化器. 而且Kafka为对byte提供方便,内置了三个不同的序列化器和反序列化器. 同时,Kafka为一个引用类型-String,提供了序列化器和反序列化器,因为String太常用了.

// StringSerializer序列化代码
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }
}

从代码可以看出,默认情况下会把字符串编码成UTF-8格式,然后在网络中传输.

自定义序列化器

Kafka自带的序列化器并不能满足所有的需求,假如我有一个用户对象,里面包含用户姓名,用户年龄... 但是Kafka中没有提供相对应的序列化器,需要自己实现一个. 实现一个序列化器很简单,只需要实现一个接口.

public interface Serializer<T> extends Closeable {

    // 配置该类
    void configure(Map<String, ?> configs, boolean isKey);

    // 将数据转变为字节数组
    byte[] serialize(String topic, T data);

    // 默认方法
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    // 关闭序列化器
    @Override
    void close();
}

接下来,自己实现一个序列化器. 下面序列化器是商店顾客序列化器. 这里采用硬编码的方式,将该对象序列化成字节数组.

public class CustomerSerializer implements Serializer<Customer> {
    @Override
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (data == null) {
                return null;
            } else {
                if (data.getName() != null) {
                    serializedName = data.getName().getBytes(StandardCharsets.UTF_8);
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }

            final ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getCustomerId());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[] " + e);
        }
    }
}

自定义序列化器的劣势:

  • 需要考虑向前兼容和向后兼容的问题,假如更新的反序列化能否对以前的消息进行支持.
  • 需要将序列化和反序列化成匹配的出现

用第三方jar包实现自定义序列化

用JSON,ProtoBuf,Protostuff,Thrift...实现通过的序列化工具.

public class JsonSerializer implements Serializer<Customer> {

    private final Logger log = LoggerFactory.getLogger(JsonSerializer.class);

    @Override
    public byte[] serialize(String topic, Customer data) {
        byte[] result = null;
        try {
            // 关键代码,把对象序列化为字节数组.
            result = JSON.toJSONBytes(data);
            log.info("{} is serialize after the size is {}", data, result.length);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }
}

总结

序列化只是用来将非字节数据变为字节数组,最终实现数据在网络传输的目的. 然而想要通过序列化提升传输的性能(例如把序列化后的字节变少)是比较难实现的. 因为最终的字节数组要在消费端反序列化,因此消费者需要和生产者约定好(例如 1 代表 K, 2 代表 A ...).


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数据库索引设计与优化

数据库索引设计与优化

【美】Tapio Lahdenmaki、【美】Michael Leach / 曹怡倩、赵建伟 / 电子工业出版社 / 2015-6 / 79.00元

《数据库索引设计与优化》提供了一种简单、高效、通用的关系型数据库索引设计方法。作者通过系统的讲解及大量的案例清晰地阐释了关系型数据库的访问路径选择原理,以及表和索引的扫描方式,详尽地讲解了如何快速地估算SQL 运行的CPU 时间及执行时间,帮助读者从原理上理解SQL、表及索引结构、访问方式等对关系型数据库造成的影响,并能够运用量化的方法进行判断和优化,指导关系型数据库的索引设计。 《数据库索......一起来看看 《数据库索引设计与优化》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

随机密码生成器
随机密码生成器

多种字符组合密码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具