基于RabbitMQ解决微信大量并发回调的问题(微信产品服务端工程师值得看)
顶
原
荐
字数 990
阅读 943
收藏 0
RabbitMQ HAproxy Lua cjson stomp
年底了,该给自己写个总结了,一个六年女 Java 程序员的心声 >>>
前言
- 大量粉丝的公众号已经有非常多了,数百万数千万的都很多,再加上很多微信开放平台,导致接收的微信回调非常多,如:关注/取消关注,获取地理位置,发消息,向用户推送模板消息,点击菜单,扫描带参二维码。这么多场景都会导致微信回调,若单纯使用Tomcat,Undertow,apache之类的Web容器去处理,使用资源成本比较大,而且微信要求在5s内进行回复直接异步处理也需要很多机器才处理得来。可以选择使用些高性能IO框架去解决 如果:smart-io, tio, netty, swoole之类去增加吞吐量,但是这样增加了学习成本和维护成本。我之前写过一篇文章是用 Redis 来解决的 解决模板消息回调的困扰 , 但是死循环会长期占用一条RUNNABLE线程,比较浪费cpu资源,可以优化成用分布式锁+rangeAll降低循环频率从而减轻负载,这次我要介绍的是使用RabbitMQ去处理(Nginx + Lua + RabbitMQ)。
准备工作
-
搭建RabbitMQ集群,我有个小伙伴写了篇博客,现在分享一下: RabbitMQ集群原理和部署
- Lua连接RabbitMQ官方推介使用rabbitmqstomp
# 开启RabbitMQ的rabbitmq_stomp插件(版本过低的可能会没有这个插件) rabbitmq-plugins enable rabbitmq_stomp
-
开启* rabbitmq_stomp 插件后会监听多一个端口 61613 (很重要,没有测试过端口冲突的情况),如果是单机RabbitMQ的话,在 Lua 脚本中应该连接这个端口。
-
都说是针对生产环境搭建的,所以HAProxy肯定少不了,修改haproxy的配置文件 /etc/haproxy/haproxy.cfg ,不一样的自行切换。下面贴上HAProxy的配置:
listen rabbitmqstomp 10.0.0.1:5670 mode tcp balance roundrobin server rabbit-master 10.0.0.2:61613 check inter 5s rise 2 fall 3 server rabbit-node1 10.0.0.3:61613 check inter 5s rise 2 fall 3
Lua脚本
-- 打印日志 function log(fileName, content) local f = assert(io.open(fileName,'a')) f:write(content..'\n') f:close() end -- 读取请求Get参数 ngx.req.read_body() local data, err = ngx.req.get_body_data() if (nil ~= err) then log(log_file, "body is null: " + err) return end if(nil == data) then params = ngx.req.get_uri_args() ngx.say(ngx.var.arg_echostr) ngx.exit(ngx.HTTP_OK); return end local log_file = '/var/log/nginx/logs/redis.log' local rabbitmq = require "resty.rabbitmqstomp" local cjson = require "cjson" local opts = { host = '10.0.0.1', port = '5670', username = 'abcd', password = 'abcd', vhost = 'abc' } local mq, err = rabbitmq:new(opts) if not mq then log(log_file, "can not new rabbitmq: " .. err) return end mq:set_timeout(10000) local ok, err = mq:connect(opts['host'], opts['port']) if not ok then log(log_file, "Connect Error: " .. err) mq:close() return end local args, err = ngx.req.get_uri_args() if (nil ~= err) then log(log_file, "getURI: " + err) mq:close() return end local message = {} message['signature'] = args['msg_signature'] message['timestamp'] = args['timestamp'] message['nonce'] = args['nonce'] message['data'] = data local text = cjson.encode(message) ngx.log(ngx.ERR, text) local headers = {} -- 消息发送到哪里 /exchange/交换机名称/routing_key名称 headers["destination"] = "/exchange/topic.wechatEvent.receive/wechatEvent.receive" -- -- 是否持久化 headers["persistent"] = "true" -- -- 消息格式 headers["content-type"] = "application/json" local ok, err = mq:send(text, headers) if not ok then ngx.log(ngx.ERR, "cannot send mq") mq:close() return end -- 消息保持长连接,第一个参数表示连接超时时间,第二个参数是表示连接池大小 -- -- 由于 rabbitmq 连接建立比较耗时,所以保持连接池是非常必要的 local ok, err = mq:set_keepalive(10000, 500) if not ok then log(log_file, "set keepalive error: " .. err) mq:close() return end ngx.say('success') ngx.exit(ngx.HTTP_OK)
Java代码
@RabbitListener(queues = MQConstants.Queue.WECHAT_EVENT_RECEIVE) public void handle(WechatReceiveMessage receiveMessage) throws Exception { //TODO }
总结
对比之前之前用Redis处理,用RabbitMQ更加实时,更加稳定更高并发,应该算是初创团队的最优方案了。觉得好的可以收藏下点个赞支持一下。
© 著作权归作者所有
共有人打赏支持
上一篇: 做个情商高的程序员
下一篇: 这可能是搭建Codis集群介绍最全的文章
相关文章 最新文章
背景知识 RabbitMQ RabbitMQ 是基于 AMQP 协议实现的一个消息队列(Message Queue),Message Queue 是一个典型的生产者/消费者模式。生产者发布消息,消费者消费消息,生产者和消费者之间是...
Java干货分享
10/26
0
0
1、前言 在IM这种讲究高并发、高消息吞吐的互联网场景下,MQ消息中间件是个很重要的基础设施,它在IM系统的服务端架构中担当消息中转、消息削峰、消息交换异步化等等角色,当然MQ消息中间件的...
JackJiang2011
06/12
0
0
微服务架构与MQ RabbitMQ场景分析与优化 RabbitMQ在网易蜂巢中的应用和案例分享 1微服务架构与MQ 微服务架构是一种架构模式,它将单体应用划分成一组微小的服务,各服务之间使用轻量级的通信...
andrewniu
05/10
0
0
作者介绍 章为忠,随变科技.net架构师。致力于电商领域的开发与架构设计工作,拥有丰富的电商网站架构搭建经验。博客:http://www.cnblogs.com/zhangweizhong/。 本文大纲: 1. RabbitMQ简介...
章为忠
2017/04/27
0
0
在之前的两篇文章中,主要介绍了RabbitMQ环境配置,简单示例的编写。今天将会介绍如何使用WCF将RabbitMQ列队以服务的方式进行发布。 注:因为RabbitMQ的官方.net客户端中包括了WCF的SAMPLE代...
长平狐
2012/11/06
148
0
没有更多内容
加载失败,请刷新页面
加载更多JMM怎么解决原子性、可见性、有序性的问题? 在Java中提供了一系列和并发处理相关的关键字, 比如volatile、Synchronized、final、juc(java.util.concurrent)等, 这些就是Java内存模型封装...
Java搬砖工程师
11分钟前
2
0
Mybatis错误:Parameter 'XXX' not found. Available parameters are [1, 0, param1, param2] 原因:传递的参数超过一个 记 解决办法: (1)直接把值改成数字 1 2 3 <select id="LoginUser"......
xiaomin0322
12分钟前
0
0
随着两会中间央视新闻天天说大数据,很多人纷纷开始关注大数据和Hadoop以及数据挖掘和数据可视化了,我现在创业,遇到很多传统数据行业往Hadoop上面去转型的公司和个人,提了很多问题,大多数...
技术阿飞
18分钟前
1
0
MyEclipse 在线订购年终抄底促销!火爆开抢>> MyEclipse最新版下载 使用MyEclipse开发RESTWeb服务来放大您的Web应用程序。在本教程示例中,您将创建一个简单的Web服务来维护客户列表。你将学...
电池盒
21分钟前
0
0
没有更多内容
加载失败,请刷新页面
加载更多以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- [OC] 关于block回调、高阶函数“回调再调用”及项目实践
- 即使回调IsOneWay,WCF客户端也会因回调而死锁
- Java 回调机制解读
- 如何避免回调地狱
- 如何使用JPA回调?
- callback回调函数-python
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。