内容简介:深入理解RxJava中的观察者模式
对于Android开发者来说,RxJava应该是不陌生的,如果读者不了解RxJava是什么,推荐两篇文章给大家
通过这两篇文章相信你能对RxJava有一个初步的了解,这篇文章不打算介绍RxJava的具体使用方式,因为我觉得使用方式在文档中其实写的很清楚,这些属于“术”的层面,站在更高层次去理解RxJava的原理和核心内容才是“道”,只有“道”和“术”相结合才能真正的掌握RxJava,本篇文章介绍RxJava中的观察者模式。
RxJava中观察者模式
首先,我们来对RxJava做一个简单的定义和总结,来说明RxJava是什么,引用官方文档的一句话
Rx = Observables + LINQ + Schedulers
Rx本质上是一种编程模型,这种模型具有以下三个特性:
- Observables 使用观察者模式来实现
- LINQ 具有LINQ的特性,最主要其实使用了Lambda表达式
- Schedulers 线程调度器
而RxJava 本质上就是使用 Java 语言实现了Rx的编程模型,通过RxJava解决了Android开发中的回调地狱,线程切换,最重要的是让业务的可扩展性更强,业务代码书写更规范。 今天我们来聊聊Rx模型的第一个特性- 观察者模式 。
要想理解RxJava中的观察者模式我们先了解一下RxJava中的几个简单的概念:
- Observer/Subscriber
- Observable
Observer:英文翻译为观察者,顾名思义它是一个观察者对象(Observer)或者称为订阅者(Subscriber),A对象订阅了某个事件,A对象观察了某个事件。A就是观察者。
Observable既然有观察者,那必然有被观察者,Observable就是被观察者,或者更专业一点说: 可观察事对象(Observable)
一个观察者(Observer)订阅一个可观察对象(Observable),观察者对Observable发射的数据或数据序列作出响应。 这句话其实也解释了为什么Rx被称为响应式编程 。
举一个很简单的例子来对RxJava是如何串联这几个对象做到响应式编程的,
假设我们需要订阅A地的天气状况,当A地天气发生变化或者气温下降的时候通知到对应的订阅者。
RxJava2中是这样处理的,
Observable.create(new ObservableOnSubscribe<Weather>() { @Override public void subscribe(ObservableEmitter<Weather> observableEmitter) throws Exception { if (!observableEmitter.isDisposed()) { Weather weather = WeatherHelper.getCityWeather("A"); if(weather != null){ observableEmitter.onNext(weather); } else { observableEmitter.onError(new Exception("获取A地天气失败")); } observableEmitter.onComplete(); } } }).map(new Function<Weather, Weather>() { @Override public Weather apply(Weather weather) throws Exception { // 各种变换 return weather; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<Weather>() { @Override public void accept(Weather weather) throws Exception { // 处理成功获取天气后的回调 } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { // 处理成功获取天气失败的回调 } }, new Action() { @Override public void run() throws Exception { // 结束方法 } });
使用输入输出流表示
上面代码省略了对数据的变换和组合操作,做最基础的代码分析。 分析上面的过程发现
new ObservableOnSubscribe()
对应的是Observable的可观察对象;
new Consumer()
对应的是观察者对象,Rx在观察者的基础上扩展了2个对象 new Consumer() / new Action() 分别用于处理异常和一次任务完成的回调处理;使用了泛型的方式来处理不同可观察对象的数据传递。
从代码层面来分析他是如何把各个对象组合成观察者模式的: 大概的图示如下:
为什么图这样的?
- Observable对象创建了一个可观察对象ObservableOnSubscribe
- 中间观察对象数据做各种变换处理(这个代码中省略),本质上还是返回了一个ObservableOnSubscri对象
- subscribeOn和observeOn对线程做简单的调度处理
- Observable对象subscribe了Observer
- 观察者主动去拉取可观察者的数据,
观察者主动去拉取可观察者的数据,这一点是从源码角度分析得出的,
@SchedulerSupport("none") public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); this.subscribeActual(observer); } catch (NullPointerException var4) { throw var4; } catch (Throwable var5) { Exceptions.throwIfFatal(var5); RxJavaPlugins.onError(var5); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(var5); throw npe; } }
简单来理解就是A().B().C().subscribe(xxx);A().B().C()依然返回了一个 Observable对象,最终执行了subscribe方法来,源码中可以看到
subscribeActual(observer);
方法最终拉取可观察者的数据,也就是执行了ObservableOnSubscribe的subscribe方法。
至此,我们有发现有几个问题无法解释:
- 为什么是Observable对象订阅了Observer,传统的观察者模式不应该是Observer订阅Observable吗?
- 为什么是Observer去主动拉取事件信息,而不是可观察对象主动推动呢?是否有违观察者模式的定义?
观察者模式的定义和变种
聊到这里必须理清一下什么是观察者模式,观察者模式真的有我们所理解的这么简单吗?我们先看看维基百科关于观察者模式的简洁
观察者模式是软件 设计模式 的一种。在此种模式中,一个目标对象管理所有相依于它的观察者对象,并且在它本身的状态改变时主动发出通知。这通常透过呼叫各观察者所提供的方法来实现。此种模式通常被用来实时事件处理系统。
注意这一句话“ 并且在它本身的状态改变时主动发出通知 ”,这是官方的设计模式UML图,实际上在观察者模式中还存在着另外一种观察模式,那就是拉取模型:
- 推模型,事件通知源主动发起消息推送,订阅者自动接收数据
- 拉模型,订阅者主动去拉取事件通知
拉取模型和推送模型唯一的区别在于notify方法的触发是依赖于订阅者主动发起通知,事件源才会把数据推送过来的。在执行notify方法的时候,会主动请求事件源的数据:
@Override public void notify(Subject subject) { // 主动去事件源里拿数据 State state = ((ConcreteSubjectA) subject).getState(); System.out.println(getName() + "观察者状态更新为:" + state); }
这就是推模型的实现;在Android中典型如消息推动,如果App正常接受消息,那么消息通知是推动过去的,如果App没有启动,那么App会主动去消息服务端拉取未接收的消息。
理解RxJava中的观察者模式
明白了观察者模式的2种基本模型,我们来理解一下前面的2个问题
- 为什么是Observable对象订阅了Observer,传统的观察者模式不应该是Observer订阅Observable吗?
- 为什么是Observer去主动拉取事件信息,而不是可观察对象主动推动呢?是否有违观察者模式的定义?
第一层意义:subscribe方法本质上是一个注册方法,所以这就解释了第一个问题,并不是Observable对象订阅了Observer,而是向Observable注册了一个观察者(Observer); 第二层意义:subscribe方法在注册了一个观察者的同时,向事件源拉进行消息的拉取
@SchedulerSupport("none") public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); this.subscribeActual(observer); } catch (NullPointerException var4) { throw var4; } catch (Throwable var5) { Exceptions.throwIfFatal(var5); RxJavaPlugins.onError(var5); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(var5); throw npe; } }
subscribeActual就是向事件源拉取消息,类似于前面提到的getState()方法。只不过这个方法不直接返回值,而是通过Consumer回调的方式返回。
至此,整个过程就非常清晰了:
创建一个事件源(creat方法)-》对数据源做变换(Map..)-》通过subscribe方法注册一个观察者,并执行拉取数据操作-》线程变换-》通过回调返回数据结果
总结
RxJava中对观察者模式的理解是最关键的,理解了观察者模式后,才能对RxJava的整个设计有本质的了解,知其然更要知其所以然,subscribe方法的两层含义需要结合源码仔细体会,然后在实践中不断的加深理解,才能做到融会贯通。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
深入React技术栈
陈屹 / 人民邮电出版社 / 2016-11-1 / CNY 79.00
全面讲述React技术栈的第一本原创图书,pure render专栏主创倾力打造 覆盖React、Flux、Redux及可视化,帮助开发者在实践中深入理解技术和源码 前端组件化主流解决方案,一本书玩转React“全家桶” 本书讲解了非常多的内容,不仅介绍了面向普通用户的API、应用架构和周边工具,还深入介绍了底层实现。此外,本书非常重视实战,每一节都有实际的例子,细节丰富。我从这......一起来看看 《深入React技术栈》 这本书的介绍吧!