【MQ系列】SprigBoot + RabbitMq发送消息基本使用姿势

栏目: IT技术 · 发布时间: 5年前

内容简介:前面两篇博文,分别介绍了RabbitMq的核心知识点,以及整合SpringBoot的demo应用;接下来也该进入正题,看一下SpringBoot的环境下,如何玩转rabbitmq本篇内容主要为消息发送,包括以下几点我们借助

前面两篇博文,分别介绍了RabbitMq的核心知识点,以及整合SpringBoot的demo应用;接下来也该进入正题,看一下SpringBoot的环境下,如何玩转rabbitmq

本篇内容主要为消息发送,包括以下几点

RabbitTemplate
AbstractMessageConverter

I. 基本使用姿势

1. 配置

我们借助 SpringBoot 2.2.1.RELEASE + rabbitmq 3.7.5 来完整项目搭建与测试

项目pom.xml如下

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件 application.yml 内容如下

spring:
  rabbitmq:
    virtual-host: /
    username: admin
    password: admin
    port: 5672
    host: 127.0.0.1

2. 配置类

通过前面rabbitmq的知识点学习,我们可以知道发送端,将消息发送给exchange,然后根据不同的策略分发给对应的queue

本篇博文主要讨论的是消息发送,所以定义一个topic模式的exchange,并绑定一个的queue;(对发送端而言,不通过的exchange类型,对使用姿势影响不大)

public class MqConstants {

    public static final String exchange = "topic.e";

    public static final String routing = "r";

    public final static String queue = "topic.a";

}

@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstants.exchange);
    }

    @Bean
    public Queue queue() {
        // 创建一个持久化的队列
        return new Queue(MqConstants.queue, true);
    }

    @Bean
    public Binding binding(TopicExchange topicExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(topicExchange).with(MqConstants.routing);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

3. 消息发送

消息发送,主要借助的是 RabbitTemplate#convertAndSend 方法来实现,通常情况下,我们直接使用即可

@Service
public class BasicPublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * 一般的用法,推送消息
     *
     * @param ans
     * @return
     */
    private String publish2mq1(String ans) {
        String msg = "Durable msg = " + ans;
        System.out.println("publish: " + msg);
        rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
        return msg;
    }
}

上面的核心点就一行 rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);

  • 表示将msg发送给指定的exchange,并设置消息的路由键

通过上面的方式,发送的消息默认是持久化的,当持久化的消息,分发到持久化的队列时,会有消息的落盘操作;在某些场景下,我们对消息的要求并没有那么严格,反而更在意mq的性能,丢失一些数据也可以接收;这个时候我们可能需要定制一下发送的消息属性

下面提供两种姿势,推荐第二种

/**
 * 推送一个非持久化的消息,这个消息推送到持久化的队列时,mq重启,这个消息会丢失;上面的持久化消息不会丢失
 *
 * @param ans
 * @return
 */
private String publish2mq2(String ans) {
    MessageProperties properties = new MessageProperties();
    properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
    Message message = rabbitTemplate.getMessageConverter().toMessage("NonDurable = " + ans, properties);

    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, message);

    System.out.println("publish: " + message);
    return message.toString();
}


private String publish2mq3(String ans) {
    String msg = "Define msg = " + ans;
    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setHeader("ta", "测试");
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        }
    });

    return msg;
}

【MQ系列】SprigBoot + RabbitMq发送消息基本使用姿势

注意

  • 在实际的项目开发中,推荐使用 MessagePostProcessor 来定制消息属性,其次不推荐每次发送消息时都创建一个 MessagePostProcessor 对象,请定义一个通用的对象

4. 非序列化对象发送异常case

通过查看 rabbitTemplate#convertAndSend 的接口定义,我们知道发送的消息可以是Object类型,那么是不是任何对象,都可以推送给mq呢?

下面是一个测试case

private String publish2mq4(String ans) {
    NonSerDO nonSerDO = new NonSerDO(18, ans);
    System.out.println("publish: " + nonSerDO);
    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
    return nonSerDO.toString();
}


@Data
public static class NonSerDO {
    private Integer age;
    private String name;

    public NonSerDO(int age, String name) {
        this.age = age;
        this.name = name;
    }
}

当我们调用上面的 publish2mq4 方法时,会抛出一个参数类型异常

【MQ系列】SprigBoot + RabbitMq发送消息基本使用姿势

为什么会出现这个问题呢?从堆栈分析,我们知道RabbitTemplate默认是利用 SimpleMessageConverter 来实现封装Message逻辑的,核心代码为

// 下面代码来自 org.springframework.amqp.support.converter.SimpleMessageConverter#createMessage
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
	byte[] bytes = null;
	if (object instanceof byte[]) {
		bytes = (byte[]) object;
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
	}
	else if (object instanceof String) {
		try {
			bytes = ((String) object).getBytes(this.defaultCharset);
		}
		catch (UnsupportedEncodingException e) {
			throw new MessageConversionException(
					"failed to convert to Message content", e);
		}
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
		messageProperties.setContentEncoding(this.defaultCharset);
	}
	else if (object instanceof Serializable) {
		try {
			bytes = SerializationUtils.serialize(object);
		}
		catch (IllegalArgumentException e) {
			throw new MessageConversionException(
					"failed to convert to serialized Message content", e);
		}
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
	}
	if (bytes != null) {
		messageProperties.setContentLength(bytes.length);
		return new Message(bytes, messageProperties);
	}
	throw new IllegalArgumentException(getClass().getSimpleName()
			+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}

上面逻辑很明确的指出了, 只接受byte数组,string字符串,可序列化对象(这里使用的是jdk的序列化方式来实现对象和byte数组之间的互转)

自然而然的,我们会想有没有其他的 MessageConverter 来友好的支持任何类型的对象

5. 自定义MessageConverter

接下来我们希望通过自定义一个json序列化方式的MessageConverter来解决上面的问题

一个比较简单的实现(利用FastJson来实现序列化/反序列化)

public static class SelfConverter extends AbstractMessageConverter {
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        messageProperties.setContentType("application/json");
        return new Message(JSON.toJSONBytes(object), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return JSON.parse(message.getBody());
    }
}

重新定义一个 rabbitTemplate ,并设置它的消息转换器为自定义的 SelfConverter

@Bean
public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new SelfConverter());
    return rabbitTemplate;
}

然后再次测试一下

@Service
public class JsonPublisher {
    @Autowired
    private RabbitTemplate jsonRabbitTemplate;
      
    private String publish1(String ans) {
        Map<String, Object> msg = new HashMap<>(8);
        msg.put("msg", ans);
        msg.put("type", "json");
        msg.put("version", 123);
        System.out.println("publish: " + msg);
        jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
        return msg.toString();
    }

    private String publish2(String ans) {
        BasicPublisher.NonSerDO nonSerDO = new BasicPublisher.NonSerDO(18, "SELF_JSON" + ans);
        System.out.println("publish: " + nonSerDO);
        jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
        return nonSerDO.toString();
    }
}

mq内接收到的推送消息如下

【MQ系列】SprigBoot + RabbitMq发送消息基本使用姿势

6. Jackson2JsonMessageConverter

上面虽然实现了Json格式的消息转换,但是比较简陋;而且这么基础通用的功能,按照Spring全家桶的一贯作风,肯定是有现成可用的,没错,这就是 Jackson2JsonMessageConverter

所以我们的使用姿势也可以如下

//定义RabbitTemplate
@Bean
public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return rabbitTemplate;
}


// 测试代码
@Autowired
private RabbitTemplate jacksonRabbitTemplate;
private String publish3(String ans) {
    Map<String, Object> msg = new HashMap<>(8);
    msg.put("msg", ans);
    msg.put("type", "jackson");
    msg.put("version", 456);
    System.out.println("publish: " + msg);
    jacksonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
    return msg.toString();
}

下面是通过Jackson序列化消息后的内容,与我们自定义的有一些不同,多了 headerscontent_encoding

【MQ系列】SprigBoot + RabbitMq发送消息基本使用姿势

7. 小结

本篇博文主要的知识点如下

RabbitTemplate#convertAndSend
MessagePostProcessor
SimpleMessageConverter
MessageConverter

在RabbitMq的知识点博文中,明确提到了,为了确保消息被brocker正确接收,提供了消息确认机制和事务机制两种case,那么如果需要使用这两种方式,消息生产者需要怎么做呢?

限于篇幅,下一篇博文将带来在消息确认机制/事务机制下的发送消息使用姿势

II. 其他

0. 系列博文&项目源码

系列博文

项目源码

1. 一灰灰Blog

尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

下面一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

【MQ系列】SprigBoot + RabbitMq发送消息基本使用姿势

打赏 如果觉得我的文章对您有帮助,请随意打赏。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

A Philosophy of Software Design

A Philosophy of Software Design

John Ousterhout / Yaknyam Press / 2018-4-6 / GBP 14.21

This book addresses the topic of software design: how to decompose complex software systems into modules (such as classes and methods) that can be implemented relatively independently. The book first ......一起来看看 《A Philosophy of Software Design》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具