RxJava练武场之——Token前置请求

栏目: Java · 发布时间: 7年前

内容简介:Rxjava这个库和其他常见库不太一样,一般的库例如Glide,ButterKnife都是为了解决实际问题出现的,一定程度上是刚需。Glide库如果不用他,那么应用自己就要处理图片下载、压缩、内存管理、多级缓存等等复杂的逻辑。这类问题复杂而常见,而像Glide这类的轮子,Api的设计都比较友好,一个简单的api调用就能完成一个原本很复杂的功能,简直不要太爽。而Rxjava,你刚开始看起来,都不知道他是干什么的。“异步处理”?不是一般都使用观察者模式吗?AsyncTask,Handler也可以,要rxjav
RxJava练武场之——Token前置请求

RxJava的用武之地

Rxjava这个库和其他常见库不太一样,一般的库例如Glide,ButterKnife都是为了解决实际问题出现的,一定程度上是刚需。Glide库如果不用他,那么应用自己就要处理图片下载、压缩、内存管理、多级缓存等等复杂的逻辑。这类问题复杂而常见,而像Glide这类的轮子,Api的设计都比较友好,一个简单的api调用就能完成一个原本很复杂的功能,简直不要太爽。

Glide.with(context)
    .load(url)//图片加载
    .crossFade()//动画设置
    .placeholder(R.drawable.place_image)//占位图
    .error(R.drawable.error_image)//失败占位图
    .override(width,height)//图片裁剪
    .thumbnail(thumbnailRequest)//配置缩略图
    .diskCacheStrategy(DiskCacheStrategy.SOURCE)//缓存策略
    .into(imageView);
复制代码

而Rxjava,你刚开始看起来,都不知道他是干什么的。“异步处理”?不是一般都使用观察者模式吗?AsyncTask,Handler也可以,要rxjava干嘛?如果你有兴趣研究过一点rxjava,会发现网上的教程都会说:"zip map flatmap debounce等操作符把异步回调变得‘简洁’‘优雅’",然后对比一下原来的代码和使用rxjava后的代码,最后感叹一下rxjava设计的鬼才和功能的强大。我自己在初次接触rxjava时也感觉,这些rxjava的优点描述比较空洞,这项技术的意义大于实用。 实际情况是这样么?在具体开发中,异步调用给我们的最大困扰是:异步回调的时间并不可控。当有多个异步回调时,这些调用相互联系和依赖,搞清楚每个回调何时返回是个重要的问题。在每个关键时间节点对‘分散的callback’做正确的事,有过类似编程经验的人都知道,是非常痛苦的事,如果还想代码容易看懂,简直是疯了。

RxJava练武场之——Token前置请求

rxjava号称异步调用的终极解决方案,能否解决以上困扰?随着学习和应用的深入,体会会更明显。以下会用一个稍复杂的例子,实操一个复杂异步场景,看看rxjava处理的怎么样。

典型复杂异步场景 -- Token的前置校验

经常遇到这种需求,接口的请求依赖token信息。一个请求需要先请求token(token如果存在缓存则使用缓存),依赖这个token才能进行正常网络请求。这个token有一定的时效性,在时效性内可以使用缓存,过期后需要重新请求token并重新发起一次请求。这个流程可以归纳如下图:

RxJava练武场之——Token前置请求

光看这些需求,是不是觉得已经够你喝一壶了,别忙,还有些潜在的逻辑这个图没有表现出来: 1 高并发网络请求时,如果token正在请求,需要对请求阻塞(token请求过程中,不再接受新的token请求) 2 阻塞的同时,要把这些请求记录下来,token请求成功后,再‘依次’发送这些阻塞的请求。 3 token失效情况下,网络请求限制重试次数。(防止递归调用) 4 token请求本身,重试策略需单独配置。

不使用rxjava,我们如何实现上述需求:

1、网络请求前,对token是否有缓存判断,如果没有先请求token,并把这个请求阻塞且缓存 2、token请求过程中,如果有新的token请求进来,加入阻塞队列 3、token请求后,通知阻塞的队列(广播等方式),依次进行阻塞的请求 4、对两种次数限制,分别做逻辑判断

以上就是传统实现方法,就不贴代码了,这样实现有以下特点: 1、要时刻维护一个阻塞队列 (注意其添加和清空的时机) 2、token请求结束后,有一个回调机制通知阻塞队列,(这个回调需要注册和反注册) 3、两处的次数限制,次数维护的变量,不好维护(一般动态秘钥为了便于使用会做成单例,单例内的变量类似static,维护较复杂) 4、请求重试的逻辑不好实现,

我们可以看到这里涉及到很多静态变量的维护,广播等异步回调的处理,这种情况一多,编程者会变得很被动。而且token的异步请求和真正的网络异步请求杂糅在一起,增大了问题的复杂性。

我们来看下rxjava如何处理:

一些代码网络请求部分与前一篇博客《基于RxJava Retrofit的网络框架》相关。

先看看完整的请求过程

public static <R> Observable send(final MapiHttpRequest request, final MapiTypeReference<R> t){
    return Observable.defer(new Callable<ObservableSource<String>>() {
                @Override
                public ObservableSource<String> call() throws Exception {
                    //传入token缓存
                    return Observable.just(Store.sToken);
                }
            }).flatMap(new Function<String, ObservableSource<R>>() {
                @Override
                public ObservableSource<R> apply(String key) throws Exception {
                    if(TextUtils.isEmpty(key) && !request.skipCheckKeyValid()){
                        //token没有缓存,需要请求Token
                        return Observable.<R>error(new KeyNotValidThrowable());
                    } else {
                        //Token存在缓存,直接请求
                        return sendRequestInternal(request,t);
                    }
                }
            })
            //进入失败重试流程
            .retryWhen(new Function<Observable<? extends Throwable>, ObservableSource<String>>() {
                private int retryCount = 0;
                @Override
                public ObservableSource<String> apply(Observable<? extends Throwable> throwableObservable) throws Exception {
                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<String>>() {
                        @Override
                        public ObservableSource<String> apply(Throwable throwable) throws Exception {
                            if (throwable instanceof KeyNotValidThrowable){
                                //同一Request,有过一次KeyNotValidThrowable,则不再重试
                                if (retryCount > 0){
                                    return Observable.error(throwable);
                                } else {
                                //token缓存不在,进入TokenLoader请求token
                                    retryCount++;
                                    return TokenLoader.getInstance().getNetTokenLocked();
                                }
                            } else if (throwable instanceof ApiException){
                                  //token过期的情况,重新获取token,并重试
                                  ApiException apiException = (ApiException)throwable;
                                  if (apiException.getCode() == MapiResultCode.SECRETKEY_EXPIRED.value()){
                                      if (retryCount > 0){
                                          return Observable.error(throwable);
                                      } else {
                                          //token缓存失效,进入TokenLoader请求token
                                          retryCount++;
                                          return DynamicKeyLoader.getInstance().getNetTokenLocked();
                                      }
                                  }
                            }
                            //其他类型错误,直接抛出,不再重试
                            return Observable.error(throwable);
                        }
                    });
                }
            });
}
复制代码

也许你第一次看也挺晕,别怕,你顺着注释捋捋逻辑,是不是感觉代码的实现好像画了一个时序图。 除了注释以外,几点说明: 1、defer操作符的作用是在retry时,会重新创建新的Observable,否则会使用上次的Observable,不会重新获取Store.sToken 2、retryWhen操作符,与sendRequestInternal内部统一配置的retryWhen并不冲突,相当于二次retry 3、retryWhen中如果抛出error ,则不再重试; 4、重试请求,通过返回getNetTokenLocked这个subject实现。(下面详述)

阶段总结:

整体的流程被压缩到了一个函数中,rxjava本身的retrywhen和subject机制,已经替我们完成了这么几点: 1、自动重试的注册和反注册,subject被回调完直接失效,再次请求要重新注册。 2、高并发request,维护队列,通过mTokenObservable的回调自动解决了这个问题 3、retry次数的维护,由于每次request的retry都是重新创建的内部类,所以变量的维护变的简单。 4、重试的逻辑被retry操作符自动实现了,只要重写retry的返回值就可以控制重试的策略。

TokenLoader:Token的获取过程

public class TokenLoader {

    public static final String TAG = TokenLoader.class.getSimpleName();

    private AtomicBoolean mRefreshing = new AtomicBoolean(false);
    private PublishSubject<String> mPublishSubject;
    private Observable<String> mTokenObservable;

    private TokenLoader() {
        final TokenRequest request = new TokenRequest(CarOperateApplication.getInstance());
        mTokenObservable = Observable
                  .defer(new Callable<ObservableSource<TokenRequest>>() {
                      @Override
                      public ObservableSource<TokenRequest> call() throws Exception {
                          return Observable.just(request);
                      }
                  })
                  .flatMap(new Function<TokenRequest, ObservableSource<MapiHttpResponse<Boolean>>>() {
                      @Override
                      public ObservableSource<MapiHttpResponse<Boolean>> apply(RefreshKeyRequest refreshKeyRequest) throws Exception {
                          //Token请求接口
                          return ApiHelper.sendDynamicKey(refreshKeyRequest,new MapiTypeReference<MapiHttpResponse<Boolean>>(){});
                      }
                  })
                  .retryWhen(new Function<Observable<Throwable>, ObservableSource<TokenRequest>>() {
                      private int retryCount = 0;
                      @Override
                      public ObservableSource<TokenRequest> apply(Observable<Throwable> throwableObservable) throws Exception {
                          return throwableObservable.flatMap(new Function<Throwable, ObservableSource<TokenRequest>>() {
                              @Override
                              public ObservableSource<RefreshKeyRequest> apply(Throwable throwable) throws Exception {
                                  retryCount++;
                                  if (retryCount == 3){
                                      //失败次数达到阈值,更改请求策略
                                      request.setFlag(0);
                                      return Observable.just(request);
                                  } else if (retryCount > 3){
                                      //失败次数超过阈值,抛出失败,放弃请求
                                      mRefreshing.set(false);
                                      return Observable.error(throwable);
                                  } else {
                                      //再次请求token
                                      return Observable.just(request);
                                  }
                              }
                          });

                      }
                  })
    //                      .delay(6000, TimeUnit.MILLISECONDS) //模拟token请求延迟
                  .map(new Function<MapiHttpResponse<Boolean>,String>() {
                      @Override
                      public String apply(MapiHttpResponse<Boolean> response) throws Exception {
                          //成功,保存token缓存
                          if (response.getContent().booleanValue() == true){
                              setCacheToken(response.getToken());
                          } else if (response.getContent().booleanValue() == false){
                              setCacheToken(UcarK.getSign());
                          }
                          //请求完成标识
                          mRefreshing.set(false);
                          return getCacheToken();
                      }
                  });
    }

    public static TokenLoader getInstance() {
        return Holder.INSTANCE;
    }

    private static class Holder {
        private static final TokenLoader INSTANCE = new TokenLoader();
    }

    public String getCacheToken() {
        return Store.sToken;
    }

    public void setCacheToken(String key){
        Store.sToken = key;
    }

    /**
     *
     * @return
     */
    public Observable<String> getNetTokenLocked() {
        if (mRefreshing.compareAndSet(false, true)) {
            Log.d(TAG, "没有请求,发起一次新的Token请求");
            startTokenRequest();
        } else {
            Log.d(TAG, "已经有请求,直接返回等待");
        }
        return mPublishSubject;
    }

    private void startTokenRequest() {
        mPublishSubject = PublishSubject.create();
        mTokenObservable.subscribe(mPublishSubject);
    }

}
复制代码

还是读注释,除了注释以外,几点说明: 1、mRefreshing的作用是在token请求过程中,不再允许新的token请求, 变量采用原子类,而非boolean;这样在多线程环境下,原子类的方法是线程安全的。 compareAndSet(boolean expect, boolean update)这个方法两个作用 1)比较expect和mRefresh是否一致 2)将mRefreshing置为update

2、startTokenRequest()方法开启token请求,注意Observable在subscribe时才正式开始

3、这里使用了PublishSubject较为关键,在rxjava中Subject既是observable,又是observer,在TokenLoader中,mPublishSubject是mTokenObservable的观察者,token请求的会由mPublishSubject响应,同时mPublishSubject也作为Observable返回给TokenLoader的调用者作为retryWhen的返回值返回。(所以这里PublishSubject的泛型与send()方法中Observable的泛型应该是一致的)

4、对于mRefreshing是true的情况,直接返回mPublishSubject,这样每个阻塞的请求retryWhen都会等待mPublishSubject的返回值,回调通知的顺序与加入阻塞的顺序是队列关系(先请求的接口,先回调),满足我们的需求。

最后: 感觉怎么样,是豁然开朗还是越陷越深,不管那样都没有关系,你需要的是了解还存在另一种处理异步任务的方法。在你下一次遇到同样让你头疼的问题时,你可以把这篇文章拿起来再看看,也许你的头疼会好一点了。。。


以上所述就是小编给大家介绍的《RxJava练武场之——Token前置请求》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Shallows

The Shallows

Nicholas Carr / W. W. Norton & Company / 2010-6-15 / USD 26.95

"Is Google making us stupid?" When Nicholas Carr posed that question, in a celebrated Atlantic Monthly cover story, he tapped into a well of anxiety about how the Internet is changing us. He also crys......一起来看看 《The Shallows》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具