内容简介:RxJava是使用Java VM实现的响应式编程库:一个通过使用可观察序列来编写异步和基于事件的程序的库。它扩展了观察者模式以支持数据/事件序列,并添加了允许您以声明方式组合序列的运算符,同时抽象出对低级线程、线程同步、线程安全和并发数据结构等问题的关注。RxJava存在1.x版本和2.x版本,API改动较大,接入方法基本不兼容,但实现思路类似,做了一定的优化,下面是官方对RxJava2的介绍。
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
RxJava是使用Java VM实现的响应式编程库:一个通过使用可观察序列来编写异步和基于事件的程序的库。
它扩展了观察者模式以支持数据/事件序列,并添加了允许您以声明方式组合序列的运算符,同时抽象出对低级线程、线程同步、线程安全和并发数据结构等问题的关注。
发展历史
RxJava存在1.x版本和2.x版本,API改动较大,接入方法基本不兼容,但实现思路类似,做了一定的优化,下面是官方对RxJava2的介绍。
Version 2.x (Javadoc)
- single dependency: Reactive-Streams
-
continued support for Java 6+ & Android 2.3+
-
performance gains through design changes learned through the 1.x cycle and through Reactive-Streams-Commons research project.
-
Java 8 lambda-friendly API
-
non-opinionated about source of concurrency (threads, pools, event loops, fibers, actors, etc)
-
async or synchronous execution
-
virtual time and schedulers for parameterized concurrency
-
单一依赖:Reactive-Streams
-
继续支持 Java 6+和android 2.3+
-
通过1.x和Reactive-Streams-Commons项目的积累,实现设计变更,提高性能。
-
友好地支持Java 8 lambda表达式
-
灵活的处理并发,包括threads, pools, event loops, fibers, actors等
-
同步或异步操作
-
为参数化的并发设计了调度器
源码地址
既然都点到这篇文章里了,难道还不下载源码看看吗??
项目地址: github.com/ReactiveX/R…
这里1.x版本和2.x版本在不同的分支上
1.x基本使用
public static void main(String[] args) { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { if (!subscriber.isUnsubscribed()) { subscriber.onNext("test"); subscriber.onCompleted(); } } }).subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("onNext:" + s); } }); } 复制代码
拆分一下,分为两部分
- 构建一个Observable对象
- 调用Observable对象的subscribe方法
// 通过Observable的create静态方法,传入一个OnSubscribe对象 // 这个OnSubscribe对象附带了一个call方法,用于回调 // 整个create方法返回了一个Observable对象的实例 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { if (!subscriber.isUnsubscribed()) { subscriber.onNext("test"); subscriber.onCompleted(); } } }); 复制代码
// 调用observable的subscribe方法,传入一个Observer对象, // 这个Observer对象附带了三个回调方法 // 通过这里subscribe方法调用上面Observable.OnSubscribe对象中的call方法 // 再在call方法中的subscriber对象调用这里Observer中的onNext,onCompleted方法 observable.subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("onNext:" + s); } }); 复制代码
上面的程序打印出的效果是:
onNext:test
onCompleted
1.x源码解析
从Observable.create方法开始
@Deprecated public static <T> Observable<T> create(OnSubscribe<T> f) { // 接收一个OnSubscribe参数,调用构造函数返回一个Observable // 1)、先看Observable的构造函数做了什么 // 2)、再看OnSubscribe类是怎样的 return new Observable<T>(RxJavaHooks.onCreate(f)); } 复制代码
Observable的构造函数
// Obserbable类中存在一个final变量是OnSubscribe类型的 final OnSubscribe<T> onSubscribe; // 构造方法将传入的OnSubscribe对象赋值给局部变量onSubscribe保存起来 protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; } 复制代码
再来看OnSubscribe类,其实就是一个包含有一个call方法的类,在基本使用的第2步,调用observable对象subscribe方法时触发。
/** * 这里的OnSubscribe是一个接口,继承子Action1接口 * * 在Observable.subscribe方法被调用的时候执行 * 这里的subscribe是第2部分,如果印象模糊可以回头看一下上面的基本使用部分 * @param <T> the output value type */ public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { // cover for generics insanity } /** * 一个call方法,observable对象subscribe方法时触发 * 用上面的基本使用的例子,这里的call携带的范型T就是Subscriber的实例subscriber * 上面call方法就是调用subscribe中的onNext方法和onCompleted * 可以看出这里的onNext和onCompleted就是基本使用例子第二步的subscribe方法传入Observer对象的方法 * * 继续跟Action和 Action继承的Function都没有实现 * @param <T> the first argument type */ public interface Action1<T> extends Action { void call(T t); } 复制代码
可以看出例子第1步实例Observable.OnSubscribe对象的时候实现了一个call方法,call方法传入的参数就是Subscriber实例,方法内调用的onNext方法和onCompleted方法执行了第2步中Observer实例的对应方法,所以这里可以猜测Observer是一个接口,Subscriber实现了Observer。我们来看一下这两个类。
/** * 很熟悉的三连,自己实现,完成业务 */ public interface Observer<T> { void onCompleted(); void onError(Throwable e); void onNext(T t); } /** * 和我们的猜测一致,Subscriber实现了Observer */ public abstract class Subscriber<T> implements Observer<T>, Subscription { ... } 复制代码
到这里应该对整体有一个理解了,我们再来看一下observable.subscribe方法,结束后在进行总结。
public final Subscription subscribe(final Observer<? super T> observer) { if (observer instanceof Subscriber) { return subscribe((Subscriber<? super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } // 这里我门传入的是new Observer,直接进入下面部分 return subscribe(new ObserverSubscriber<T>(observer)); } ... public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { ... // 方法比较长,这里只分析重要部分 // 这里的RxJavaHooks.onObservableStart方法返回了observable.onSubscribe // 再调用observable类中的局部变量onSubscribe的call方法,具体的实现就是我们再例子中实现的call方法,调用了subscriber.onNext和subscriber.onCompleted RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } // RxJavaHooks.onObservableStart分析 public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) { Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart; if (f != null) { // 会进入该方法,最后返回的就是Observable构造函数中存储的局部变量onSubscribe,不要将这个call和我门自己实现的call方法弄混了 return f.call(instance, onSubscribe); } return onSubscribe; } 复制代码
其实分析到这里就结束了,将整个调用链连接了起来
我们再来看一下关于RxJavaHooks.onObservableStart分析,
分析这里的返回值就是Observable构造函数中存储的局部变量onSubscribe。
对RxJavaHooks.onObservableStart方法进行断点调试,进入f!=null 判断。
// Func2是一个接口,我们继续使用断点调试查看具体的实现 public interface Func2<T1, T2, R> extends Function { R call(T1 t1, T2 t2); } 复制代码
// 通过断点调试找到call方法的实现 onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() { @Override public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) { return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2); } }; 复制代码
// 返回了第二个参数 @Deprecated public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) { // pass through by default return onSubscribe; } 复制代码
最后返回了第二个参数,就是observable.onSubscribe。
1.x总结
第一步:在调用create方法构造了一个Observable对象,并且在Observable对象的构造方法中,将局部变量onSubscribe赋值,该onSubscribe实现了call方法等待被回调,call方法中提供了一个subscriber实例,该实例实现了Observer,有onCompleted,onError,onNext方法可以进行调用。
第二步:执行Observable实例的subscribe方法,传入了Observer对象,实现了onCompleted,onError,onNext方法。再通过 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
方法回调到了第一步中OnSubscribe的call方法,这里的subscriber方法就是第二步中实例的Observer。
整体就是在第一步中写了一个回调,等待第二步subscribe方法调起。第二步中的Observer实现了一个回调的三个方法,供第一步中的回调函数内调用。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 华为分析服务 5.3.0 版本发布
- tensorflow版本SSD网络源码分析
- TrickBot银行木马最新版本分析报告
- 神策分析 1.13 版本上线,神策产品深耕打造场景化数据分析
- UEditor SSRF漏洞(JSP版本)分析与复现
- ThinkPHP 5.0.x 版本远程代码执行漏洞分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
集创思维设计矩阵
慈思远 / 电子工业出版社 / 2017-4 / 72.00元
《集创思维设计矩阵——写给互联网人的设计指南》总结了作者从业7年以来的设计经历,在大量企业所面对的设计问题基础上,提出了枪型思维,即如何给产品更准确的定位。 在定位准确的基础上加以设计,提出了设计中高维度融合低维度的设计思维,即设计者可以从商业逻辑推演到设计逻辑,让设计更加精确;又提出了设计和计算的博弈,指出在每一步创新的基础上,设计者一定要清晰地评判设计的代价。这样设计后的产品才是可以和企......一起来看看 《集创思维设计矩阵》 这本书的介绍吧!