内容简介:MessageConverter 即消息转换器我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到 MessageConverter 了。自定义常用转换器: MessageConverter, 一般来说都需要实现这个接口,然后重写以下两个方法。
MessageConverter 即消息转换器
我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到 MessageConverter 了。
自定义常用转换器: MessageConverter, 一般来说都需要实现这个接口,然后重写以下两个方法。
toMessage: java 对象转换为 Message fromMessage: Message 对象转换为 Java 对象 复制代码
转换器类别:
json 转换器: jackson2JsonMessageConverter 可以进行 java 对象的转换功能 DefaultJackson2JavaTypeMapper 映射器:可以进行Java对象的映射关系 自定义二进制转换器: 比如图片类型、PDF、PPT、流媒体 复制代码
代码示例:
代码地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 项目下 复制代码
1. json 转换器
先创建一个 Java 实体 Order
public class Order { private String id; private String name; private String content; public Order() { } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public Order(String id, String name, String content) { this.id = id; this.name = name; this.content = content; } @Override public String toString() { return "Order{" + "id='" + id + '\'' + ", name='" + name + '\'' + ", content='" + content + '\'' + '}'; } } 复制代码
接着在上一篇的 RabbitMQConfig 里面 配置支持 json 格式的转换器
@Bean //connectionFactory 也是要和最上面方法名保持一致 public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueues(queue001(), queue002(), queue003()); //监听的队列 container.setConcurrentConsumers(1); //当前的消费者数量 container.setMaxConcurrentConsumers(5); // 最大的消费者数量 container.setDefaultRequeueRejected(false); //是否重回队列 container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式 container.setExposeListenerChannel(true); container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消费端的标签策略 @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); //3 支持json格式的转换器 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter); return container; } 复制代码
在委派 adapter 里面声明入参为 Map 的消费方法
public void consumeMessage(Map messageBody) { log.info("map方法, 消息内容:" + messageBody); } 复制代码
功能就完成了,接着写个单元测试,注意 ContentType 一定要是 json !!
@Test public void testSendJsonMessage() throws Exception { Order order = new Order(); order.setId("001"); order.setName("test1001消息订单"); order.setContent("test1001订单描述信息"); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order); log.info("order 4 json: " + json); MessageProperties messageProperties = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties.setContentType("application/json"); Message message = new Message(json.getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.order", message); } 复制代码
运行单元测试,消息就被消费了
2. DefaultJackson2JavaTypeMapper 转换 Java 对象
messageContainer 修改成如下的
@Bean //connectionFactory 也是要和最上面方法名保持一致 public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueues(queue001(), queue002(), queue003()); //监听的队列 container.setConcurrentConsumers(1); //当前的消费者数量 container.setMaxConcurrentConsumers(5); // 最大的消费者数量 container.setDefaultRequeueRejected(false); //是否重回队列 container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式 container.setExposeListenerChannel(true); container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消费端的标签策略 @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); // 4 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象转换 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter); return container; } 复制代码
public void consumeMessage(Order order) { log.info("order对象, 消息内容, id: " + order.getId() + ", name: " + order.getName() + ", content: "+ order.getContent()); } 复制代码
@Test public void testSendJavaMessage() throws Exception { Order order = new Order(); order.setId("1002"); order.setName("test1002消息订单"); order.setContent("test1002订单描述信息"); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order); log.info("order java: " + json); MessageProperties messageProperties = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties.setContentType("application/json"); //注意这里要写你的实体类路径 messageProperties.getHeaders().put("__TypeId__", "com.hmily.rabbitmqapi.spring.domain.Order"); Message message = new Message(json.getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.order", message); } 复制代码
运行单元测试
报错提示:如出现 If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
的异常提示,这是因为 Jackson 在把字节流转换为 Java 对象时发生安全提醒,粗暴的解决方式如下:
新建一个 EnableAllJackson2JavaTypeMapper 类,其继承 DefaultJackson2JavaTypeMapper 这个类,然后在这里配置允许转换哪些对象,我这是是直接允许所有。
然后在 刚才的 SimpleMessageListenerContainer
里面, new EnableAllJackson2JavaTypeMapper()
改为 new EnableAllJackson2JavaTypeMapper()
,即可。
再改进一下代码,转换 2 个 Java 对象
public class Packaged { private String id; private String name; private String description; public Packaged() { } public Packaged(String id, String name, String description) { this.id = id; this.name = name; this.description = description; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } @Override public String toString() { return "Packaged{" + "id='" + id + '\'' + ", name='" + name + '\'' + ", description='" + description + '\'' + '}'; } } 复制代码
messageContainer 修改成如下的
@Bean //connectionFactory 也是要和最上面方法名保持一致 public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueues(queue001(), queue002(), queue003()); //监听的队列 container.setConcurrentConsumers(1); //当前的消费者数量 container.setMaxConcurrentConsumers(5); // 最大的消费者数量 container.setDefaultRequeueRejected(false); //是否重回队列 container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式 container.setExposeListenerChannel(true); container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消费端的标签策略 @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); // 5 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>(); idClassMapping.put("order", com.hmily.rabbitmqapi.spring.domain.Order.class); idClassMapping.put("packaged", com.hmily.rabbitmqapi.spring.domain.Packaged.class); javaTypeMapper.setIdClassMapping(idClassMapping); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter); return container; } 复制代码
public void consumeMessage(Packaged pack) { log.info("package对象, 消息内容, id: " + pack.getId() + ", name: " + pack.getName() + ", content: "+ pack.getDescription()); } 复制代码
@Test public void testSendMappingMessage() throws Exception { ObjectMapper mapper = new ObjectMapper(); Order order = new Order(); order.setId("1001"); order.setName("1001订单消息"); order.setContent("1001订单描述信息"); String json1 = mapper.writeValueAsString(order); log.info("order java: " + json1); MessageProperties messageProperties1 = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties1.setContentType("application/json"); messageProperties1.getHeaders().put("__TypeId__", "order"); Message message1 = new Message(json1.getBytes(), messageProperties1); rabbitTemplate.send("topic001", "spring.order", message1); Packaged pack = new Packaged(); pack.setId("1002"); pack.setName("1002包裹消息"); pack.setDescription("1002包裹描述信息"); String json2 = mapper.writeValueAsString(pack); log.info("pack java: " + json2); MessageProperties messageProperties2 = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties2.setContentType("application/json"); messageProperties2.getHeaders().put("__TypeId__", "packaged"); Message message2 = new Message(json2.getBytes(), messageProperties2); rabbitTemplate.send("topic001", "spring.pack", message2); } 复制代码
注意这里面就不是写类的路径了,而是写刚才起的别名了
因为junitTest的关系,它发送完就关闭了,还有一条消息没被消费
可以上管控台确认一下
这时候直接运行一下我们项目 RabbitmqApiApplication ,就把刚才剩余的那条消息消费了
3. 二进制转换器
先写转换处理
public class ImageMessageConverter implements MessageConverter { private static final Logger log = LoggerFactory.getLogger(ImageMessageConverter.class); @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { log.info("-----------Image MessageConverter----------"); Object _extName = message.getMessageProperties().getHeaders().get("extName"); String extName = _extName == null ? "png" : _extName.toString(); byte[] body = message.getBody(); String fileName = UUID.randomUUID().toString(); String path = "G:/test/file/new/" + fileName + "." + extName; File f = new File(path); try { Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } } 复制代码
消息接收
public void consumeMessage(File file) { log.info("文件对象 方法, 消息内容:" + file.getName()); } 复制代码
声明一个全局的转换器
@Bean //connectionFactory 也是要和最上面方法名保持一致 public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueues(queue001(), queue002(), queue003()); //监听的队列 container.setConcurrentConsumers(1); //当前的消费者数量 container.setMaxConcurrentConsumers(5); // 最大的消费者数量 container.setDefaultRequeueRejected(false); //是否重回队列 container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式 container.setExposeListenerChannel(true); container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消费端的标签策略 @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); // 6 ext convert MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); //全局的转换器: ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter(); TextMessageConverter textConvert = new TextMessageConverter(); convert.addDelegate("text", textConvert); convert.addDelegate("html/text", textConvert); convert.addDelegate("xml/text", textConvert); convert.addDelegate("text/plain", textConvert); Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter(); convert.addDelegate("json", jsonConvert); convert.addDelegate("application/json", jsonConvert); ImageMessageConverter imageConverter = new ImageMessageConverter(); convert.addDelegate("image/png", imageConverter); convert.addDelegate("image", imageConverter); PDFMessageConverter pdfConverter = new PDFMessageConverter(); convert.addDelegate("application/pdf", pdfConverter); adapter.setMessageConverter(convert); container.setMessageListener(adapter); return container; } 复制代码
编写单元测试来测试图片
@Test public void testSendExtConverterMessage() throws Exception { byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "1001.png")); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("image/png"); messageProperties.getHeaders().put("extName", "png"); Message message = new Message(body, messageProperties); rabbitTemplate.send("", "image_queue", message); } 复制代码
原图片的本地文件夹路径
运行测试后,图片生成到指定的目录下
这次来试试PDF
public class PDFMessageConverter implements MessageConverter { private static final Logger log = LoggerFactory.getLogger(PDFMessageConverter.class); @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { log.info("-----------PDF MessageConverter----------"); byte[] body = message.getBody(); String fileName = UUID.randomUUID().toString(); String path = "G:/test/file/new/" + fileName + ".pdf"; File f = new File(path); try { Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } } 复制代码
@Test public void testSendExtConverterMessage() throws Exception { // byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "1001.png")); // MessageProperties messageProperties = new MessageProperties(); // messageProperties.setContentType("image/png"); // messageProperties.getHeaders().put("extName", "png"); // Message message = new Message(body, messageProperties); // rabbitTemplate.send("", "image_queue", message); byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "AliTech101_RD.pdf")); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/pdf"); Message message = new Message(body, messageProperties); rabbitTemplate.send("", "pdf_queue", message); } 复制代码
验证 PDF 的处理是否成功了。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- flask视图函数自定义转换器
- 原 荐 转换器(Converter)设计模式
- Mybatis使用小技巧-自定义类型转换器
- 深入理解 Kafka Connect:转换器和序列化
- OpenAPI Generator 4.0.3 发布,使转换器公开访问
- js视频转字符画 —— 写一个属于自己的字符转换器
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Java多线程编程实战指南(设计模式篇)
黄文海 / 电子工业出版社 / 2015-10 / 59.00
随着CPU 多核时代的到来,多线程编程在充分利用计算资源、提高软件服务质量方面扮演了越来越重要的角色。而 解决多线程编程中频繁出现的普遍问题可以借鉴设计模式所提供的现成解决方案。然而,多线程编程相关的设计模式书籍多采用C++作为描述语言,且书中所举的例子多与应用开发人员的实际工作相去甚远。《Java多线程编程实战指南(设计模式篇)》采用Java(JDK1.6)语言和UML 为描述语言,并结合作者多......一起来看看 《Java多线程编程实战指南(设计模式篇)》 这本书的介绍吧!