RxJava2源码分析(二):操作符原理分析

栏目: Java · 发布时间: 5年前

内容简介:前言:上一篇文章为了方便理解RxJava2操作符的原理,这里选择最常用的因为这一篇的内容是在上一篇的基础上进行讲解的,所以在讲解操作符之前,先回顾一下前一篇主要的知识,如下:

前言:上一篇文章 RxJava2源码分析(一):基本流程分析 ,是对RxJava2基本流程的分析,有了上一篇的基础,这篇就再深入一点,开始分析一下RxJava2操作符的原理。

为了方便理解RxJava2操作符的原理,这里选择最常用的 map 操作符来讲解操作符的原理,示例代码如下

private void basicUseRxJava() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "我是数字" + integer;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.e("wizardev", "onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

    }
复制代码

内容回顾

因为这一篇的内容是在上一篇的基础上进行讲解的,所以在讲解操作符之前,先回顾一下前一篇主要的知识,如下:

  1. Observable执行的create方法后返回的是ObservableCreate实例。
  2. create方法的参数,实际是注入到ObservableCreate类中,作为它的成员变量。
  3. 调用Observable的subscribe方法最终调用的是ObservableCreate类中的subscribeActual方法。

操作符分析

同样,这里分析源码的顺序依然按照代码的执行顺序, create 方法前文已经分析过了,这里就直接看 map 方法, map 方法的代码如下

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
复制代码

有了前一篇的经验,可以直接从这个方法中得出结论 map 方法返回的是ObservableMap实例,同时将 map 方法的参数及Observable自身注入了其构造方法中。

现在看下ObservableMap类的源码,如下

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);//调用了其父类的构造方法
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    //省略部分无关代码
    //...
}
复制代码

从上面的代码中可以看出, ObservableMap 继承至 AbstractObservableWithUpstream ,继续进入 AbstractObservableWithUpstream 类中看下源码,如下

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    protected final ObservableSource<T> source;

    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}
复制代码

从上面的一段代码,可以知道 AbstractObservableWithUpstream 类其实就是 Observable 类的装饰类,这个类的作用就是将实例化的 Observable 注入进来,作为其成员变量。分析到这里可以得出这几个类的关系如下

RxJava2源码分析(二):操作符原理分析

注:ObservableMap中的成员变量 function 就是我们写的这段代码

new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "我是数字" + integer;
            }
        }
复制代码

好了,到这里算是将 map 方法所做的事情分析完了,下面来看实例代码的最后一个方法 subscribe .

subscribe方法分析

通过上一篇文章可以知道 subscribe 方法实际调用的是Observable子类的 subscribeActual 方法,而这里调用 subscribe 方法的类是 ObservableMap ,所以这里调用的就是 ObservableMap 类的 subscribeActual 方法。现在来看下ObservableMap类的 subscribeActual 方法的源码,如下 (为了分析方便,这里将与subscribeActual方法有关的代码一起贴了出来)

final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
复制代码

从上面的代码可以发现 ObservableMap 类的 subscribeActual 方法又调用了其上游的 subscribe 方法,为了便于理解这里解释一下文中所说的上游下游

文中说的“上游”及“下游”其实是相对而言的,这里的“上游”是靠近Observable的,如示例代码中的subscribe是最下游,map是其上游,而map操作符又是crate方法的下游。

这里上游的 subscribe 方法就是ObservableCreate调用的 subscribe 方法,实际就是调用ObservableCreate的 subscribeActual 方法,接着就是前一篇讲过的流程了。

结论

分析到现在可以得出以下结论

  • subscribe 方法的调用流程是从下往上的,就是从下游往上游分别调用其 subscribe 方法。

为了方便理解,这里我用时序图表示 subscribe 的调用顺序,如下图

RxJava2源码分析(二):操作符原理分析

接收数据流程分析

上面的内容分析了RxJava2基本流程加入操作符后的 subscribe 方法的执行顺序,接着就来看下,数据的接收顺序。经过上面的分析可以知道最终调用的是ObservableCreate类的 subscribeActual 方法,这里与前一篇文章 subscribeActual 方法不同的就是 subscribeActual 方法的参数改变了,这里的参数是 MapObserver 类的实例,再来看下ObservableCreate类的 subscribeActual 方法的源码,如下

//这里的参数实际是MapObserver类的实例
protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            //这句代码的最终调用的就是MapObserver类的onNext方法。
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

一些重要的内容已经在上面的代码中进行了注释,可能这句 source.subscribe(parent); 代码,不好理解,这里就来解释一下这句代码,由上一篇文章可知,这里的 source 就是示例代码中的 new ObservableOnSubscribe<Integer>()... 实例,这里就是调用了这个实例的 subscribe 方法,而这个方法中的代码就是调用了其参数的 onNext 方法,**最终调用的就是MapObserver类的onNext方法。**现在,来看下MapObserver类的onNext方法的代码,如下

@Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
            //1、
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //2、
            downstream.onNext(v);
        }
复制代码

主要来看下 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); 这句代码,这句代码中的mapper就是注入到ObservableMap中的成员变量function,详细内容可以查看源码。调用的apply方法,就是示例代码中的这句代码

@Override
            public String apply(Integer integer) throws Exception {
                return "我是数字" + integer;
            }
复制代码

接着,可以发现又调用了“2”处的代码 downstream.onNext(v); ,这句代码中的 downstream 就是示例代码中最下游的 subscribe 方法中的参数即是下面的代码

new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.e("wizardev", "onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        }
复制代码

所以这句 downstream.onNext(v); 调用的就是 new Observer<String>() 匿名类中的 onNext 方法。关于 downstream 的是何时初始化的,可以从MapObserver的父类BasicFuseableObserver类中知晓。

结论

分析到这里,又可以得出一些结论

  • 关于数据的处理上从上游到下游一级级的处理的。
  • 在MapObserver类中的 onNext 方法,首先调用的是 function 中的 apply 方法,然后再调用下游的 onNext 方法并将处理后的参数传入。

总结

通过分析 map 操作符,可以知道订阅方法(subscribe)是从下游到上游进行订阅的,而数据的发射是从上游到下游进行的。这两个特性不仅仅是 map 操作符的特性,对其他的操作符同样适用。为了讲清楚这两个特性,本文就选了比较具有代表性的 map 操作符,如果想了解其他操作符的原理,就顺着这两个特性分析就行了。

授之以鱼,不如授之以渔。本文的目的就是解释清楚操作符的思想及原理,理解了这种思想及原理,分析其他的操作符也就不在话下了。

RxJava2源码分析(二):操作符原理分析
扫码关注公众号,回复“获取资料”有惊喜

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

程序员第二步

程序员第二步

尹华山 / 人民邮电出版社 / 2013-11 / 45.00元

这本书是写给程序员和项目经理的。作者结合自身的丰富成长历程,通俗易懂地讲述了一名程序员如何才能成为一名优秀的项目经理。内容涉及职业规划、学习方法、自我修炼、团队建设、项目管理等,书中理清了项目管理领域中典型的误区及具有迷惑性的观点,并对项目中的难点问题提出了针对性的解决方法。 全书行文流畅,严谨中带着活泼,理智中透着情感,给读者带来轻松愉快的阅读感受。书中诸多富有创见的观点,让人耳目一新,引......一起来看看 《程序员第二步》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

在线进制转换器
在线进制转换器

各进制数互转换器

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试