内容简介:前面两篇博文,分别介绍了RabbitMq的核心知识点,以及整合SpringBoot的demo应用;接下来也该进入正题,看一下SpringBoot的环境下,如何玩转rabbitmq本篇内容主要为消息发送,包括以下几点我们借助
前面两篇博文,分别介绍了RabbitMq的核心知识点,以及整合SpringBoot的demo应用;接下来也该进入正题,看一下SpringBoot的环境下,如何玩转rabbitmq
本篇内容主要为消息发送,包括以下几点
RabbitTemplate AbstractMessageConverter
I. 基本使用姿势
1. 配置
我们借助 SpringBoot 2.2.1.RELEASE
+ rabbitmq 3.7.5
来完整项目搭建与测试
项目pom.xml如下
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置文件 application.yml
内容如下
spring: rabbitmq: virtual-host: / username: admin password: admin port: 5672 host: 127.0.0.1
2. 配置类
通过前面rabbitmq的知识点学习,我们可以知道发送端,将消息发送给exchange,然后根据不同的策略分发给对应的queue
本篇博文主要讨论的是消息发送,所以定义一个topic模式的exchange,并绑定一个的queue;(对发送端而言,不通过的exchange类型,对使用姿势影响不大)
public class MqConstants { public static final String exchange = "topic.e"; public static final String routing = "r"; public final static String queue = "topic.a"; } @Configuration public class MqConfig { @Bean public TopicExchange topicExchange() { return new TopicExchange(MqConstants.exchange); } @Bean public Queue queue() { // 创建一个持久化的队列 return new Queue(MqConstants.queue, true); } @Bean public Binding binding(TopicExchange topicExchange, Queue queue) { return BindingBuilder.bind(queue).to(topicExchange).with(MqConstants.routing); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } }
3. 消息发送
消息发送,主要借助的是 RabbitTemplate#convertAndSend
方法来实现,通常情况下,我们直接使用即可
@Service public class BasicPublisher { @Autowired private RabbitTemplate rabbitTemplate; /** * 一般的用法,推送消息 * * @param ans * @return */ private String publish2mq1(String ans) { String msg = "Durable msg = " + ans; System.out.println("publish: " + msg); rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg); return msg; } }
上面的核心点就一行 rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
- 表示将msg发送给指定的exchange,并设置消息的路由键
通过上面的方式,发送的消息默认是持久化的,当持久化的消息,分发到持久化的队列时,会有消息的落盘操作;在某些场景下,我们对消息的要求并没有那么严格,反而更在意mq的性能,丢失一些数据也可以接收;这个时候我们可能需要定制一下发送的消息属性
下面提供两种姿势,推荐第二种
/** * 推送一个非持久化的消息,这个消息推送到持久化的队列时,mq重启,这个消息会丢失;上面的持久化消息不会丢失 * * @param ans * @return */ private String publish2mq2(String ans) { MessageProperties properties = new MessageProperties(); properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); Message message = rabbitTemplate.getMessageConverter().toMessage("NonDurable = " + ans, properties); rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, message); System.out.println("publish: " + message); return message.toString(); } private String publish2mq3(String ans) { String msg = "Define msg = " + ans; rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setHeader("ta", "测试"); message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); return message; } }); return msg; }
注意
- 在实际的项目开发中,推荐使用
MessagePostProcessor
来定制消息属性,其次不推荐每次发送消息时都创建一个MessagePostProcessor
对象,请定义一个通用的对象
4. 非序列化对象发送异常case
通过查看 rabbitTemplate#convertAndSend
的接口定义,我们知道发送的消息可以是Object类型,那么是不是任何对象,都可以推送给mq呢?
下面是一个测试case
private String publish2mq4(String ans) { NonSerDO nonSerDO = new NonSerDO(18, ans); System.out.println("publish: " + nonSerDO); rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO); return nonSerDO.toString(); } @Data public static class NonSerDO { private Integer age; private String name; public NonSerDO(int age, String name) { this.age = age; this.name = name; } }
当我们调用上面的 publish2mq4
方法时,会抛出一个参数类型异常
为什么会出现这个问题呢?从堆栈分析,我们知道RabbitTemplate默认是利用 SimpleMessageConverter
来实现封装Message逻辑的,核心代码为
// 下面代码来自 org.springframework.amqp.support.converter.SimpleMessageConverter#createMessage @Override protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; if (object instanceof byte[]) { bytes = (byte[]) object; messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES); } else if (object instanceof String) { try { bytes = ((String) object).getBytes(this.defaultCharset); } catch (UnsupportedEncodingException e) { throw new MessageConversionException( "failed to convert to Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); messageProperties.setContentEncoding(this.defaultCharset); } else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException e) { throw new MessageConversionException( "failed to convert to serialized Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT); } if (bytes != null) { messageProperties.setContentLength(bytes.length); return new Message(bytes, messageProperties); } throw new IllegalArgumentException(getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); }
上面逻辑很明确的指出了, 只接受byte数组,string字符串,可序列化对象(这里使用的是jdk的序列化方式来实现对象和byte数组之间的互转)
自然而然的,我们会想有没有其他的 MessageConverter
来友好的支持任何类型的对象
5. 自定义MessageConverter
接下来我们希望通过自定义一个json序列化方式的MessageConverter来解决上面的问题
一个比较简单的实现(利用FastJson来实现序列化/反序列化)
public static class SelfConverter extends AbstractMessageConverter { @Override protected Message createMessage(Object object, MessageProperties messageProperties) { messageProperties.setContentType("application/json"); return new Message(JSON.toJSONBytes(object), messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { return JSON.parse(message.getBody()); } }
重新定义一个 rabbitTemplate
,并设置它的消息转换器为自定义的 SelfConverter
@Bean public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new SelfConverter()); return rabbitTemplate; }
然后再次测试一下
@Service public class JsonPublisher { @Autowired private RabbitTemplate jsonRabbitTemplate; private String publish1(String ans) { Map<String, Object> msg = new HashMap<>(8); msg.put("msg", ans); msg.put("type", "json"); msg.put("version", 123); System.out.println("publish: " + msg); jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg); return msg.toString(); } private String publish2(String ans) { BasicPublisher.NonSerDO nonSerDO = new BasicPublisher.NonSerDO(18, "SELF_JSON" + ans); System.out.println("publish: " + nonSerDO); jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO); return nonSerDO.toString(); } }
mq内接收到的推送消息如下
6. Jackson2JsonMessageConverter
上面虽然实现了Json格式的消息转换,但是比较简陋;而且这么基础通用的功能,按照Spring全家桶的一贯作风,肯定是有现成可用的,没错,这就是 Jackson2JsonMessageConverter
所以我们的使用姿势也可以如下
//定义RabbitTemplate @Bean public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } // 测试代码 @Autowired private RabbitTemplate jacksonRabbitTemplate; private String publish3(String ans) { Map<String, Object> msg = new HashMap<>(8); msg.put("msg", ans); msg.put("type", "jackson"); msg.put("version", 456); System.out.println("publish: " + msg); jacksonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg); return msg.toString(); }
下面是通过Jackson序列化消息后的内容,与我们自定义的有一些不同,多了 headers
和 content_encoding
7. 小结
本篇博文主要的知识点如下
RabbitTemplate#convertAndSend MessagePostProcessor SimpleMessageConverter MessageConverter
在RabbitMq的知识点博文中,明确提到了,为了确保消息被brocker正确接收,提供了消息确认机制和事务机制两种case,那么如果需要使用这两种方式,消息生产者需要怎么做呢?
限于篇幅,下一篇博文将带来在消息确认机制/事务机制下的发送消息使用姿势
II. 其他
0. 系列博文&项目源码
系列博文
项目源码
- 工程: https://github.com/liuyueyi/spring-boot-demo
- 源码: https://github.com/liuyueyi/spring-boot-demo/tree/master/spring-boot/301-rabbitmq-publish
1. 一灰灰Blog
尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激
下面一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛
- 一灰灰Blog个人博客 https://blog.hhui.top
- 一灰灰Blog-Spring专题博客 http://spring.hhui.top
打赏 如果觉得我的文章对您有帮助,请随意打赏。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 强大的姿势感知模型用于姿势不变的人脸识别
- 从姿势到图像——基于人体姿势引导的时尚图像生成算法
- SpringBoot系列(十四)集成邮件发送服务及邮件发送的几种方式
- Linux如何用脚本监控Oracle发送警告日志ORA-报错发送邮件
- 行人重识别告别辅助姿势信息,港中文、商汤等提出姿势无关的特征提取GAN
- Android 快速发送邮件
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Tomcat与Java Web开发技术详解
孙卫琴 / 电子工业出版社 / 2004-4-1 / 45.00元
《Tomcat与Java Web开发技术详解》编辑推荐:Jakarta Tomcat服务器是在SUN公司的JSWDK(JavaServer Web DevelopmentKit,SUN公司推出的小型Servlet/JSP调试工具)的基础上发展起来的一个优秀的Java Web应用容器,它是Apache-Jakarta的一个子项目。Tomcat被JavaWorld杂志的编辑选为2001年度最具创新的J......一起来看看 《Tomcat与Java Web开发技术详解》 这本书的介绍吧!