内容简介:Webhook是一种非常强大的推送机制,如果熟悉WordPress的同学可以类比构建WP生态的各类钩子函数。Githubt通过webhook让开发人员可以监听仓库的变化触发持续集成工具的运作,比如Travis CI。大家都看过Github上的webhook,可以对某一个repository设置webhook监听仓库变化,比如push,page_build等event(X-GitHub-Event)。
Webhook是一种非常强大的推送机制,如果熟悉WordPress的同学可以类比构建WP生态的各类钩子函数。Githubt通过webhook让开发人员可以监听仓库的变化触发持续集成 工具 的运作,比如Travis CI。
需求
大家都看过Github上的webhook,可以对某一个repository设置webhook监听仓库变化,比如push,page_build等event(X-GitHub-Event)。
每一次发送都会有个uuid作为标记,并写入到HTTP Header的X-GitHub-Delivery,并且对于发送失败的历史记录,可以点击Redeliver进行重发。
如果把该功能作为一个单独的服务提供方,其根本诉求就是要准确记录到此服务与Internet每一次网络交互的过程,包括发送请求数据和响应结果数据。继续细化,
- 该服务提供方接收客户端的调用,发送请求到客户端所指定的url并获取响应。
- 记录每次客户端的原始请求内容(url, method, header, body)以及response(header, body, code, etc.)
- 需要考虑到客户端重试或重复调用的情况,需要记录每个请求的调用次数以及最后一次调用时间。(客户端调用时可能会传入一个clientId用于接收端去重,如未传服务提供方根据请求生成一个唯一uuid)
- 提供接口对指定的某个发送历史进行重发。
思考
拿到需求首先要思考一下这个服务会与哪些系统有交互?
- 请求要发送到指定的URL上,那么第一个交互的系统是某个公网服务。
- 发送的历史要能保留,说明数据是需要持久化的。第二个交互的系统是数据库。
好了,交互的系统确定后,接下来应该考虑顺序问题,是先发送请求到公网服务还是先操作数据库?我们逐个来分析一下
- 方案A:先发送请求再将记录写入数据库。问题:如果请求发送了但是数据库写入失败,此时就会造成数据不一致,因为遗漏了发送历史。
- 方案B:先写入数据库再发送请求。问题:与A类似的,如果数据库写入成功,请求发送失败,比如网络断开等原因。此时数据也会不一致。虽然有了发送历史,但实际发送是失败的。
- 方案C:先写入数据库接着发送请求最后更新数据。这种方案相对来说比A和B要可靠。第一步写入请求的数据并将状态(status)置为
sending
,发送完成再更新status为success
或者failure
。 - 方案D:先写入数据库,将status设置为
sending
,启动一个新的线程扫描该表,对status为ready
的记录进行发送,发送完成再更新status为success
或者failure
。
前两个方案肯定是不可取的,我们来分析一下后两个方案的优劣。方案C的缺点在于第一步写入数据库完成后,发送请求时系统宕机,该记录会一直处于 sending
状态。好在整体方案会提供一个人工重试(点击Redeliver)的机制,可以事后弥补。优点在于串行化的思维编码比较容易。方案D的优点就是对于一直处于 sending
状态的历史记录,可以自动进行补发,因为有线程不断扫描。缺点在于这个扫描线程可能会加重数据库的负担。如果要想并行扫描那么又要解决任务分片和编排的问题(参考elestic-job),编码相对较难。
针对以上所述的利弊,最终我们选择方案C。其实把发送网络请求换成发送消息到MQ,那么方案D就很类似大家所熟知的“本地事务表”的解决方案,是将MQ的事务和本地数据库事务绑定的一种思路。
实现
数据结构,定义一个request和response
@Data @NoArgsConstructor public class WebHookRequest { @NotBlank private String url; private String method; private Map<String, String> headers; private String body; } @Data @NoArgsConstructor @AllArgsConstructor public class WebHookResponse { private String id; private String data; private int code; public boolean isSuccessful() { return this.code >= 200 && this.code < 300; } }
发送方法
public WebHookResponse send(WebHookRequest webHookRequest, String id, boolean retryOnServerError) { //如果traceId为空则根据请求参数生成一个md5的值作为traceId if (StringUtils.isBlank(id)) { //拼接请求参数 String content = webHookRequest.getUrl() .concat(webHookRequest.getMethod()) .concat(jsonMapper.toJson(webHookRequest.getHeaders())) .concat(webHookRequest.getBody()); id = DigestUtils.md5Hex(content); } // 查找发送记录 WebHookRecord webHookRecord = webHookRecordMapper.findById(id, TenantContext.getCurrentTenant()); if (webHookRecord == null) { webHookRecord = createFromRequest(webHookRequest); webHookRecord.setId(id); try { // 新建发送记录 webHookRecordMapper.insert(webHookRecord); } catch (Exception ex) { LOGGER.warn("Duplicate key for [{}]", webHookRecord.getId()); throw new BizException(B_01000, ex); } } else { // 如果历史已经存在,可能是客户端发送重发请求,那么先判断是否可以重发,sending状态在一定时间间隔内不能重发,避免频繁失败 if (this.shouldResend(webHookRecord.getSendTime(), webHookRecord.getStatus())) { // 可以重发那么更新状态为sending this.updateResendStatus(webHookRecord); } else { LOGGER.warn("Should not resend key for [{}]", webHookRecord.getId()); throw new BizException(B_01000); } } // 通过网络发送请求,如果失败会直接更新status为failure并抛出异常,发送过程的异常和得到响应对方服务报异常还是不一样的 WebHookResponse webHookResponse = this.doSend(id, webHookRequest); // 发送完成后更新status this.onResponse(id, webHookResponse); if (retryOnServerError && webHookResponse.getCode() >= 500) { throw new RetryException("server error!"); } return webHookResponse; } /* * 判断是否可以进行重发 */ private boolean shouldResend(DateTime sendTime, String status) { if (!STATUS_SENDING.equals(status)) { return true; } // 发送状态超过60s可重发 return sendTime.plusSeconds(60).isBefore(DateTime.now()); } /* * 调用网络接口进行发送 */ private WebHookResponse doSend(String id, WebHookRequest webHookRequest) { //调用httpClient String responseBodyString = null; Response response; try { response = webHookIntegrationService.send(webHookRequest.getUrl(), webHookRequest.getMethod(), webHookRequest.getHeaders(), webHookRequest.getBody()); if (response.body() != null) { responseBodyString = response.body().string(); } } catch (Exception ex) { this.onSendFailed(id, ExceptionUtils.getStackTrace(ex)); throw new RetryException("http send error!", ex); } return new WebHookResponse(id, responseBodyString, response.code()); } /* * 请求正常返回后的处理 * 这里有一个乐观锁的问题,如果同时有多个线程调用改办法修改同一个历史请求,只有一个线程会更新成功 */ private void onResponse(String id, WebHookResponse webHookResponse) { WebHookRecord webHookRecord = webHookRecordMapper.findById(id, TenantContext.getCurrentTenant()); webHookRecord.setResponse(webHookResponse.getData()); webHookRecord.setResponseCode(webHookResponse.getCode()); //更新状态 if (webHookResponse.isSuccessful()) { webHookRecord.setStatus(STATUS_SUCCESS); } else { webHookRecord.setStatus(STATUS_ERROR); } int count = webHookRecordMapper.updateResponse(webHookRecord); if (count == 0) { LOGGER.warn("Attempt to update WebHook id={} with wrong version ({})", id, webHookRecord.getVersion()); } } /* * 再次发送时更新状态 (真正发送网络请求前) */ private void updateResendStatus(WebHookRecord webHookRecord) { webHookRecord.setStatus(STATUS_SENDING); webHookRecord.setSendTime(DateTime.now()); int count = webHookRecordMapper.resend(webHookRecord); if (count == 0) { throw new OptimisticLockingFailureException("Attempt to update WebHook id=" + webHookRecord.getId() + " with wrong version (" + webHookRecord.getVersion() + ")"); } }
为了避免对于同一个请求有多个线程同时发起重试的问题,我们在 updateResendStatus
方法上使用了乐观锁,如果其中一个线程更新状态成功,那么其他线程会因乐观锁问题直接失败,不会走到真正发送网络的请求的那步。也就是说在发送网络请求前过滤绝大部分并发问题。
最后,可以使用一个单元测试,模拟并发请求进行验证。
@Test public void testMultiThreads() throws InterruptedException { //调用send接口 WebHookRequest webHookRequest = new WebHookRequest(); webHookRequest.setUrl("http://www.qq.com"); webHookRequest.setMethod("POST"); Map<String, String> headers = new HashMap<>(1); webHookRequest.setHeaders(headers); int nLoop = 100; String clientId = UUID.randomUUID().toString(); CountDownLatch countDownLatch = new CountDownLatch(nLoop); Runnable task = () -> { try { givenToken().when().body(webHookRequest).post("/webhooks/send?clientId=" + clientId) .then() .statusCode(HttpStatus.OK.value()) .extract() .response(); } finally { countDownLatch.countDown(); } }; ExecutorService executorService = new ThreadPoolBuilder.FixedThreadPoolBuilder().setThreadNamePrefix("thread-webhook").setPoolSize(100).build(); for (int i = 0; i < nLoop; i++) { executorService.execute(task); } countDownLatch.await(); int times = webHookMapper.findById(clientId, 1L).getTimes(); // 验证数据库里记录的发送次数是否 等于 真正调用发送网络请求接口的次数 Mockito.verify(webHookIntegrationService, Mockito.times(times)).send(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); }
总结
如果想进一步提升性能,可以使用支持异步的httpclient工具包, onResponse
在callback中进行处理。
整体来说,这是个很简单的小需求,但要考虑周全其实还是要费一番功夫的。究其本质就是异构系统间的数据一致性问题。当我们把发送网络请求换成写数据到redis,到MQ,到另一个微服务时,就会发现它们存在的共通性。一次请求涉及多个系统,并且无法包裹进同一个事务,就会产生这样的问题。至于解决方案是二阶段提交,事后补偿,还是自动对账,就要根据自己的业务特点来选择了。
以上所述就是小编给大家介绍的《类似Github的webhook实现》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- ASP实现类似hashMap功能的类
- PHP实现类似题库抽题效果
- java实现golang类似的chan
- splash 实现类似selenium的WebDriverWait
- C++ 中类似 Python 的 dict 的实现
- 如何实现类似 lodash 的 get 与 merge 函数
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
我是90后,我是创业家
腾讯互联网与社会研究院 / 华章图书 / 2015-1-1 / 48.00元
第1本揭秘17个90后精彩创业故事 他们是:脸萌创始人郭列、北大硕士卖米粉的张天一、微博《我只过1%的生活》短时间转发35万多次的伟大的安妮、备受争议的90后总裁余佳文、节操姐CEO陈桦……17位90后的创业家为你分享他们的创业故事!从这些90后孩子的经历中,还可以看到互联网带来的巨大好处,这又是这一代人的幸运。这些创业者有一个共同特点,即他们在做自己事业的时候,会经常遇到来自家庭和社会的阻......一起来看看 《我是90后,我是创业家》 这本书的介绍吧!