内容简介:RxJava是使用Java VM实现的响应式编程库:一个通过使用可观察序列来编写异步和基于事件的程序的库。它扩展了观察者模式以支持数据/事件序列,并添加了允许您以声明方式组合序列的运算符,同时抽象出对低级线程、线程同步、线程安全和并发数据结构等问题的关注。RxJava存在1.x版本和2.x版本,API改动较大,接入方法基本不兼容,但实现思路类似,做了一定的优化,下面是官方对RxJava2的介绍。
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
RxJava是使用Java VM实现的响应式编程库:一个通过使用可观察序列来编写异步和基于事件的程序的库。
它扩展了观察者模式以支持数据/事件序列,并添加了允许您以声明方式组合序列的运算符,同时抽象出对低级线程、线程同步、线程安全和并发数据结构等问题的关注。
发展历史
RxJava存在1.x版本和2.x版本,API改动较大,接入方法基本不兼容,但实现思路类似,做了一定的优化,下面是官方对RxJava2的介绍。
Version 2.x (Javadoc)
- single dependency: Reactive-Streams
-
continued support for Java 6+ & Android 2.3+
-
performance gains through design changes learned through the 1.x cycle and through Reactive-Streams-Commons research project.
-
Java 8 lambda-friendly API
-
non-opinionated about source of concurrency (threads, pools, event loops, fibers, actors, etc)
-
async or synchronous execution
-
virtual time and schedulers for parameterized concurrency
-
单一依赖:Reactive-Streams
-
继续支持 Java 6+和android 2.3+
-
通过1.x和Reactive-Streams-Commons项目的积累,实现设计变更,提高性能。
-
友好地支持Java 8 lambda表达式
-
灵活的处理并发,包括threads, pools, event loops, fibers, actors等
-
同步或异步操作
-
为参数化的并发设计了调度器
源码地址
既然都点到这篇文章里了,难道还不下载源码看看吗??
项目地址: github.com/ReactiveX/R…
这里1.x版本和2.x版本在不同的分支上
1.x基本使用
public static void main(String[] args) { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { if (!subscriber.isUnsubscribed()) { subscriber.onNext("test"); subscriber.onCompleted(); } } }).subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("onNext:" + s); } }); } 复制代码
拆分一下,分为两部分
- 构建一个Observable对象
- 调用Observable对象的subscribe方法
// 通过Observable的create静态方法,传入一个OnSubscribe对象 // 这个OnSubscribe对象附带了一个call方法,用于回调 // 整个create方法返回了一个Observable对象的实例 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { if (!subscriber.isUnsubscribed()) { subscriber.onNext("test"); subscriber.onCompleted(); } } }); 复制代码
// 调用observable的subscribe方法,传入一个Observer对象, // 这个Observer对象附带了三个回调方法 // 通过这里subscribe方法调用上面Observable.OnSubscribe对象中的call方法 // 再在call方法中的subscriber对象调用这里Observer中的onNext,onCompleted方法 observable.subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("onNext:" + s); } }); 复制代码
上面的程序打印出的效果是:
onNext:test
onCompleted
1.x源码解析
从Observable.create方法开始
@Deprecated public static <T> Observable<T> create(OnSubscribe<T> f) { // 接收一个OnSubscribe参数,调用构造函数返回一个Observable // 1)、先看Observable的构造函数做了什么 // 2)、再看OnSubscribe类是怎样的 return new Observable<T>(RxJavaHooks.onCreate(f)); } 复制代码
Observable的构造函数
// Obserbable类中存在一个final变量是OnSubscribe类型的 final OnSubscribe<T> onSubscribe; // 构造方法将传入的OnSubscribe对象赋值给局部变量onSubscribe保存起来 protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; } 复制代码
再来看OnSubscribe类,其实就是一个包含有一个call方法的类,在基本使用的第2步,调用observable对象subscribe方法时触发。
/** * 这里的OnSubscribe是一个接口,继承子Action1接口 * * 在Observable.subscribe方法被调用的时候执行 * 这里的subscribe是第2部分,如果印象模糊可以回头看一下上面的基本使用部分 * @param <T> the output value type */ public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { // cover for generics insanity } /** * 一个call方法,observable对象subscribe方法时触发 * 用上面的基本使用的例子,这里的call携带的范型T就是Subscriber的实例subscriber * 上面call方法就是调用subscribe中的onNext方法和onCompleted * 可以看出这里的onNext和onCompleted就是基本使用例子第二步的subscribe方法传入Observer对象的方法 * * 继续跟Action和 Action继承的Function都没有实现 * @param <T> the first argument type */ public interface Action1<T> extends Action { void call(T t); } 复制代码
可以看出例子第1步实例Observable.OnSubscribe对象的时候实现了一个call方法,call方法传入的参数就是Subscriber实例,方法内调用的onNext方法和onCompleted方法执行了第2步中Observer实例的对应方法,所以这里可以猜测Observer是一个接口,Subscriber实现了Observer。我们来看一下这两个类。
/** * 很熟悉的三连,自己实现,完成业务 */ public interface Observer<T> { void onCompleted(); void onError(Throwable e); void onNext(T t); } /** * 和我们的猜测一致,Subscriber实现了Observer */ public abstract class Subscriber<T> implements Observer<T>, Subscription { ... } 复制代码
到这里应该对整体有一个理解了,我们再来看一下observable.subscribe方法,结束后在进行总结。
public final Subscription subscribe(final Observer<? super T> observer) { if (observer instanceof Subscriber) { return subscribe((Subscriber<? super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } // 这里我门传入的是new Observer,直接进入下面部分 return subscribe(new ObserverSubscriber<T>(observer)); } ... public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { ... // 方法比较长,这里只分析重要部分 // 这里的RxJavaHooks.onObservableStart方法返回了observable.onSubscribe // 再调用observable类中的局部变量onSubscribe的call方法,具体的实现就是我们再例子中实现的call方法,调用了subscriber.onNext和subscriber.onCompleted RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } // RxJavaHooks.onObservableStart分析 public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) { Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart; if (f != null) { // 会进入该方法,最后返回的就是Observable构造函数中存储的局部变量onSubscribe,不要将这个call和我门自己实现的call方法弄混了 return f.call(instance, onSubscribe); } return onSubscribe; } 复制代码
其实分析到这里就结束了,将整个调用链连接了起来
我们再来看一下关于RxJavaHooks.onObservableStart分析,
分析这里的返回值就是Observable构造函数中存储的局部变量onSubscribe。
对RxJavaHooks.onObservableStart方法进行断点调试,进入f!=null 判断。
// Func2是一个接口,我们继续使用断点调试查看具体的实现 public interface Func2<T1, T2, R> extends Function { R call(T1 t1, T2 t2); } 复制代码
// 通过断点调试找到call方法的实现 onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() { @Override public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) { return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2); } }; 复制代码
// 返回了第二个参数 @Deprecated public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) { // pass through by default return onSubscribe; } 复制代码
最后返回了第二个参数,就是observable.onSubscribe。
1.x总结
第一步:在调用create方法构造了一个Observable对象,并且在Observable对象的构造方法中,将局部变量onSubscribe赋值,该onSubscribe实现了call方法等待被回调,call方法中提供了一个subscriber实例,该实例实现了Observer,有onCompleted,onError,onNext方法可以进行调用。
第二步:执行Observable实例的subscribe方法,传入了Observer对象,实现了onCompleted,onError,onNext方法。再通过 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
方法回调到了第一步中OnSubscribe的call方法,这里的subscriber方法就是第二步中实例的Observer。
整体就是在第一步中写了一个回调,等待第二步subscribe方法调起。第二步中的Observer实现了一个回调的三个方法,供第一步中的回调函数内调用。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 华为分析服务 5.3.0 版本发布
- tensorflow版本SSD网络源码分析
- TrickBot银行木马最新版本分析报告
- 神策分析 1.13 版本上线,神策产品深耕打造场景化数据分析
- UEditor SSRF漏洞(JSP版本)分析与复现
- ThinkPHP 5.0.x 版本远程代码执行漏洞分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Compilers
Alfred V. Aho、Monica S. Lam、Ravi Sethi、Jeffrey D. Ullman / Addison Wesley / 2006-9-10 / USD 186.80
This book provides the foundation for understanding the theory and pracitce of compilers. Revised and updated, it reflects the current state of compilation. Every chapter has been completely revised ......一起来看看 《Compilers》 这本书的介绍吧!