内容简介:这个 handleMessage 方法名要根据运行之前的测试用例 testSendMessage ,handleMessage 方法进行消息的消费将上面的 messageContainer 修改成如下的
代码地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 项目下 复制代码
1.简单使用默认方法
修改上一节 SpringAMQP 消息容器 - SimpleMessageListenerContainer 的 RabbitMQConfig 的 messageContainer 方法 复制代码
@Bean //connectionFactory 也是要和最上面方法名保持一致
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003()); //监听的队列
container.setConcurrentConsumers(1); //当前的消费者数量
container.setMaxConcurrentConsumers(5); // 最大的消费者数量
container.setDefaultRequeueRejected(false); //是否重回队列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消费端的标签策略
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// 1.1 适配器方式. 默认是有自己的方法名字的:handleMessage
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
container.setMessageListener(adapter);
return container;
}
复制代码
public class MessageDelegate {
private static final Logger log = LoggerFactory.getLogger(MessageDelegate.class);
//这个handleMessage方法名要根据org.springframework.amqp.rabbit.listener.adapter包下的
// MessageListenerAdapter.ORIGINAL_DEFAULT_LISTENER_METHOD的默认值来确定
public void handleMessage(byte[] messageBody) {
log.info("默认方法, 消息内容:" + new String(messageBody));
}
}
复制代码
这个 handleMessage 方法名要根据 org.springframework.amqp.rabbit.listener.adapter 包下的 MessageListenerAdapter.ORIGINAL_DEFAULT_LISTENER_METHOD 的默认值来确定,源码如下
运行之前的测试用例 testSendMessage ,handleMessage 方法进行消息的消费
2.采用自己指定一个方法的名字
将上面的 messageContainer 修改成如下的
@Bean //connectionFactory 也是要和最上面方法名保持一致
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003()); //监听的队列
container.setConcurrentConsumers(1); //当前的消费者数量
container.setMaxConcurrentConsumers(5); // 最大的消费者数量
container.setDefaultRequeueRejected(false); //是否重回队列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消费端的标签策略
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//1.2 适配器方式. 可以自己指定一个方法的名字: consumeMessage
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
container.setMessageListener(adapter);
return container;
}
复制代码
MessageDelegate 里面的消费方法改成 consumeMessage
public void consumeMessage(byte[] messageBody) {
log.info("字节数组方法, 消息内容:" + new String(messageBody));
}
复制代码
继续运行 testSendMessage, 查看到消费
3. 添加一个转换器,从字节数组转换为 String
//1.3 适配器方式.也可以添加一个转换器: 从字节数组转换为String
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if(null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
}
复制代码
toMessage 就是 Java 对象转换为 Message,fromMessage 就是 Message 转为 Java 对象
将上面的 messageContainer 修改成如下的
@Bean //connectionFactory 也是要和最上面方法名保持一致
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003()); //监听的队列
container.setConcurrentConsumers(1); //当前的消费者数量
container.setMaxConcurrentConsumers(5); // 最大的消费者数量
container.setDefaultRequeueRejected(false); //是否重回队列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消费端的标签策略
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//1.3 适配器方式.也可以添加一个转换器: 从字节数组转换为String
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
return container;
}
复制代码
关键点,这里使用的不再是字节数组了!!
//1.3 适配器方式.也可以添加一个转换器: 从字节数组转换为String
public void consumeMessage(String messageBody) {
log.info("字符串方法, 消息内容:" + messageBody);
}
复制代码
写个单元测试用例,注意 contentType 要包含 text !!
@Test
public void testSendMessage4Text() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
}
复制代码
运行单元测试
4. 队列名称 和 方法名称 也可以进行一一的匹配
将上面的 messageContainer 修改成如下的
@Bean //connectionFactory 也是要和最上面方法名保持一致
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003()); //监听的队列
container.setConcurrentConsumers(1); //当前的消费者数量
container.setMaxConcurrentConsumers(5); // 最大的消费者数量
container.setDefaultRequeueRejected(false); //是否重回队列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消费端的标签策略
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// 2 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setMessageConverter(new TextMessageConverter());
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("queue001", "method1");
queueOrTagToMethodName.put("queue002", "method2");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);
return container;
}
复制代码
// 2 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配
public void method1(String messageBody) {
log.info("method1 收到消息内容:" + new String(messageBody));
}
public void method2(String messageBody) {
log.info("method2 收到消息内容:" + new String(messageBody));
}
复制代码
看一下之前建立的绑定关系
修改一下单元测试用例
@Test
public void testSendMessage4Text() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.send("topic002", "rabbit.abc", message);
}
复制代码
运行测试, 查看到两个队列的消费
综上,通过上面 MessageListenerAdapter 的使用代码,我们可以看出它有如下核心属性
-
defaultListenerMethod 默认监听方法名称:用于设置监听方法的名称
-
delegate 委派对象: 实际真实的委派对象,用于处理消息
-
queueOrTagMethodName 队列标识于方法名称组成的集合。
-
可以一一进行队列于方法名称的匹配。
-
队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接受处理。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Mastering Regular Expressions, Second Edition
Jeffrey E F Friedl / O'Reilly Media / 2002-07-15 / USD 39.95
Regular expressions are an extremely powerful tool for manipulating text and data. They have spread like wildfire in recent years, now offered as standard features in Perl, Java, VB.NET and C# (and an......一起来看看 《Mastering Regular Expressions, Second Edition》 这本书的介绍吧!