Message
在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:
- MessageProperties // 消息属性
- byte[] body // 消息内容
如下使用 Message 类型接收数据,当监听到队列 hello中有消息时则会进行接收并处理,MessageConvert 会直接转换成消息类型,并绑定在对应被注解的方法中。
消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)
@RabbitListener(queues = "hello")
public void receive1(Message message) {
// 消息携带属性参数
MessageProperties properties = message.getMessageProperties();
// 消息内容,二进制数据
byte[] body = message.getBody();
}
MessageConvert
- 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等
消息转换器 MessageConverter 重要的两个方法
// 将 java 对象和属性对象转换成 Message 对象。
Message toMessage(Object object, MessageProperties messageProperties);// 将消息对象转换成 java 对象。
Object fromMessage(Message message) throws MessageConversionException;
在 @RabbitListener 注解的方法中,使用 @Payload 和 @Headers 注解可以获取消息中的 body 和 headers 消息。它们都会被 MessageConvert 转换器解析转换后(使用 fromMessage 方法进行转换),将结果绑定在对应注解的方法中。
/**
* 这里是监听 hello 队列,并将接收到的消息体 body 根据 MessageConvert 转换器转换成 String 类型输出
* @Headers 获取所有头部属性消息,也可以用 @Header 获取单个 header 消息
*/
@RabbitListener(queues = "hello")
public void processMessage1(@Payload String message,
@Headers Map<String,Object> headers,
@Header(value = AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey) {
System.out.println("message:" + message);
System.out.println("Headers:" + headers);
System.out.println("routingKey: " + routingKey);
}
默认消费者消费时,消息的 content_type 属性表示消息 body 数据以什么数据格式传输存储,直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:
- application/octet-stream:二进制字节数组存储,使用 byte[]
- application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
- text/plain:文本数据类型存储,使用 String
- application/json:JSON 格式,使用 Object、相应类型
自定义 MessageConverter 方法,进行消费消息的处理
如下是自定义针对 fromMessage 方法,将消费的消息转换成 TestA 对象
@Configuration
public class RabbitMQConfig {
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 设置自定义的 MessageConverter
factory.setMessageConverter(new MessageConverter() {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return null;
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message.getBody()))){
// 返回自定义的对象
return (TestA)ois.readObject();
}catch (Exception e){
e.printStackTrace();
return null;
}
}
});
return factory;
}
}
class TestA implements Serializable {
String fieldA;
String fieldB;
// 省略 Get Set ToString
}
生产者:
@Test
public void producer() {
String routingKey = "hello";
TestA a = new TestA();
a.setFieldA("FBI WARNING");
rabbitTemplate.convertAndSend(routingKey, a);
System.out.println("发送成功");
}
消费者:
@Component
@RabbitListener(queues = "hello")
public class receiver {
@RabbitHandler
public void processMessage1(TestA test) {
System.out.println(test.getFieldA());
}
}
输出:FBI WARNING
通过 @RabbitListener 注解声明 Binding
- 通过 @RabbitListener 的 bindings 属性声明 Binding(若 RabbitMQ 中不存在该绑定所需要的 Queue、Exchange、RouteKey 则自动创建,若存在则抛出异常)
- @RabbitListener 注解可以指定目标方法来作为消费信息的方法,可以通过注解参数指定所监听的队列或 binding。使用@RabbitListener 可以设置一个自己明确默认值的 RabbitListenerContainerFactory 对象
下面是一个 demo 的示例,供参考:
@RabbitListener(bindings = {
@QueueBinding(
// 队列配置
value = @Queue(value = "rabbit.mq.test",
durable = "true",
// 配置死信队列的参数
arguments = {
@Argument(name = "x-dead-letter-exchange", value = "target_exchange"),
@Argument(name = "x-dead-letter-routing-key", value = "target_routing_key")
}),
// 交换机配置
exchange = @Exchange(
value = "rabbit_test_exchange",
durable = "true",
type = ExchangeTypes.TOPIC),
key = "rabbit.test.*")},
// 可以指定容器工厂,默认使用 rabbitListenerContainerFactory
containerFactory = "rabbitListenerContainerFactory",
// 指定消费者的线程数量,一个线程会打开一个Channel,一个队列上的消息只会被消费一次(不考虑消息重新入队列的情况),下面的表示至少开启5个线程,最多10个。线程的数目需要根据你的任务来决定,如果是计算密集型,线程的数目就应该少一些
concurrency = "5-10")
public void handleMessage(@Payload String message,
@Headers Map<String,Object> headers,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
// 消费者标签,用来区分多个消费者
@Header(AmqpHeaders.CONSUMER_TAG) String consumerTag,
// message_id
@Header(value = AmqpHeaders.MESSAGE_ID, required = false) String messageId,
// 该消息是否多次(>1)交付
@Header("amqp_redelivered") boolean redelivered) {
System.out.println("====消费消息===handleMessage");
System.out.println("消息:" + message);
System.out.println("头:" + headers);
System.out.println("deliveryTag:" + deliveryTag);
System.out.println("messageId:" + messageId);
System.out.println("redelivered:" + redelivered);
}
@RabbitListener 和 @RabbitHandler 搭配使用
- @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用。
- @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 注解的方法进行分发处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
消费者:
@Component
@RabbitListener(queues = "hello")
public class consumer {
@RabbitHandler
public void receive(byte[] msg) {
System.out.println("byte[] 消费者消费信息:" + new String(msg));
}
@RabbitHandler
public void receive(String msg) {
System.out.println("String 消费者消费信息:" + msg);
}
@RabbitHandler
public void receive(Integer msg) {
System.out.println("Integer 消费者消费信息:" + msg);
}
@RabbitHandler
public void receive(TestA a) {
System.out.println("对象 TestA 消费者消费信息:" + a.toString());
}
}
class TestA implements Serializable {
String fieldA;
String fieldB;
// 省略 Get Set ToString
}
生产者:
@Test
public void demo_06_Producer() {
String routingKey = "hello";
String str = FBI OPEN THE DOOR";
Integer i = 102424;
TestA a = new TestA();
a.setFieldA("FBI WARNING");
// rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend(routingKey, a);
rabbitTemplate.convertAndSend(routingKey, i);
rabbitTemplate.convertAndSend(routingKey, str);
System.out.println("发送成功");
}
输出:
对象 TestA 消费者消费信息:”FBI WARNING
Integer 消费者消费信息:102424
String 消费者消费信息:FBI OPEN THE DOOR
这里生产者在发送消息时,调用了 RabbitTemplate 中的 convertAndSend 方法会使用 MessageConvert 对 TestA 对象进行消息的序列化,其默认的实现也是 SimpleMessageConverter
为您推荐与 rabbitmq 相关的帖子:
- RabbitMQ 学习笔记 -- 01 简介
- RabbitMQ 学习笔记 -- 02 一个 HelloWorld
- RabbitMQ 学习笔记 -- 03 多消费者
- RabbitMQ 学习笔记 -- 04 扇形交换机
- RabbitMQ 学习笔记 -- 05 路由模式
- RabbitMQ 学习笔记 -- 06 Topic 交换机
- RabbitMQ 学习笔记 -- 08 RabbitTemplate 及消息序列化
- RabbitMQ 学习笔记 -- 09 RabbitMQ 的持久化
- RabbitMQ 学习笔记 -- 10 RabbitMQ 消费者确认和发布者确认
- RabbitMQ 学习笔记 -- 11 RabbitMQ 死信队列
- RabbitMQ 消息队列模型使用介绍
- RabbitMQ 3.11.0 已发布
- RabbitMQ 3.11.6 发布
- RabbitMQ 3.11.8 已发布,AMQP 开源实现
- RabbitMQ 3.11.13 已发布,AMQP 开源实现
- RabbitMQ 3.11.14 已发布,AMQP 开源实现