rabbit mq

栏目: IT技术 · 发布时间: 4年前

内容简介:RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。 工作原理:现在索引中找到对应的值,然后根据匹配的索引记录找到对应的数据行。场景:用户注册后,需要发注册邮件和注册短信。 传统方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。

1.入门

1.简介

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。 工作原理:现在索引中找到对应的值,然后根据匹配的索引记录找到对应的数据行。 select *from student where id = 5 ;先在索引上按id=5查找,然后返回包含该值的数据行。

2.应用场景

异步处理

场景:用户注册后,需要发注册邮件和注册短信。 传统方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。

引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理。 应用解耦

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。缺点:当库存系统出现故障时,订单就会失败;订单系统和库存系统高耦合。

消息队列:订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。库存系统:订阅下单的消息,获取下单消息,进行库操作。

流量削峰 场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。 消息队列:1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.2.秒杀业务根据消息队列中的请求信息,再做后续处理。

3.架构

rabbit mq

Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输。

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue:消息的载体,每个消息都会被投到一个或多个队列。

Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。

Producer:消息生产者,就是投递消息的程序。

Consumer:消息消费者,就是接受消息的程序。

Channel:消息通道,在客户端的每个连接里,可建立多个channel。

2.特性

1.任务分发机制

Round-robin(轮询分发)

在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。 RabbbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的,如果现在load加重,那么只需要创建更多的Consumer来进行任务处理。 Fair dispatch(公平分发)

轮训不看消费者为应答的数目,只是盲目的将第n条消息发给第n个消费者。可能存在有些服务很忙依然分发给它,有些很轻松却没有任务。为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。 当消息处理完毕后,有了反馈,才会进行第二次发送。还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。

2.消息应答

为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ可以删除它了。

如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会不丢失任何消息了。

没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。

消息应答是默认打开的。我们明确地把它们关掉了(autoAck=true)。现在将应答打开,一旦我们完成任务,消费者会自动发送消息应答。

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

3.消息持久化

当消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍将失去!当RabbitMQ退出或者崩溃,将会丢失队列和消息。除非你不要队列和消息。两件事儿必须保证消息不被丢失:我们必须把“队列”和“消息”设为持久化。

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

4.发布/订阅

生产者只能发送消息给Exchanges(转发器),转发器是非常简单的。一方面它接受生产者的消息,另一方面向队列推送消息。转发器必须清楚的知道如何处理接收到的消息。附加一个特定的队列吗?附加多个队列?或者是否丢弃?这些规则通过转发器的类型进行定义。

类型有:Direct、Topic、Headers和Fanout。

channel.exchangeDeclare("logs", "fanout");

转发器转发消息到队列。关联转发器和队列的叫Binding。

channel.queueBind(queueName, "logs", "");

Direct exchange(直接转发) 准确匹配 Topic exchange(主题转发器) 模糊匹配

(星号)可以代替任意一个标识符 ;#(井号)可以代替零个或多个标识符。

.orange. ”,Q2绑定键是“ .*.rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。

路由键为“quick.orange.rabbit”的消息会被路由到2个队列中去。而“lazy.orange.elephant”的消息同样会发往2个队列。另外“quick.orange.fox” 仅仅发往第一个队列,而”lazy.brown.fox”则只发往第二个队列。 “quick.brown.fox”则所有的绑定键都不匹配而被丢弃。

3.代码

producer:

String EXCHANGE_NAME = "direct_logs";
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 设置MabbitMQ所在主机ip或者主机名
		factory.setHost("127.0.0.1");
		// 创建一个连接
		Connection connection = factory.newConnection();
		// 创建一个频道
		Channel channel = connection.createChannel();
		// 指定转发——广播
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
 
		//所有日志严重性级别
		String[] severities={"error","info","warning"};
		for(int i=0;i<3;i++){
			String severity = severities[i%3];//每一次发送一条不同严重性的日志
			
			// 发送的消息
			String message = "Hello World"+Strings.repeat(".", i+1);
			//参数1:exchange name
			//参数2:routing key
			channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
			system.out.println(message);
		}
		// 关闭频道和连接
		channel.close();

consumer:

ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();
 
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    
	    String severity="error";//只关注error级别的日志,然后记录到文件中去。
	    channel.queueBind(queueName, EXCHANGE_NAME, severity);
	    
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    system.out.println(message);
			  }
			};
			channel.basicConsume(queueName, true, consumer);

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

因计算机而强大

因计算机而强大

[美]西摩 佩珀特 Seymour Papert / 梁栋 / 新星出版社 / 2019-1 / 38

本书有两个中心主题—— 孩子可以轻松自如地学习使用计算机; 学习使用计算机能够改变他们学习其他知识的方式。 (前苹果公司总裁 约翰·斯卡利) 最有可能带来文化变革的就是计算机的不断普及。 计算机不仅是一个工具,它对我们的心智有着根本和深远的影响。 计算机不仅帮助我们学习 ,还帮助我们学习怎样学习。 计算机是一种调解人与人之间关系的移情对象。 一个数学的头脑......一起来看看 《因计算机而强大》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具