生产者发送消息时可以为消息指定一些参数
- Delivery mode: 是否持久化,1 - Non-persistent,2 - Persistent
- Headers:头文件可以有任何名称。这里只能设置长字符串头。
- Properties: 设置消息属性(传递模式和头信息是最常见的情况)。无效的属性将被忽略. Valid properties are:
content_type : 消息内容的类型
content_encoding: 消息内容的编码格式
priority: 消息的优先级
correlation_id:关联id
reply_to: 用于指定回复的队列的名称
expiration: 消息的失效时间
message _id: 消息id
timestamp:消息的时间戳
type: 类型
user_id: 用户id
app_id: 应用程序id
cluster_id: 集群id
- Payload: 消息内容
在 web 控制台发布消息配置:
web 控制台
RabbitTemplate
RabbitTemplate中,发现不管是初始化还是默认的MessageConverter都是SimpleMessageConverter
// messageConverter 默认实例化 SimpleMessageConverter
private MessageConverter messageConverter = new SimpleMessageConverter();
...
/**
* 便捷的构造器与setter注入一起使用
*/
public RabbitTemplate() {
initDefaultStrategies();
}
...
/**
* 设置默认策略。如果需要,子类可以重写。
*/
protected void initDefaultStrategies() {
setMessageConverter(new SimpleMessageConverter());
}
我们进入SimpleMessageConverter类中看其默认的转换逻辑
/**
* Converts from a AMQP Message to an Object.
*/
@Override
public Object fromMessage(Message message) throws MessageConversionException {
Object content = null;
MessageProperties properties = message.getMessageProperties();
if (properties != null) {
String contentType = properties.getContentType();
// 如果 content_type 是以 text 为开头,则把消息转换成 String 类型
if (contentType != null && contentType.startsWith("text")) {
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = this.defaultCharset;
}
try {
content = new String(message.getBody(), encoding);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException("failed to convert text-based Message content", e);
}
}
// 其他类型转换为 application/x-java-serialized-object
else if (contentType != null && contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) {
try {
content = SerializationUtils.deserialize(
createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
}
catch (IOException | IllegalArgumentException | IllegalStateException e) {
throw new MessageConversionException("failed to convert serialized Message content", e);
}
}
}
if (content == null) {
content = message.getBody();
}
return content;
}
MessageConverter:
消息转换器 MessageConverter 重要的两个方法
// 将 java 对象和属性对象转换成 Message 对象。
Message toMessage(Object object, MessageProperties messageProperties);
// 将消息对象转换成 java 对象。
Object fromMessage(Message message) throws MessageConversionException;
RabbitTemplate 内部通过 MessageConverter 把 Message 转换成 java 对象,用于将对象参数解析为convertAndSend方法以及 receiveAndConvert 方法的对象结果。
默认转换器是 SimpleMessageConverter,它能够根据消息内容类型头处理字节数组、字符串和可序列化对象。
SimpleMessageConverter 处理的逻辑
- 如果 content_type 是以 text 为开头,则把消息转换成 String 类型
- 如果 content_type的 值是 application/x-java-serialized-object 则把消息序列化为 java 对象,否则,把消息转换成字节数组。
Message 内容对象序列化与反序列化
使用 Java 序列化与反序列化
- 默认的 SimpleMessageConverter 在发送消息时会将对象序列化成字节数组,若要反序列化对象,需要自定义 MessageConverter
- SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差。当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
使用 JSON 序列化与反序列化
- RabbitMQ 提供了 Jackson2JsonMessageConverter 来支持消息内容 JSON 序列化与反序列化
消息发送者在发送消息时应设置 MessageConverter 为 Jackson2JsonMessageConverter
@Test
public void demo_06_Producer() {String routingKey = "hello"; TestA a = new TestA(); a.setFieldA("FBI WARNING"); // 设置 MessageConverter rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.convertAndSend(routingKey, a); System.out.println("发送成功");
}
消费者也应该配置 MessageConverter 为 Jackson2JsonMessageConverter,这样消费者反序列化就能匹配成功
@Configuration
public class RabbitMQConfig {@Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // 临时设置 MessageConverter 为 Jackson2JsonMessageConverter factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; }
}
注意:被序列化对象应提供一个无参的构造函数,否则会抛出异常
为您推荐与 rabbitmq 相关的帖子:
- RabbitMQ 学习笔记 -- 01 简介
- RabbitMQ 学习笔记 -- 02 一个 HelloWorld
- RabbitMQ 学习笔记 -- 03 多消费者
- RabbitMQ 学习笔记 -- 04 扇形交换机
- RabbitMQ 学习笔记 -- 05 路由模式
- RabbitMQ 学习笔记 -- 06 Topic 交换机
- RabbitMQ 学习笔记 -- 07 初探@RabbitListener
- RabbitMQ 学习笔记 -- 09 RabbitMQ 的持久化
- RabbitMQ 学习笔记 -- 10 RabbitMQ 消费者确认和发布者确认
- RabbitMQ 学习笔记 -- 11 RabbitMQ 死信队列
- RabbitMQ 消息队列模型使用介绍
- RabbitMQ 3.11.0 已发布
- RabbitMQ 3.11.6 发布
- RabbitMQ 3.11.8 已发布,AMQP 开源实现
- RabbitMQ 3.11.13 已发布,AMQP 开源实现
- RabbitMQ 3.11.14 已发布,AMQP 开源实现