内容简介:Maxiee 的 RxJava 学习指南 (2)
在上一篇中主要对官方文档进行了学习.
如果只看文档里的知识而不去实践, 自己只是知道而非理解, 必须要在实际项目中进行实践, 才能得到真知.
我采取的办法就是找一个实际的开源项目来学习, 最终我选择了 RxJava-Android-Samples 这个项目.
RxJava-Android-Samples
选择 RxJava-Android-Samples 这个项目的原因通过它的介绍可以看出:
This is a repository with real-world useful examples of using RxJava with Android.
这不正是我们需要的吗! 感谢 kaushikgopal 的无私奉献.
下面我们通过对这个项目的源码阅读, 来学习 RxJava 的实践知识.
整个代码结构十分清晰, 在 MainFragment 上点击对应的按钮进入对应 Demo 的 Fragment.
后续的文章中会针对代码的要点进行讲解, 因此建议将这个项目 Clone 下来结合文章一起看, 否则可能会觉得有些跳跃, 但是结合代码看就很容易明白了 .
后台任务与并发
本文中, 我们学习 RxJava-Android-Samples 的第一个 Demo -- 后台任务与并发, 它位于 ConcurrencyWithSchedulersDemoFragment
.
这个页面的功能是点击一个按钮, 然后会在后台做一个延时操作 (以定时器模拟), 待定时操作完成后, 在界面中打一段 Log.
这部分代码大体流程如下图所示:
下面来具体分析.
创建 Observable
创建 Observable 的代码位于 _getObservable()
方法, 它的代码为:
private Observable<Boolean> _getObservable() { return Observable.just(true) .map( aBoolean -> { _log("Within Observable"); _doSomeLongOperation_thatBlocksCurrentThread(); return aBoolean; }); }
其中:
- 源 Observable 发出一个 true 数据
- 数据传入 map 运算符
- map 运算符的
_doSomeLongOperation_thatBlocksCurrentThread()
方法里执行了一个阻塞线程的耗时操作 - 当执行完毕后, 返回
aBoolean
也就是传递数据
_doSomeLongOperation_thatBlocksCurrentThread()
的代码为:
private void _doSomeLongOperation_thatBlocksCurrentThread() { _log("performing long operation"); try { Thread.sleep(3000); } catch (InterruptedException e) { Timber.d("Operation was interrupted"); } }
功能就是一个阻塞线程 3s.
从中我们可以看出, 这个 map 操作符里面, 就是执行耗时操作的地方.
创建 Observer
创建 Observer 的代码位于 _getDisposableObserver() 方法中, 它的代码为:
private DisposableObserver<Boolean> _getDisposableObserver() { return new DisposableObserver<Boolean>() { @Override public void onComplete() { _log("On complete"); _progress.setVisibility(View.INVISIBLE); } @Override public void onError(Throwable e) { Timber.e(e, "Error in RxJava Demo concurrency"); _log(String.format("Boo! Error %s", e.getMessage())); _progress.setVisibility(View.INVISIBLE); } @Override public void onNext(Boolean bool) { _log(String.format("onNext with return value \"%b\"", bool)); } }; }
其中, onComplete
, onError
和 onNext
都比较易懂, 这里有一个问题就是 DisposableObserver
是什么? Disposable
是什么意思?
DisposableObserver
DisposableObserver 的 在线注释文档 中做出了说明:
DisposableObserver 是一个抽象的 Observer, 它通过实现了 Disposable 接口允许 异步取消 .
现在知道 DisposableObserver 是对被观察者做了什么扩展了, 它支持 异步取消 .
所有 DisposableObserver 预先实现的 final 方法都是线程安全的.
取消 DisposableObserver 需要调用它的 dispose()
方法, 这个方法既可以在 DisposableObserver 之外调用, 也可以在它的 onNext
里面调用, 实现自己取消自己.
我的第一反应就是 DisposableObserver 很适合配合 Fragment 的生命周期释放资源, 这里的确也是这样用的, 这个放在后面部分再说.
建立订阅关系
有了被观察者, 也有了观察者, 下面要做的就是建立订阅关系, 具体的代码为:
_getObservable() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(d);
这里有一个线程切换的操作. 这里我把上一篇中梳理的规则再摆过来, 作为参考:
-
subscribeOn
出现的顺序位置没有关系, 只要指定了subscribeOn
, 那么被观察者就在它指定的线程上运行 - 在整个链条中
subscribeOn
只调用一次 -
observeOn
可以多次调用, 每调用一次, 链条下游的就切换一次线程
参照着规则看代码, 就理解了.
除此之外, 我的第一印象是, 订阅成功后, 会返回一个订阅对象, 但看代码并没有返回. 这是为什么呢? 通过查阅 文档 , 原来这是 RxJava 2 的一点变化,
订阅 (Subscription) 的变化
在 RxJava 1 中, rx.Subscription
负责流和资源的生命周期管理.
在 RxJava 2 中 Subscription 这个名称改作他用了, Reactive-Streams 规范使用这个名字专门用于表示源于消费者之间的交互点 (interaction point), 类名为 org.reactivestreams.Subscription
, 允许请求一个正的数量值并允许取消序列.
既然 Subscription 这个名称被占用了, 在 RxJava 2 中原本的订阅概念改了一个新名字 io.reactivex.Disposable
.
看到这里, 就更加理解 DisposableObserver 的意思了, 原来 DisposableObserver 中的那个异步取消, 就是新的取消订阅关系.
变化中还有一点, 是原来的 CompositeSubscription
改名为 CompositeDisposable
, 这个东东在下面会用到.
生命周期管理
上一节建立订阅关系后, 点击按钮就能够正常执行异步操作了. 这一节主要讨论生命周期.
上一节建立订阅关系后, 在后面紧跟着的还有一行代码:
_disposables.add(d);
_disposables
这个成员的定义:
private CompositeDisposable _disposables = new CompositeDisposable();
这就是上一节最后提到的.
这样, 当建立一个订阅关系后, 会把 DisposableObserver 添加到 CompositeDisposable 中, 等到 Fragment 生命周期结束时, 一并清掉:
@Override public void onDestroy() { super.onDestroy(); _disposables.clear(); }
这个 clear 里面的内部实现就是对所有添加进去的 Disposable 挨个执行 dispose 进行取消操作.
从而实现 Fragment 对多个订阅关系进行统一生命周期管理, 在生命周期结束时统一取消的逻辑.
总结
在这一篇中, 完成了对第一个 Demo 的学习.
通过学习, 对很多概念有了更加全面的认识.
这一篇中主要包含了以下重点内容:
- 建立基本的观察者-被观察者订阅关系
-
用 RxJava 来执行异步操作, 可以替代 AsyncTask
- Fragment 结合 CompositeDisposable 进行多个订阅关系的生命周期统一管理
在下一篇中, 我将继续对剩下 Demo 进行代码阅读, 希望有了前两篇的基础, 学习的脚步能够更快一些.
微信公众号
我开通了微信公众号: MaxieeTalk
, 欢迎订阅, 及时收到我的最新技术文章~
打赏
如果这篇文章对您有所帮助, 欢迎扫描下面支付宝二维码请我喝杯可乐~
您的打赏将会极大提高我的写作热情, 激励我更新更多更好的博客.
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Cypherpunks
Julian Assange、Jacob Appelbaum、Andy Müller-Maguhn、Jérémie Zimmermann / OR Books / 2012-11 / GBP 8.99
Cypherpunks are activists who advocate the widespread use of strong cryptography (writing in code) as a route to progressive change. Julian Assange, the editor-in-chief of and visionary behind WikiLea......一起来看看 《Cypherpunks》 这本书的介绍吧!