RabbitMQ RabbitMQ 学习笔记 -- 07 初探@RabbitListener

tamir · 2020-10-24 16:14:37 · 热度: 565

Message

在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:

  1. MessageProperties // 消息属性
  2. byte[] body // 消息内容

如下使用 Message 类型接收数据,当监听到队列 hello中有消息时则会进行接收并处理,MessageConvert 会直接转换成消息类型,并绑定在对应被注解的方法中。

消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

@RabbitListener(queues = "hello")
public void receive1(Message message) {
    // 消息携带属性参数
    MessageProperties properties = message.getMessageProperties();
    // 消息内容,二进制数据
    byte[] body = message.getBody();
}

MessageConvert

  • 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
  • RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等

    消息转换器 MessageConverter 重要的两个方法
    // 将 java 对象和属性对象转换成 Message 对象。
    Message toMessage(Object object, MessageProperties messageProperties);

    // 将消息对象转换成 java 对象。
    Object fromMessage(Message message) throws MessageConversionException;

@RabbitListener

@RabbitListener 注解的方法中,使用 @Payload@Headers 注解可以获取消息中的 body 和 headers 消息。它们都会被 MessageConvert 转换器解析转换后(使用 fromMessage 方法进行转换),将结果绑定在对应注解的方法中。

/**
*    这里是监听 hello 队列,并将接收到的消息体 body 根据 MessageConvert 转换器转换成 String 类型输出
*    @Headers 获取所有头部属性消息,也可以用 @Header 获取单个 header 消息
*/
@RabbitListener(queues = "hello")
public void processMessage1(@Payload String message, 
                            @Headers Map<String,Object> headers, 
                            @Header(value = AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey) {
    System.out.println("message:" + message);
    System.out.println("Headers:" + headers);
    System.out.println("routingKey: " + routingKey);
}

默认消费者消费时,消息的 content_type 属性表示消息 body 数据以什么数据格式传输存储,直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

  • application/octet-stream:二进制字节数组存储,使用 byte[]
  • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
  • text/plain:文本数据类型存储,使用 String
  • application/json:JSON 格式,使用 Object、相应类型

自定义 MessageConverter 方法,进行消费消息的处理

如下是自定义针对 fromMessage 方法,将消费的消息转换成 TestA 对象

@Configuration
public class RabbitMQConfig {
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 设置自定义的 MessageConverter
        factory.setMessageConverter(new MessageConverter() {
            @Override
            public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
                return null;
            }

            @Override
            public Object fromMessage(Message message) throws MessageConversionException {
                try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message.getBody()))){
                    // 返回自定义的对象
                    return (TestA)ois.readObject();
                }catch (Exception e){
                    e.printStackTrace();
                    return null;
                }
            }
        });
        return factory;
    }
}
class TestA implements Serializable {
    String fieldA;
    String fieldB;
    // 省略 Get Set ToString
}

生产者:

@Test
public void producer() {
    String routingKey = "hello";
    TestA a = new TestA();
    a.setFieldA("FBI WARNING");
    rabbitTemplate.convertAndSend(routingKey, a);
    System.out.println("发送成功");
}

消费者:

@Component
@RabbitListener(queues = "hello")
public class receiver {
    @RabbitHandler
    public void processMessage1(TestA test) {
        System.out.println(test.getFieldA());
    }
}

输出:FBI WARNING

通过 @RabbitListener 注解声明 Binding

  • 通过 @RabbitListener 的 bindings 属性声明 Binding(若 RabbitMQ 中不存在该绑定所需要的 Queue、Exchange、RouteKey 则自动创建,若存在则抛出异常)
  • @RabbitListener 注解可以指定目标方法来作为消费信息的方法,可以通过注解参数指定所监听的队列或 binding。使用@RabbitListener 可以设置一个自己明确默认值的 RabbitListenerContainerFactory 对象

下面是一个 demo 的示例,供参考:

@RabbitListener(bindings = {
                    @QueueBinding(
                    // 队列配置
                    value = @Queue(value = "rabbit.mq.test", 
                                    durable = "true",
                                    // 配置死信队列的参数
                                    arguments = {
                                        @Argument(name = "x-dead-letter-exchange", value = "target_exchange"),
                                        @Argument(name = "x-dead-letter-routing-key", value = "target_routing_key")
                                    }),
                    // 交换机配置
                    exchange = @Exchange(
                                value = "rabbit_test_exchange",
                                durable = "true",
                                type = ExchangeTypes.TOPIC),
                        key = "rabbit.test.*")},
                    // 可以指定容器工厂,默认使用 rabbitListenerContainerFactory
                    containerFactory = "rabbitListenerContainerFactory",
                    // 指定消费者的线程数量,一个线程会打开一个Channel,一个队列上的消息只会被消费一次(不考虑消息重新入队列的情况),下面的表示至少开启5个线程,最多10个。线程的数目需要根据你的任务来决定,如果是计算密集型,线程的数目就应该少一些
                    concurrency = "5-10")
public void handleMessage(@Payload String message,
                            @Headers Map<String,Object> headers,
                            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                            // 消费者标签,用来区分多个消费者
                            @Header(AmqpHeaders.CONSUMER_TAG) String consumerTag,
                              // message_id
                            @Header(value = AmqpHeaders.MESSAGE_ID, required = false) String messageId,
                            // 该消息是否多次(>1)交付
                            @Header("amqp_redelivered") boolean redelivered) {
    System.out.println("====消费消息===handleMessage");
    System.out.println("消息:" + message);
    System.out.println("头:" + headers);
    System.out.println("deliveryTag:" + deliveryTag);
    System.out.println("messageId:" + messageId);
    System.out.println("redelivered:" + redelivered);
}

@RabbitListener@RabbitHandler 搭配使用

  • @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用。
  • @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 注解的方法进行分发处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

消费者:

@Component
@RabbitListener(queues = "hello")
public class consumer {

    @RabbitHandler
    public void receive(byte[] msg) {
        System.out.println("byte[] 消费者消费信息:" + new String(msg));
    }
    @RabbitHandler
    public void receive(String msg) {
        System.out.println("String 消费者消费信息:" + msg);
    }
    @RabbitHandler
    public void receive(Integer msg) {
        System.out.println("Integer 消费者消费信息:" + msg);
    }
    @RabbitHandler
    public void receive(TestA a) {
        System.out.println("对象 TestA 消费者消费信息:" + a.toString());
    }
}

class TestA implements Serializable {
    String fieldA;
    String fieldB;
    // 省略 Get Set ToString
}

生产者:

@Test
public void demo_06_Producer() {
    String routingKey = "hello";
    String str = FBI OPEN THE DOOR";
    Integer i = 102424;
    TestA a = new TestA();
    a.setFieldA("FBI WARNING");
    // rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    rabbitTemplate.convertAndSend(routingKey, a);
    rabbitTemplate.convertAndSend(routingKey, i);
    rabbitTemplate.convertAndSend(routingKey, str);
    System.out.println("发送成功");
}

输出:

对象 TestA 消费者消费信息:”FBI WARNING

Integer 消费者消费信息:102424

String 消费者消费信息:FBI OPEN THE DOOR

这里生产者在发送消息时,调用了 RabbitTemplate 中的 convertAndSend 方法会使用 MessageConvert 对 TestA 对象进行消息的序列化,其默认的实现也是 SimpleMessageConverter

为您推荐与 rabbitmq 相关的帖子:

暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册