RxJava2源码初探

栏目: Java · 发布时间: 5年前

内容简介:众所周知RxJava有许多优点比如强大的链式调用,方便的线程调度,但是我对其原理还是了解的太少了,因此打算阅读下源码,先从一个最基本的例子开始这个例子只是为了示例,正常情况下也不会这么写输出结果:

众所周知RxJava有许多优点比如强大的链式调用,方便的线程调度,但是我对其原理还是了解的太少了,因此打算阅读下源码,先从一个最基本的例子开始

一、例子

这个例子只是为了示例,正常情况下也不会这么写

fun main() {
    Observable.create {
        emitter: ObservableEmitter<Int> ->
        println("onSourceSubscribe")
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onNext(3)
        emitter.onComplete()
    }.subscribe(object : Observer<Int> {
        override fun onComplete() {
            println("onComplete")
        }
        override fun onSubscribe(d: Disposable) {
            println("onObserverSubscribe")
        }
        override fun onNext(t: Int) {
            println("onNext $t")
        }
        override fun onError(e: Throwable) {
            println("onError")
        } 
    })
}
复制代码

输出结果:

onObserverSubscribe
onSourceSubscribe
onNext 1
onNext 2
onNext 3
onComplete
复制代码

那么为什么会按这个顺序输出呢?从代码中也可以看出从始至终也只调用了create、subscribe两个方法,先来看看create的源码

// Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
复制代码

可以看出当我们没有设置onObservableAssembly时其实就是直接创建了一个ObservableCreate实例返回,接着看看subscribe

public final void subscribe(Observer<? super T> observer) {
    subscribeActual(observer);
}
复制代码

可以看到内部就是调用了subscribeActual方法,而这个方法是个抽象方法,ObservableCreate实现了该方法

protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    source.subscribe(parent);
}
复制代码

内部主要就是先创建了一个CreateEmitter实例,然后调用observer的onSubscribe方法,最后再调用source的subscribe方法,这就解释了onObserverSubscribe和onSourceSubscribe的输出,而source的subscribe方法又调用了三次onNext方法和一次onComplete方法,先看看onNext

// CreateEmitter.java
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}
复制代码

如果还没dispose那么直接就调用了observer的onNext,这也就解释了onNext 1、onNext 2、onNext 3三个输出接着看onComplete

// CreateEmitter.java
public void onComplete() {
    if (!isDisposed()) {
        try {
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}
复制代码

如果还没dispose就直接调用observer的onComplete,直接解释了onComplete的输出,我们注意到Observer还有一个onError回调,该方法可以通过调用emitter.onError手动触发

// CreateEmitter.java
public void onError(Throwable t) {
    if (!tryOnError(t)) {
        RxJavaPlugins.onError(t);
    }
}
public boolean tryOnError(Throwable t) {
    if (t == null) {
        t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
    }
    if (!isDisposed()) {
        try {
            observer.onError(t);
        } finally {
            dispose();
        }
        return true;
    }
    return false;
}
复制代码

可以看到当还没被dispose就会调用到observer的onError方法,至此这个基本demo的源码已经分析完毕。 总结下上述代码其实就分为如下几步

  1. 创建Observable实例
  2. 调用该实例的subscribeActual方法
  3. 回调observer的onSubscribe方法
  4. 调用source的subscribe方法
  5. 上述的subscribe方法内部可以执行若干次onNext,最多一个onError,最多一次onComplete

上述实例的整体流程图如下

RxJava2源码初探

下面来从源码的角度研究研究RxJava中的几个基本方法

二、基本方法

首先从最基本的map方法开始

1. map

map方法将每个onNext事件都调用所传入的Function实例的apply方法来达到转化数据源的效果,如下图所示

RxJava2源码初探

实例代码如下所示

fun main() {
    Observable.create {
        emitter: ObservableEmitter<Int> ->
        println("onSourceSubscribe")
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onNext(3)
        emitter.onComplete()
    }
    .map {
       it + 1
    }
    .subscribe(object : Observer<Int> {
        override fun onComplete() {
            println("onComplete")
        }
        override fun onSubscribe(d: Disposable) {
            println("onObserverSubscribe")
        }
        override fun onError(e: Throwable) {
            println("onError")
        }
        override fun onNext(t: Int) {
            println("onNext $t")
        }
    })
}
复制代码

输出结果:

onObserverSubscribe
onSourceSubscribe
onNext 2
onNext 3
onNext 4
onComplete
复制代码

很明显map方法会对所有的next的数据做一次变化这里是加1,接着看看map的源码实现

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
复制代码

内部创建了一个ObservableMap实例并将当前的Observable实例和Function实例传入,根据本文一开始的分析当调用Observable的subscribe方法其实会调用subscribeActual方法

// ObservableMap.java
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}
复制代码

创建了MapObserver实例将Observer实例进行包装然后调用source.subscribe,这个source其实就是上一级Observable实例本例中对应ObservableCreate实例,接着根据上文的分析会调用该MapObserver实例的onNext三次然后调用一次onComplete

// MapObserver.java
public void onNext(T t) {
    // 初始化的时候done为false
    if (done) {
        return;
    }
    // 初始化的时候就是NONE
    if (sourceMode != NONE) {
        downstream.onNext(null);
        return;
    }
    U v;
    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    downstream.onNext(v);
}
复制代码

我们可以看到内部调用了mapper.apply方法,接着将拿到的结果v当做参数调用downstream的onNext方法,注意这里的downStream就是外界创建的一个Observer对象。上述实例是整体流程图如下。注:绿色框表示对象,蓝色框表示方法调用,括号内为简称

RxJava2源码初探

综上我们可以知道map通过代理下游Observer实例完成数据转换,接着看看flatMap的源码实现

2. flatMap

flatMap方法用于将上游的每一个onNext事件都转换成一个Observable实例,如下图所示

RxJava2源码初探
fun main() {
    Observable.create {
        emitter: ObservableEmitter<Int> ->
        println("onSourceSubscribe")
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onNext(3)
        emitter.onComplete()
    }
    .flatMap {
        Observable.just(it, it + 1)
    }
    .subscribe(object : Observer<Int> {
        override fun onComplete() {
            println("onComplete")
        }

        override fun onSubscribe(d: Disposable) {
            println("onObserverSubscribe")
        }

        override fun onError(e: Throwable) {
            println("onError")
        }

        override fun onNext(t: Int) {
            println("onNext $t")
        }
    })
}
复制代码

输出结果:

onObserverSubscribe
onSourceSubscribe
onNext 1
onNext 2
onNext 2
onNext 3
onNext 3
onNext 4
onComplete
复制代码

很显然flatMap将每一个事件比如1转换成一个拥有1、2两个事件的Observable实例,来看看其源码实现

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
    return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
    return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码

默认delayErrors为false表示当一个事件出现异常就会停止整个事件序列,默认并发数为Int的最大值,默认缓存大小为128,然后根据这些参数和当前Observable实例构建出一个ObservableFlatMap实例,我们看看其subscribeActual方法

// ObservableFlatMap.java
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码

内部又通过这些参数和下游的Observer实例构建了一个MergeObserver实例,直接看看其onSubscribe方法

// MergeObserver.java
public void onSubscribe(Disposable d) {
    // 只会回调一次下游的onSubscribe方法
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        downstream.onSubscribe(this);
    }
}
复制代码

如果已经有上游了就不做任何处理不然进行上游的赋值,然后回调了下游也就是自定义的那个Observer的onSubscribe方法,接着看看其onNext方法是怎么把一个输入源转化成一个Observable的

public void onNext(T t) {
    ObservableSource<? extends U> p;
    p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    subscribeInner(p);
}
复制代码

先是调用了传入的apply方法将每个onNext数据源转化为Observable实例,接着调用subscribeInner方法

void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        if (p instanceof Callable) {
            ...
        } else {
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner);
            }
            break;
        }
    }
}
boolean addInner(InnerObserver<T, U> inner) {
    for (;;) {
        InnerObserver<?, ?>[] a = observers.get();
        int n = a.length;
        InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = inner;
        if (observers.compareAndSet(a, b)) {
            return true;
        }
    }
}
复制代码

为每个Observable对象都创建了一个InnerObserver实例,然后将其放入到一个数组中去,最后调用subscribe方法进行订阅,由于apply方法返回了一个ObservableFromArray实例,所以看看其subscribeActual方法

// ObservableFromArray.java
public void subscribeActual(Observer<? super T> observer) {
    FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
    observer.onSubscribe(d);
    if (d.fusionMode) {
        return;
    }
    d.run();
}
复制代码

observer指代InnerObserver,看看其onSubscribe方法

public void onSubscribe(Disposable d) {
    // 第一次调用会返回true,d就是FromArrayDisposable实例其派生自QueueDisposable
    if (DisposableHelper.setOnce(this, d)) {
        if (d instanceof QueueDisposable) {
            QueueDisposable<U> qd = (QueueDisposable<U>) d;
            // 相当于传入了7
            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
            if (m == QueueDisposable.SYNC) {
                fusionMode = m;
                queue = qd;
                done = true;
                parent.drain();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                fusionMode = m;
                queue = qd;
            }
        }
    }
}
//FromArrayDisposable.java
public int requestFusion(int mode) {
    // 很明显7 & 1 != 0
    if ((mode & SYNC) != 0) {
        fusionMode = true;
        return SYNC;
    }
    return NONE;
}
复制代码

这里暂时还没法理解这个fusionMode(混合模式)是干什么用的,接着会调用到MergeObserver的drain方法

void drain() {
    // 只会执行一次,循环将所有事件取出
    if (getAndIncrement() == 0) {
        drainLoop();
    }
}
// 当取消了或者出现了错误并其dealyErrors为false时会将所有InnerObserver都dispose掉
boolean checkTerminate() {
    if (cancelled) {
        return true;
    }
    Throwable e = errors.get();
    if (!delayErrors && (e != null)) {
        disposeAll();
        e = errors.terminate();
        if (e != ExceptionHelper.TERMINATED) {
            downstream.onError(e);
        }
        return true;
    }
    return false;
}
void drainLoop() {
    // 这里的downstream就是外界自定义的Observer实例
    final Observer<? super U> child = this.downstream;
    for (;;) {
        ...
        for (int i = 0; i < n; i++) {
            if (checkTerminate()) {
                return;
            }
            InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
            SimpleQueue<U> q = is.queue;
            if (q != null) {
                for (;;) {
                    U o;
                    try {
                        o = q.poll();
                    } catch (Throwable ex) {
                        ...
                        if (checkTerminate()) {
                            return;
                        }
                        continue sourceLoop;
                    }
                    // 每取出一个便会调用下游的onNext方法
                    child.onNext(o);
                }
                // 会把一个Observable源所有的数据都取完了以后才会进入下一个
                if (o == null) {
                    break;
                }
            }
            ...
        }
        ..
    }
}
复制代码

drainLoop内部会从数组中一个个取出InnerObserver实例,并取出所对应的数据源然后每取出一个回调下游Observer的onNext方法,下面用一张图来总结下实例的流程

RxJava2源码初探

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Hadoop in Action

Hadoop in Action

Chuck Lam / Manning Publications / 2010-12-22 / USD 44.99

HIGHLIGHT Hadoop in Action is an example-rich tutorial that shows developers how to implement data-intensive distributed computing using Hadoop and the Map- Reduce framework. DESCRIPTION Hadoop i......一起来看看 《Hadoop in Action》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具