rabbit mq

栏目: IT技术 · 发布时间: 4年前

内容简介:RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。 工作原理:现在索引中找到对应的值,然后根据匹配的索引记录找到对应的数据行。场景:用户注册后,需要发注册邮件和注册短信。 传统方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。

1.入门

1.简介

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。 工作原理:现在索引中找到对应的值,然后根据匹配的索引记录找到对应的数据行。 select *from student where id = 5 ;先在索引上按id=5查找,然后返回包含该值的数据行。

2.应用场景

异步处理

场景:用户注册后,需要发注册邮件和注册短信。 传统方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。

引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理。 应用解耦

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。缺点:当库存系统出现故障时,订单就会失败;订单系统和库存系统高耦合。

消息队列:订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。库存系统:订阅下单的消息,获取下单消息,进行库操作。

流量削峰 场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。 消息队列:1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.2.秒杀业务根据消息队列中的请求信息,再做后续处理。

3.架构

rabbit mq

Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输。

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue:消息的载体,每个消息都会被投到一个或多个队列。

Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。

Producer:消息生产者,就是投递消息的程序。

Consumer:消息消费者,就是接受消息的程序。

Channel:消息通道,在客户端的每个连接里,可建立多个channel。

2.特性

1.任务分发机制

Round-robin(轮询分发)

在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。 RabbbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的,如果现在load加重,那么只需要创建更多的Consumer来进行任务处理。 Fair dispatch(公平分发)

轮训不看消费者为应答的数目,只是盲目的将第n条消息发给第n个消费者。可能存在有些服务很忙依然分发给它,有些很轻松却没有任务。为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。 当消息处理完毕后,有了反馈,才会进行第二次发送。还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

2.消息应答

为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ可以删除它了。

如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会不丢失任何消息了。

没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。

消息应答是默认打开的。我们明确地把它们关掉了(autoAck=true)。现在将应答打开,一旦我们完成任务,消费者会自动发送消息应答。

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

3.消息持久化

当消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍将失去!当RabbitMQ退出或者崩溃,将会丢失队列和消息。除非你不要队列和消息。两件事儿必须保证消息不被丢失:我们必须把“队列”和“消息”设为持久化。

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

4.发布/订阅

生产者只能发送消息给Exchanges(转发器),转发器是非常简单的。一方面它接受生产者的消息,另一方面向队列推送消息。转发器必须清楚的知道如何处理接收到的消息。附加一个特定的队列吗?附加多个队列?或者是否丢弃?这些规则通过转发器的类型进行定义。

类型有:Direct、Topic、Headers和Fanout。

channel.exchangeDeclare("logs", "fanout");

转发器转发消息到队列。关联转发器和队列的叫Binding。

channel.queueBind(queueName, "logs", "");

Direct exchange(直接转发) 准确匹配 Topic exchange(主题转发器) 模糊匹配

(星号)可以代替任意一个标识符 ;#(井号)可以代替零个或多个标识符。

.orange. ”,Q2绑定键是“ .*.rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。

路由键为“quick.orange.rabbit”的消息会被路由到2个队列中去。而“lazy.orange.elephant”的消息同样会发往2个队列。另外“quick.orange.fox” 仅仅发往第一个队列,而”lazy.brown.fox”则只发往第二个队列。 “quick.brown.fox”则所有的绑定键都不匹配而被丢弃。

3.代码

producer:

String EXCHANGE_NAME = "direct_logs";
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 设置MabbitMQ所在主机ip或者主机名
		factory.setHost("127.0.0.1");
		// 创建一个连接
		Connection connection = factory.newConnection();
		// 创建一个频道
		Channel channel = connection.createChannel();
		// 指定转发——广播
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
 
		//所有日志严重性级别
		String[] severities={"error","info","warning"};
		for(int i=0;i<3;i++){
			String severity = severities[i%3];//每一次发送一条不同严重性的日志
			
			// 发送的消息
			String message = "Hello World"+Strings.repeat(".", i+1);
			//参数1:exchange name
			//参数2:routing key
			channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
			system.out.println(message);
		}
		// 关闭频道和连接
		channel.close();

consumer:

ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();
 
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    
	    String severity="error";//只关注error级别的日志,然后记录到文件中去。
	    channel.queueBind(queueName, EXCHANGE_NAME, severity);
	    
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    system.out.println(message);
			  }
			};
			channel.basicConsume(queueName, true, consumer);

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数据密集型应用系统设计

数据密集型应用系统设计

Martin Kleppmann / 赵军平、李三平、吕云松、耿煜 / 中国电力出版社 / 2018-9-1 / 128

全书分为三大部分: 第一部分,主要讨论有关增强数据密集型应用系统所需的若干基本原则。首先开篇第1章即瞄准目标:可靠性、可扩展性与可维护性,如何认识这些问题以及如何达成目标。第2章我们比较了多种不同的数据模型和查询语言,讨论各自的适用场景。接下来第3章主要针对存储引擎,即数据库是如何安排磁盘结构从而提高检索效率。第4章转向数据编码(序列化)方面,包括常见模式的演化历程。 第二部分,我们将......一起来看看 《数据密集型应用系统设计》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具