内容简介:前言:到现在这个阶段,网上关于RxJava2源码分析的文章已经满天飞了,我写这篇文章的目的并不是说我会分析的比他们好,比他们透彻,这篇文章的目的只是单纯的记录自己分析RxJava2源码的成功及收获。对于一个编程人的技术成长,一般会经历三个阶段,首先是学会使用开源库,然后是知道开源库的原理,最后就是自己写一个开源库。虽然在日常的开发中使用RxJava2已经达到了得心应手的地步,但是不了解具体的原理,总感觉有点虚。于是就想静下心来,好好的分析一下RxJava源码,达到不仅知其然更知其所以然的地步。下图是分析Rx
前言:到现在这个阶段,网上关于RxJava2源码分析的文章已经满天飞了,我写这篇文章的目的并不是说我会分析的比他们好,比他们透彻,这篇文章的目的只是单纯的记录自己分析RxJava2源码的成功及收获。
概述
对于一个编程人的技术成长,一般会经历三个阶段,首先是学会使用开源库,然后是知道开源库的原理,最后就是自己写一个开源库。虽然在日常的开发中使用RxJava2已经达到了得心应手的地步,但是不了解具体的原理,总感觉有点虚。于是就想静下心来,好好的分析一下RxJava源码,达到不仅知其然更知其所以然的地步。
下图是分析RxJava基本流程后,画的UML图,对于已经分析过源码的大神,可以看下图画的是否正确,对于没有分析过源码的人,可以看下,先有个映像,然后再跟着文章的内容,一点点的理解。 (点击图片查看大图)
源码分析
先看RxJava2基础用法的代码
private void basicUseRxJava() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("wizardev", "onNext: "+s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
复制代码
以上代码,只是RxJava2的基本使用,并没有涉及任何的操作符代码,下面我们就按方法顺序开始分析源码。
create方法分析
看下 create() 方法的代码
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//1、判空
ObjectHelper.requireNonNull(source, "source is null");
//2、
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码
从以上代码可以看出, create 方法的返回值类型是 Observable ,参数是 ObservableOnSubscribe<T> ,可以先看下这个 ObservableOnSubscribe 类,源码如下
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
复制代码
可以发现 ObservableOnSubscribe 类是一个接口,里面有一个 subscribe 方法 。现在继续看 create 方法中的代码,在“1”处代码是判断传入的参数是否为空。这里主要看下“2”处,这句 RxJavaPlugins.onAssembly 其实是一个Hook方法,**“2”处代码实质就是 return new ObservableCreate<T>(source); ,**不信的话,可以看下 onAssembly 方法,如下
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
复制代码
经调试, onObservableAssembly 为null,所以上面的代码就直接返回了 new ObservableCreate<T>(source) 。
现在看下 ObservableCreate 类,如下
public final class ObservableCreate<T> extends Observable<T> {
//1、全局变量
final ObservableOnSubscribe<T> source;
//2、构造方法中将source赋值
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//3、这个方法是在调用subscribe方法才调用的
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
//...省略部分代码
}
复制代码
从上面的代码可以知道, ObservableCreate 类继承自 Observable ,在实例化的时候将 create 方法中的 ObservableOnSubscribe<T> source 参数注入了进来,作为成员变量 source 。
结论
通过分析 Observable 类的 create 方法,可以有以下结论:
-
create方法的返回值类型是Observable; -
create方法的参数的类型是接口; -
create方法实际返回的是ObservableCreate类,而ObservableCreate类是Observable的子类; - 在实例化
ObservableCreate类的时候将create的方法的参数注入到了ObservableCreate类中,作为它的成员变量source。
这里重点看下第4个结论,在这里 create 方法的参数实际就是下面的代码
new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}
复制代码
subscribe方法分析
分析完了 create 方法,接着来分析 subscribe 方法,其方法代码如下
public final void subscribe(Observer<? super T> observer) {
//1、判空
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//2、Hook方法,实质就是observer
observer = RxJavaPlugins.onSubscribe(this, observer);
//判空
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//4、重点,
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
复制代码
这里重点看下“4”处, 这里调用了 Obeservable 的 subscribeActual 方法,可以看下 Obeservable 类中的这个方法,如下
protected abstract void subscribeActual(Observer<? super T> observer); 复制代码
这个方法是抽象的,实际调用的是它子类中的方法,通过上文的分析,我们知道 ObservableCreate 就 Obeservable 类的子类,所以,这里调用的实际就是 ObservableCreate 类中的 subscribeActual 方法。现在,我们再看下这个方法中的代码,如下
@Override
protected void subscribeActual(Observer<? super T> observer) {
//1、实例化CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//2、回调方法
observer.onSubscribe(parent);
try {
//3、回调方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
我们一步步的分析这个方法中的代码,先看“1”处的代码,这里实例化了 CreateEmitter 这个类,在实例化的同时将 observer 传了进去。看下 CreateEmitter 这个类的代码,如下
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
//...省略部分代码
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
//...省略部分代码
}
复制代码
通过上面的代码,可以发现 CreateEmitter 这个类实现了 ObservableEmitter 这个接口,而这个接口是 ObservableOnSubscribe 接口中 subscribe 方法的参数,是不是发现什么了?现在继续往下看,看下“2”处的代码,这里回调了 Observer 的 onSubscribe 方法,分析到这里,可以得出下面的结论
onSubscribe()回调所在的线程是ObservableCreate执行subscribe()所在的线程,和subscribeOn()/observeOn()无关!
重点来了,这里看下“3”处的代码,还记得 source 是谁吗?**它就是执行 Observable.create 方法时,我们注入给 ObservableCreate 类的成员变量,是 ObservableOnSubscribe 接口的实例。**这里调用的 subscribe 方法,实际就是下面代码的 subscribe 方法,
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
复制代码
这段代码中的 subscribe 方法的参数实质就是 CreateEmitter ,调用的 onNext 方法就是 CreateEmitter 类中的 onNext 方法。继续看下 CreateEmitter 类中的 onNext 方法,代码如下
@Override
public void onNext(T t) {
//1、判断传入的参数是否为null
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//2、调用Observer中的onNext方法
observer.onNext(t);
}
}
复制代码
分析到这里,就可以得出以下结论了
subscribe方法中发射器所调用的onNext方法,如果代码没有出错的话,最终调用的就是Observer中的onNext方法。
分析 CreateEmitter 中的其他方法,还可以知道为什么 Observer 中的 onError 和 onComplete 方法只有一个会回调的原因了,原因就是无论调用的是哪一个方法都会调用dispose()方法取消订阅。
结论
对 Observable.subscribe 方法的分析可以得出以下结论
-
subscribe方法最终调用了ObservableCreate类中的subscribeActual方法。 -
subscribeActual方法中,实例化了发射器,并开始发射数据。 -
subscribe方法中发射器所调用的onNext方法,如果代码没有出错的话,最终调用的就是Observer接口中的onNext方法。
总结
通过对RxJava基本流程的源码分析,是不是对RxJava的原理有了更清晰的认识呢?分析完之后,我们再看下这张图,是不是感觉现在看起来就明白多了呢?
结束语
想要了解一些开源库的原理,我们必须要阅读其源码,只有从源码中才能得到想要的答案,才能对库的原理有更清晰的认识。
再说下,阅读开源库的注意事项,阅读源码时,我们最好带着问题来阅读,阅读前先有个目标,比如我这次阅读要搞懂什么问题,然后再开始阅读,不然就会很容易在茫茫代码中迷失。还有就是不要想着每句代码都搞懂,搞懂与自己想要获取的答案有关的代码即可。
转载请注明出处:www.wizardev.cn
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Python Algorithms
Magnus Lie Hetland / Apress / 2010-11-24 / USD 49.99
Python Algorithms explains the Python approach to algorithm analysis and design. Written by Magnus Lie Hetland, author of Beginning Python, this book is sharply focused on classical algorithms, but it......一起来看看 《Python Algorithms》 这本书的介绍吧!