内容简介:随着微服务概念发展,大应用逐步拆分为小应用,提高开发效率,专门的人做专门的事情,逐渐的流行起来。在微服务上实现通信的方式大部分是采用rpc方式,也有升级版本的grpc。还有另外一种实现就是使用mq来进行解耦。
随着微服务概念发展,大应用逐步拆分为小应用,提高开发效率,专门的人做专门的事情,逐渐的流行起来。
在微服务上实现通信的方式大部分是采用rpc方式,也有升级版本的grpc。
还有另外一种实现就是使用mq来进行解耦。
今天初识mq,快速入门先,准备一个环境实现案例,该文涉及以下内容:
- 安装rabbitmq
- mq能解决的问题
- 实战演练
安装
rabbitmq的安装我们采用 docker 的方式,docker方便我们快速的实现rabbitmq的安装,不需要再对安装mq进行头疼。
docker 的两种方式
docker方式
//拉取mq镜像 docker pull rabbitmq //启动mq docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9 复制代码
说明:
- -d 后台运行容器;
- --name 指定容器名;
- -p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
- -v 映射目录或文件;
- --hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
- -e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)
docker-compose 方式
version: "3" services: rabbit: image: docker.infervision.com/library/rabbitmq:3-management ports: - "4369:4369" - "5671:5671" - "5672:5672" - "15671:15671" - "15672:15672" restart: always environment: - RABBITMQ_DEFAULT_USER=test - RABBITMQ_DEFAULT_PASS=test volumes: - /home/ruiqi/Desktop/disk/rabbitmq:/var/lib/rabbitmq container_name: rabbitmq 在该文件目录下执行:docker-compose up -d 复制代码
下载的rabbitmq内置管理界面,ip:15672 用户名与密码是我们在启动是写入的。
mq能解决什么?
通俗的来说,主要使用MQ来解决以下三个问题。
异步消息
在业务中,经常会遇到同时发送邮件,短信或者其他通知内容服务。业务初期,采用同步或者异步处理方式都需要等发送完毕后再返回给客户端。中间有一定的延迟
业务增长后,此方式系统性能就会造成很大的浪费。采用消息队列,将这几个服务进行解耦,只需将消息内容发送到消息队列中,降低用户的等待时间,体验效果比原先好很多。
应用间解耦
同一个服务中可能需要其他服务的配合才能完成一项业务操作.还是拿常见的购物案例来说明。
在京东下单支付后,消息要通知到商家,邮件通知用户已经购买某商品。
如果这两种操作都采用同步执行,用户等待时间会变长。
采用mq方式之后,订单系统将消息持久化到mq上,返回给用户下单成功。
- 商家接收到用户的下单信息,进行处理,如果有库存管理那么需要进行库存处理。
- 邮件通知用户,告知用户下单成功。
mq保证消息的可靠投递,不会导致消息丢失,保证消息的高可靠性。如果库存出现失败也不会导致用户下单失败的情况,可以重新进行投递。
流量削峰
流量削峰,一般是同一时间涌进来很多请求,后台处理不过来。那么需要采用削峰方式来处理。
简单来说是通过一个队列承接瞬时过来流量洪峰,在消费端平滑的将消息推送出去,如果消费者消费不及时可以将消息内容持久化在队列中,消息不存在丢失。
- 消费端不及时进行消费,还可以动态的扩增消费者数量,提高消费速度。
- 设定相关的阀值,多余的消息直接丢弃,告知用户秒杀失败等业务消息内容。
实战案例
本文是按照 Java 语言进行,使用Spring boot搭建,包管理工具Gradle。
导入rabbitmq jar包
compile("org.springframework.boot:spring-boot-starter-amqp:1.5.10.RELEASE") 复制代码
配置mq
yaml 文件配置
spring: rabbitmq: host: 192.168.110.5 port: 5672 username: tuixiang password: tuixiang 复制代码
准备好模板类,供后面直接使用
package com.infervision.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author: fruiqi * @date: 19-2-18 下午2:42 * @version:1.0 rabbit配置 **/ @Configuration public class RabbitConfig { /** * 日志 **/ private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class); @Value("${spring.rabbitmq.username}") String userName; @Value("${spring.rabbitmq.password}") String userPassword; @Value("${spring.rabbitmq.host}") String host; @Value("${spring.rabbitmq.port}") Integer port; /** * 注入 * * @param * @return com.rabbitmq.client.Connection * @author fruiqi * @date 19-1-22 下午5:41 **/ @Bean public ConnectionFactory getConnection() throws Exception { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setUsername(userName); factory.setPassword(userPassword); factory.setHost(host); factory.setPort(port); return factory; } /** * 创建制定的 监听容器 * * @param queueName 监听的队列名字 * @param listenerChannel 设置是否将监听的频道 公开给已注册的 * @param PrefetchCount 告诉代理一次请求多少条消息过来 * @param ConcurrentConsumers 制定创建多少个并发的消费者数量 * @param acknowledgeMode 消息确认模式 * @param listener 监听器 * @return org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer **/ public SimpleMessageListenerContainer setSimpleMessageListenerContainer(String queueName, boolean listenerChannel, int PrefetchCount, int ConcurrentConsumers, AcknowledgeMode acknowledgeMode, ChannelAwareMessageListener listener) throws Exception { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(getConnection()); container.setQueueNames(queueName); container.setExposeListenerChannel(listenerChannel); container.setPrefetchCount(PrefetchCount); container.setConcurrentConsumers(ConcurrentConsumers); container.setAcknowledgeMode(acknowledgeMode); container.setMessageListener(listener); return container; } } package com.infervision.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author: fruiqi * @date: 19-2-18 下午2:51 * @version:1.0 **/ @Component public class MsgSender { private static final Logger logger = LoggerFactory.getLogger(MsgSender.class); @Autowired private RabbitTemplate rabbitTemplate; /** * @param exchange 交换机名称 * @param routingKey 路由名称 * @param message 消息内容 * @return void * @description //TODO 发送消息到消息队列中 **/ public void sendMsg(String exchange, String routingKey, Object message) { try { rabbitTemplate.convertAndSend(exchange,routingKey,message); }catch (Exception e){ logger.error("[ERROR] send statistic message error ",e); } } } 复制代码
实例链接mq
在使用rabbitmq 有的时候需要自己客户端创建queue,但有的时候并不是自己创建,在rabbitmq页面上进行创建queue,其他消费者直接引用。
客户端创建mq
//初始化队列,如果队列已存在,则不作任何处理 如果有权限控制如下操作并不能实现 @Bean public Queue dicomQueue() { return new Queue(getMacPreStr(DICOM_QUEUE_NAME)); } //初始化交换机 @Bean public Exchange topicExchange() { return ExchangeBuilder.topicExchange((DEFAULT_TOPIC_EXCHANGE).durable(true).build(); } // 将队列与交换机按照路由规则进行绑定 @Bean Binding bindingExchangeDicomQueue(Queue dicomQueue, TopicExchange topicExchange) { return BindingBuilder.bind(dicomQueue).to(topicExchange).with(DICOM_QUEUE_ROUTING_KEY); } 复制代码
使用
队列的使用:一个是发送,属于生产者;一个是监听,属于消费者.
生产者实现
在mq配置模板类中,专门实现了一个发送类,发送文件内容,直接调用发送接口即可。
@Autowired RabbitService rabbitService; /** * 练习 发送数据到 mq中 * 1. 发送的数据会到 mq中 * 2. 我们配置的 listener 是用来消费消息的 * 3. 客户端配置 可以参考 RabbitClientConfig * @param name 名字编号 * @param vo 实体内容 * @return: com.infervision.model.NameVo */ @ApiOperation(value = "增加name信息", notes = "实体信息") @PostMapping(value = "/{name}") @ApiImplicitParam(paramType = "query", name = "name", value = "用户名字", required = true, dataType = "string") public NameVo addNameVo(@RequestParam String name, @RequestBody NameVo vo) { rabbitService.sendMessage(DEFAULT_TOPIC_TEST_EXCHANGE, LABEL_FIEL_XML_QUEUE_ROUTING_KEY, JSON.toJSONString(vo)); return vo; } @Service public class RabbitServiceImpl implements RabbitService { @Autowired MsgSender msgSender; /** * 尝试发送 message 到mq中 * @param message * @return: void */ @Override public void sendMessage(String exchange, String routingKey,String message) { msgSender.sendMsg(exchange, routingKey, message); } } 复制代码
消费者实现
消费者实现有两种方式,一种通过注解的方式监听,一种是实现ChannelAwareMessageListener类来实现消费。
注解实现监听
//在方法上进行注入。配置工厂帮助提高单个消费者一次性消费的消息数量,设置多少个消费者,用来提高程序的性能 @RabbitListener(queues = "dicom.queue",containerFactory = "multipleConsumerContainerFactory") public void processDicomMessage(Message message, Channel channel) { logger.info(message); } // 工厂可以在配置模板类中中配置好。 @Bean("multipleConsumerContainerFactory") public SimpleRabbitListenerContainerFactory multipleConsumerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setPrefetchCount(50); factory.setConcurrentConsumers(10); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); configurer.configure(factory, connectionFactory); return factory; } 复制代码
实现接口方式
/** * 创建监听器。 * @author fruiqi * @date 19-2-11 下午4:18 * @param labelStatisticsListener 监听器 * 调用我们公用的方法 **/ @Bean public SimpleMessageListenerContainer mqMessageContainer(LabelStatisticsListener labelStatisticsListener) throws Exception { SimpleMessageListenerContainer container = rabbitConfig.setSimpleMessageListenerContainer(“queue_name”, true, rabbitProperties.getMaximumDelivery(), rabbitProperties.getConsumer(), AcknowledgeMode.MANUAL, labelStatisticsListener); return container; } @Component public class LabelStatisticsListener implements ChannelAwareMessageListener { private static final Logger logger = LoggerFactory.getLogger(LabelStatisticsListener.class); /** * 处理传输过来的数据 * @param message 传送的消息内容 * @param channel 实现通道 * @return: void */ @Override public void onMessage(Message message, Channel channel) throws Exception { String mes = new String(message.getBody()); logger.info("[INFO] message is {}",mes); // 手动应答 消息已消费 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } } 复制代码
总结
以上内容就完成了rabbitmq 从搭建到使用全部的流程。当然里面还有更多的可以让我们去探讨,比如mq的队列模式,一个系统配置多个mq等等内容。敬请期待我们下一篇mq系列内容。
大家在系统中使用过mq吗?你们使用的mq是什么样的?可以在留言区我们一起探讨哦。
代码存放在: github中
·END·
路虽远,行则必至
本文原发于 同名微信公众号「胖琪的升级之路」,回复「1024」你懂得,给个赞呗。
微信ID:YoungRUIQ
以上所述就是小编给大家介绍的《一文看懂Rabbitmq,从安装到实战演练》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- windows 应急流程及实战演练
- Knative 入门系列7:实战演练
- 攻防演练实战中的若干Tips
- Pwn学习笔记:Defcon靶机实战演练
- 【实战演练】kubernetes&docker-kubernetes管理平台安装与使用
- 原 荐 缓存架构之史上讲的最明白的RabbitMQ可靠消息传输实战演练
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Spark SQL内核剖析
朱锋、张韶全、黄明 / 电子工业出版社 / 2018-8 / 69.00元
Spark SQL 是 Spark 技术体系中较有影响力的应用(Killer application),也是 SQL-on-Hadoop 解决方案 中举足轻重的产品。《Spark SQL内核剖析》由 11 章构成,从源码层面深入介绍 Spark SQL 内部实现机制,以及在实际业务场 景中的开发实践,其中包括 SQL 编译实现、逻辑计划的生成与优化、物理计划的生成与优化、Aggregation 算......一起来看看 《Spark SQL内核剖析》 这本书的介绍吧!
RGB转16进制工具
RGB HEX 互转工具
HEX CMYK 转换工具
HEX CMYK 互转工具