内容简介:Work Queues是为了避免在当前线程立即执行耗时的操作而导致线程阻塞。我们可以把要处理的任务封装成消息,发送到消息队列。然后把消息发送到一个或多个工作线程。由工作线程负责执行耗时的操作。用字符串模拟耗时操作。消费者收到的字符串中有几个".",就睡眠几秒钟。输出如下:
Work Queues是为了避免在当前线程立即执行耗时的操作而导致线程阻塞。我们可以把要处理的任务封装成消息,发送到消息队列。然后把消息发送到一个或多个工作线程。由工作线程负责执行耗时的操作。
用字符串模拟耗时操作。消费者收到的字符串中有几个".",就睡眠几秒钟。
NewTask.java(消息发送者)
public class NewTask { private final static String QUEUE_NAME = "task"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); //发送4条消息,代表要执行的耗时任务 for (int i = 0; i < 4; i++) { StringBuilder msgSb = new StringBuilder("Hello World!"); for (int j = 0; j < i; j++) { msgSb.append("."); } String message = msgSb.toString(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } } } } 复制代码
Worker.java(消息接收者)
public class Worker { private final static String QUEUE_NAME = "task"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); long start = System.currentTimeMillis(); try { //执行耗时的操作 doWork(message); } finally { System.out.println(" [x] Done. Cost seconds: " + (System.currentTimeMillis() - start)/1000); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { //模拟耗时操作 Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } } 复制代码
运行NewTask.main()
输出如下:
[x] Sent 'Hello World!' [x] Sent 'Hello World!.' [x] Sent 'Hello World!..' [x] Sent 'Hello World!...' Process finished with exit code 0 复制代码
可以看到,NewTask发送完消息后,立即就退出了。
运行Worker.main()
Work收到消息后,开始执行耗时的操作。输出如下:
[*] Waiting for messages. [x] Received 'Hello World!' [x] Done. Cost seconds: 0 [x] Received 'Hello World!.' [x] Done. Cost seconds: 1 [x] Received 'Hello World!..' [x] Done. Cost seconds: 2 [x] Received 'Hello World!...' [x] Done. Cost seconds: 3 复制代码
Round-robin dispatching
如果有多个消费者接收消息会怎样?
停止掉上一步启动的Worker.main()。运行Worker.main()两次,启动了两个Worker
(edit configurations->左侧选择Worker->右上角选中“All running parallel”->ok)
运行NewTask.main()
可以看到两个Worker的输出分别如下: Worker1
[*] Waiting for messages. [x] Received 'Hello World!' [x] Done. Cost seconds: 0 [x] Received 'Hello World!..' [x] Done. Cost seconds: 2 复制代码
Worker2
[*] Waiting for messages. [x] Received 'Hello World!.' [x] Done. Cost seconds: 1 [x] Received 'Hello World!...' [x] Done. Cost seconds: 3 复制代码
可以看到,RabbitMQ按顺序依次把消息转发给了两个消费者,第一,三个消息转发给了Worker1,第二,四个消息转发给了第Worker2。
这就是使用Work Queues的好处,它还可以允许你创建多个消费者,实现并行工作。很容易进行扩展。
Message acknowledgment(消息确认)
如果Worker在执行过程中,发生了异常或者意外退出了,再启动后,未执行完的任务还会继续执行吗,验证一下。
运行NewTask.main()
消息发送完毕,输出如下:
[x] Sent 'Hello World!' [x] Sent 'Hello World!.' [x] Sent 'Hello World!..' [x] Sent 'Hello World!...' 复制代码
运行Worker.main(),在执行过程中将Worker杀掉
输出如下:
[*] Waiting for messages. [x] Received 'Hello World!' [x] Done. Cost seconds: 0 [x] Received 'Hello World!.' [x] Done. Cost seconds: 1 [x] Received 'Hello World!..' [x] Done. Cost seconds: 2 [x] Received 'Hello World!...' Process finished with exit code 130 (interrupted by signal 2: SIGINT) 复制代码
再次运行Worker.main()
输出如下:
[*] Waiting for messages. 复制代码
可见,Worker并没有收到未处理完的消息,未处理完的任务丢失了。
这是因为默认情况下,RabbitMQ在将消息发送给消费者之后就立即将消息删除了。如果Worker意外退出,不仅会丢失正在处理的任务,还会丢失已经收到的还没来得及处理的任务消息。
为了避免消息丢失,消费者可以在收到消息并处理完之后,向RabbitMQ发送一个确认消息,RabbitMQ在收到确认消息之后才会将已发送的消息删除。如果消费者意外停止,同时还有别的消费者正常工作,RabbitMQ会把未处理的消息转发给正常工作中的消费者。否则待消费者恢复后,RabbitMQ会把未处理的消息重新转发给消费者。
要达到这个效果,只需将Worker.java中autoAck设置为false。并在处理完任务后,发送确认消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); long start = System.currentTimeMillis(); try { //执行耗时的操作 doWork(message); } finally { System.out.println(" [x] Done. Cost seconds: " + (System.currentTimeMillis() - start)/1000); //发送确认消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; //关闭自动确认 boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); 复制代码
修改完之后,在Worker处理消息过程中,将Worker杀掉,再重新启动Worker,会发现Worker会继续收到未处理完的消息。
Fair dispatch
RabbitMQ默认通过Round-robin dispatching的方式转发消息给多个消费者,可有时候,这种方式并不合适。比如上面的例子,如果第奇数个消息都代表特别耗时的操作,而第偶数个消息代表不耗时的操作,即使Worker2已处理完任务处于空闲状态,RabbitMQ已会持续将第奇数个消息转发给Worker1,这就会造成Worker1中的任务积压。
通过例子验证一下
为了效果明显,修改下NewTask.java中发送的消息的个数,改为发送10条消息
//发送10条消息,代表要执行的耗时任务 for (int i = 0; i < 10; i++) { StringBuilder msgSb = new StringBuilder("Hello World!"); for (int j = 0; j < i; j++) { msgSb.append("."); } String message = msgSb.toString(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } 复制代码
修改Worker.java 中doWork()方法
private static void doWork(String task) { int dotCount = 0; for (char ch : task.toCharArray()) { if (ch == '.') { dotCount++; } } //如果有偶数个点,代表耗时操作 if (dotCount % 2 == 0) { try { //模拟耗时操作 Thread.sleep(1000*dotCount); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } 复制代码
先启动两个Worker,再启动NewTask
可以看到收到有奇数个点的消息的Worker很快执行完了任务,而另一个Worker则一直在处理耗时的操作。 Worker1
[*] Waiting for messages. [x] Received 'Hello World!.' [x] Done. Cost seconds: 0 [x] Received 'Hello World!...' [x] Done. Cost seconds: 0 [x] Received 'Hello World!.....' [x] Done. Cost seconds: 0 [x] Received 'Hello World!.......' [x] Done. Cost seconds: 0 [x] Received 'Hello World!.........' [x] Done. Cost seconds: 0 复制代码
Worker2
[*] Waiting for messages. [x] Received 'Hello World!' [x] Done. Cost seconds: 0 [x] Received 'Hello World!..' [x] Done. Cost seconds: 2 [x] Received 'Hello World!....' [x] Done. Cost seconds: 4 [x] Received 'Hello World!......' [x] Done. Cost seconds: 6 [x] Received 'Hello World!........' [x] Done. Cost seconds: 8 复制代码
为了避免这种情况,只需在Worker.java添加如下两行代码
public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages."); //fair dispatch(新添加的两行代码) int prefetchCount = 1; channel.basicQos(prefetchCount); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); long start = System.currentTimeMillis(); try { //执行耗时的操作 doWork(message); } finally { System.out.println(" [x] Done. Cost seconds: " + (System.currentTimeMillis() - start) / 1000); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; // acknowledgment is covered below channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } 复制代码
关闭Worker,再次重新启动两个Worker,启动NewTask
输出如下: Worker1
[*] Waiting for messages. [x] Received 'Hello World!' [x] Done. Cost seconds: 0 [x] Received 'Hello World!...' [x] Done. Cost seconds: 0 [x] Received 'Hello World!....' [x] Done. Cost seconds: 4 [x] Received 'Hello World!.......' [x] Done. Cost seconds: 0 [x] Received 'Hello World!........' [x] Done. Cost seconds: 8 复制代码
Worker2
[*] Waiting for messages. [x] Received 'Hello World!.' [x] Done. Cost seconds: 0 [x] Received 'Hello World!..' [x] Done. Cost seconds: 2 [x] Received 'Hello World!.....' [x] Done. Cost seconds: 0 [x] Received 'Hello World!......' [x] Done. Cost seconds: 6 [x] Received 'Hello World!.........' [x] Done. Cost seconds: 0 复制代码
可以看到,耗时的任务已经"均匀"的分配给了两个Worker。channel.basicQos(1),告诉RabbitMQ,不要一次把所有消息都给我,在我处理完消息后,再给我一个消息,否则把消息转发给别的已处理完消息的消费者。
Message durability(消息持久化)
问题又来了,如果消息处理过程中RabbitMQ Server意外停止了呢?没有转发出去的消息会丢失吗?答案是会丢失的。为了避免这种情况,我们就需要将消息队列声明为可持久化的。
首先,修改Worker.java和NewTask.java中Queue的名称,改为task_durable
因为RabbitMQ不允许修改已存在的消息队列的属性。
private final static String QUEUE_NAME = "task_durable"; 复制代码
声明消息队列时,指定为可持久化的
boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null); 复制代码
发送消息时,指定消息为可持久化
//发送10条消息,代表要执行的耗时任务 for (int i = 0; i < 10; i++) { StringBuilder msgSb = new StringBuilder("Hello World!"); for (int j = 0; j < i; j++) { msgSb.append("."); } String message = msgSb.toString(); //指定消息为可持久化 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } 复制代码
这样,在消息处理的过程中,即使RabbitMQ Server重启,也不会丢失消息了。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 【每日笔记】【Go学习笔记】2019-01-04 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-02 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-07 Codis笔记
- Golang学习笔记-调度器学习
- Vue学习笔记(二)------axios学习
- 算法/NLP/深度学习/机器学习面试笔记
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
WebWork in Action
Jason Carreira、Patrick Lightbody / Manning / 01 September, 2005 / $44.95
WebWork helps developers build well-designed applications quickly by creating re-usable, modular, web-based applications. "WebWork in Action" is the first book to focus entirely on WebWork. Like a tru......一起来看看 《WebWork in Action》 这本书的介绍吧!