内容简介:前言:到现在这个阶段,网上关于RxJava2源码分析的文章已经满天飞了,我写这篇文章的目的并不是说我会分析的比他们好,比他们透彻,这篇文章的目的只是单纯的记录自己分析RxJava2源码的成功及收获。对于一个编程人的技术成长,一般会经历三个阶段,首先是学会使用开源库,然后是知道开源库的原理,最后就是自己写一个开源库。虽然在日常的开发中使用RxJava2已经达到了得心应手的地步,但是不了解具体的原理,总感觉有点虚。于是就想静下心来,好好的分析一下RxJava源码,达到不仅知其然更知其所以然的地步。下图是分析Rx
前言:到现在这个阶段,网上关于RxJava2源码分析的文章已经满天飞了,我写这篇文章的目的并不是说我会分析的比他们好,比他们透彻,这篇文章的目的只是单纯的记录自己分析RxJava2源码的成功及收获。
概述
对于一个编程人的技术成长,一般会经历三个阶段,首先是学会使用开源库,然后是知道开源库的原理,最后就是自己写一个开源库。虽然在日常的开发中使用RxJava2已经达到了得心应手的地步,但是不了解具体的原理,总感觉有点虚。于是就想静下心来,好好的分析一下RxJava源码,达到不仅知其然更知其所以然的地步。
下图是分析RxJava基本流程后,画的UML图,对于已经分析过源码的大神,可以看下图画的是否正确,对于没有分析过源码的人,可以看下,先有个映像,然后再跟着文章的内容,一点点的理解。 (点击图片查看大图)
源码分析
先看RxJava2基础用法的代码
private void basicUseRxJava() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { Log.e("wizardev", "onNext: "+s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } 复制代码
以上代码,只是RxJava2的基本使用,并没有涉及任何的操作符代码,下面我们就按方法顺序开始分析源码。
create方法分析
看下 create()
方法的代码
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { //1、判空 ObjectHelper.requireNonNull(source, "source is null"); //2、 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 复制代码
从以上代码可以看出, create
方法的返回值类型是 Observable
,参数是 ObservableOnSubscribe<T>
,可以先看下这个 ObservableOnSubscribe
类,源码如下
public interface ObservableOnSubscribe<T> { /** * Called for each Observer that subscribes. * @param emitter the safe emitter instance, never null * @throws Exception on error */ void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception; } 复制代码
可以发现 ObservableOnSubscribe
类是一个接口,里面有一个 subscribe
方法 。现在继续看 create
方法中的代码,在“1”处代码是判断传入的参数是否为空。这里主要看下“2”处,这句 RxJavaPlugins.onAssembly
其实是一个Hook方法,**“2”处代码实质就是 return new ObservableCreate<T>(source);
,**不信的话,可以看下 onAssembly
方法,如下
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } 复制代码
经调试, onObservableAssembly
为null,所以上面的代码就直接返回了 new ObservableCreate<T>(source)
。
现在看下 ObservableCreate
类,如下
public final class ObservableCreate<T> extends Observable<T> { //1、全局变量 final ObservableOnSubscribe<T> source; //2、构造方法中将source赋值 public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } //3、这个方法是在调用subscribe方法才调用的 @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } //...省略部分代码 } 复制代码
从上面的代码可以知道, ObservableCreate
类继承自 Observable
,在实例化的时候将 create
方法中的 ObservableOnSubscribe<T> source
参数注入了进来,作为成员变量 source
。
结论
通过分析 Observable
类的 create
方法,可以有以下结论:
-
create
方法的返回值类型是Observable
; -
create
方法的参数的类型是接口; -
create
方法实际返回的是ObservableCreate
类,而ObservableCreate
类是Observable
的子类; - 在实例化
ObservableCreate
类的时候将create
的方法的参数注入到了ObservableCreate
类中,作为它的成员变量source
。
这里重点看下第4个结论,在这里 create
方法的参数实际就是下面的代码
new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } } 复制代码
subscribe方法分析
分析完了 create
方法,接着来分析 subscribe
方法,其方法代码如下
public final void subscribe(Observer<? super T> observer) { //1、判空 ObjectHelper.requireNonNull(observer, "observer is null"); try { //2、Hook方法,实质就是observer observer = RxJavaPlugins.onSubscribe(this, observer); //判空 ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); //4、重点, subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } 复制代码
这里重点看下“4”处, 这里调用了 Obeservable
的 subscribeActual
方法,可以看下 Obeservable
类中的这个方法,如下
protected abstract void subscribeActual(Observer<? super T> observer); 复制代码
这个方法是抽象的,实际调用的是它子类中的方法,通过上文的分析,我们知道 ObservableCreate
就 Obeservable
类的子类,所以,这里调用的实际就是 ObservableCreate
类中的 subscribeActual
方法。现在,我们再看下这个方法中的代码,如下
@Override protected void subscribeActual(Observer<? super T> observer) { //1、实例化CreateEmitter CreateEmitter<T> parent = new CreateEmitter<T>(observer); //2、回调方法 observer.onSubscribe(parent); try { //3、回调方法 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 复制代码
我们一步步的分析这个方法中的代码,先看“1”处的代码,这里实例化了 CreateEmitter
这个类,在实例化的同时将 observer
传了进去。看下 CreateEmitter
这个类的代码,如下
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { //...省略部分代码 final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } return true; } return false; } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } //...省略部分代码 } 复制代码
通过上面的代码,可以发现 CreateEmitter
这个类实现了 ObservableEmitter
这个接口,而这个接口是 ObservableOnSubscribe
接口中 subscribe
方法的参数,是不是发现什么了?现在继续往下看,看下“2”处的代码,这里回调了 Observer
的 onSubscribe
方法,分析到这里,可以得出下面的结论
onSubscribe()回调所在的线程是ObservableCreate执行subscribe()所在的线程,和subscribeOn()/observeOn()无关!
重点来了,这里看下“3”处的代码,还记得 source
是谁吗?**它就是执行 Observable.create
方法时,我们注入给 ObservableCreate
类的成员变量,是 ObservableOnSubscribe
接口的实例。**这里调用的 subscribe
方法,实际就是下面代码的 subscribe
方法,
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } 复制代码
这段代码中的 subscribe
方法的参数实质就是 CreateEmitter
,调用的 onNext
方法就是 CreateEmitter
类中的 onNext
方法。继续看下 CreateEmitter
类中的 onNext
方法,代码如下
@Override public void onNext(T t) { //1、判断传入的参数是否为null if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { //2、调用Observer中的onNext方法 observer.onNext(t); } } 复制代码
分析到这里,就可以得出以下结论了
subscribe方法中发射器所调用的onNext方法,如果代码没有出错的话,最终调用的就是Observer中的onNext方法。
分析 CreateEmitter
中的其他方法,还可以知道为什么 Observer
中的 onError
和 onComplete
方法只有一个会回调的原因了,原因就是无论调用的是哪一个方法都会调用dispose()方法取消订阅。
结论
对 Observable.subscribe
方法的分析可以得出以下结论
-
subscribe
方法最终调用了ObservableCreate
类中的subscribeActual
方法。 -
subscribeActual
方法中,实例化了发射器,并开始发射数据。 -
subscribe
方法中发射器所调用的onNext
方法,如果代码没有出错的话,最终调用的就是Observer
接口中的onNext
方法。
总结
通过对RxJava基本流程的源码分析,是不是对RxJava的原理有了更清晰的认识呢?分析完之后,我们再看下这张图,是不是感觉现在看起来就明白多了呢?
结束语
想要了解一些开源库的原理,我们必须要阅读其源码,只有从源码中才能得到想要的答案,才能对库的原理有更清晰的认识。
再说下,阅读开源库的注意事项,阅读源码时,我们最好带着问题来阅读,阅读前先有个目标,比如我这次阅读要搞懂什么问题,然后再开始阅读,不然就会很容易在茫茫代码中迷失。还有就是不要想着每句代码都搞懂,搞懂与自己想要获取的答案有关的代码即可。
转载请注明出处:www.wizardev.cn
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。