More of RxJava

栏目: Java · 发布时间: 5年前

内容简介:这是一系列文章的第一篇,这个系列没有什么非常明确的主题,可以看做是日常开发和协作里悟出来的一些技术相关的事情——或者说是技术方面的随笔。这一篇是关于曾经看到这么一篇文章,称赞 Kotlin Coroutine 大好,RxJava 可以直接丢弃了;也曾经看到一篇文章(事实上正是我刚开始学习 RxJava 的时候看的),称 RxJava 的本质就是异步。

这是一系列文章的第一篇,这个系列没有什么非常明确的主题,可以看做是日常开发和协作里悟出来的一些技术相关的事情——或者说是技术方面的随笔。

这一篇是关于 RxJava

曾经看到这么一篇文章,称赞 Kotlin Coroutine 大好,RxJava 可以直接丢弃了;也曾经看到一篇文章(事实上正是我刚开始学习 RxJava 的时候看的),称 RxJava 的本质就是异步。

如果把这两篇文章放在一起看,那么似乎 “合理”:Kotlin Coroutine 解决了 JVM 世界里过去的 Callback 形式的异步回调,而 RxJava 的本质就是处理异步,那么 RxJava 自然不需要了,用 Kotlin Coroutine 就行了。

上面这段话的表述是有问题的。

这引出我下面要说的一部分:

RxJava 不只是异步

先放一段大家熟悉的代码:

getUser("id").subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : SingleObserver<UnsplashUser> {
            override fun onSuccess(user: UnsplashUser) {
                nameTextView.text = user.name
            }
            override fun onSubscribe(d: Disposable) {
                // do on subscribe
            }
            override fun onError(e: Throwable) {
                e.printStackTrace()
            }
        })

getUser() 返回一个 Single<UnsplashUser> ,里面可能是网络请求,又或者是查询数据库,但这不重要,重要的是这两句话:

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

第一句是把订阅的操作放到 IO 线程里执行,然后在主线程里观察:因此我们能在 onSuccess 方法里给 TextViewtext 属性赋值。

相信上述代码大家一定在项目里用过,甚至把中间的切换线程的操作放到一个 Transformer<T> 里,实现自己的 Observer 简化一些通用的处理,理所当然地把 RxJava 等同于异步。

但是 RxJava 真的非要异步吗?试想一下,如果在调用的时候,完全没有 subscribeOnobserveOn ,那么这样算什么呢?

你可能会回答:

这情况就不该用 RxJava 了。

这可以说对,同时也可以说不对。

首先,RxJava 是一个模式,一个建立在上游发射数据,然后被下游观察的观察者模式,在上游发出数据后,你可以做一些数据转换(Map),可以接到另一个流(FlatMap),可以让他经过你自定义的转换器(Transformer),当然也可以指定订阅和观察的线程(SubscribeOn & ObserveOn),只使用其中一部分也是没问题。

一个合适的例子是 RxBus,跟 EventBus不一样的是,它是基于 RxJava 实现的,是站在 RxJava 的肩膀上的。当然它可以切换线程,但这个不重要:这是一个上游发射数据,下游接收数据然后做对应处理的好例子。

Rx 的世界,不只有 Observable

自 RxJava2 开始,就支持了 Flowable , Single , Maybe , Completable 这几个类型,跟 Observable 相比,他们表示的意思更明确:

  • Flowable:支持背压的 Observable
  • Single:成功发射一个 item 或者失败
  • Maybe:可能发射一个 item,或者直接完成,或者失败
  • Completable:完成或者失败

为什么需要知道这个?我真的需要知道是该用 Observable 还是 Single 吗?

如果所有代码都是你自己写的,那么你可能觉得区分与否都没所谓:一个获取用户信息的网络请求 API,返回 Observable / Flowable / Single 都是一样的,对于前两者,你只会关心 onNext (而且只期望发生一次),对于后者,你会只关心 onSuccess ,同时错误处理都在 onError 里处理。所以,为什么不直接使用 Single 呢?

如果你是写给别人服务的 API,那么区分这几种类型还是很有必要的:通过返回的类型能简单地告知调用方这个流应该是什么类型,不明确的类型(比如一律是 Observable)可能让调用方在 Observer 处理事件的时候里做一些无用功。

不要跟 Callback 混用

看一下这样的代码:

interface Callback {
    fun onSuccess(value: Int)
    fun onError(t: Throwable)
}

private fun foo(callback: Callback?) {
    Single.just(0)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : SingleObserver<Int> {
            override fun onSuccess(t: Int) {
                callback?.onSuccess(t)
            }
            override fun onSubscribe(d: Disposable) {
            }
            override fun onError(e: Throwable) {
                callback?.onError(e)
            }
        })
}

fun main() {
    foo(object : Callback {
        override fun onSuccess(value: Int) {
        }
        override fun onError(t: Throwable) {
        }
    })
}

万不得已不要跟 Callback  混用。每调用一次就多产生一个对象(这个可能是最无关紧要的理由,毕竟使用 Rx 意味着产生的对象就比较多了),变成 Callback 风格对调用方来说可能 并没有什么好处 ;相反,如果返回的类型是 Rx 的,那么调用方在调用的时候就可以考虑使用 flatMap 的形式跟上游连接起来。

同时,正如第一点说的,Rx 的本质并不是异步,因此同样地 万不得已 不要在方法里面指定调度器(也就是不要使用 subscribeOn)。如果你有观察 Rx 内部的 public 方法,你可以看到基本上都有这么一段注释:

* <dl>
*  <dt><b>Scheduler:</b></dt>
*  <dd>{@code defer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>

这表示内部没有指定调度器执行。

上面之所以说 万不得已 ,是因为有一些 API 需要在特定的线程上处理,比如 delay 方法默认在 Computation 调度器执行的:

*  <dt><b>Scheduler:</b></dt>
*  <dd>{@code interval} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>

所以对于写 SDK 的同学来说,请尽可能地把调度器的指定权交给调用方,这样既能减少测试的粒度,又能让 API 设计得更灵活。

一个例子:

fun main() {
    val uri = Uri.parse("")
    decodeBitmap(uri)
        .flatMap {
            saveToInternalStorage(it)
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(object : SingleObserver<File> {
            override fun onSuccess(t: File) {
            }
            override fun onSubscribe(d: Disposable) {
            }
            override fun onError(e: Throwable) {
            }
        })
}
private fun saveToInternalStorage(bm: Bitmap): Single<File> {
    return Single.create { e ->
        val file = File(getExternalFilesDir(Environment.DIRECTORY_PICTURES), "${System.nanoTime()}.jpg")
        val fos = FileOutputStream(file)
        try {
            fos.use {
                bm.compress(Bitmap.CompressFormat.JPEG, 100, it)
            }
            e.onSuccess(file)
        } catch (t: Throwable) {
            e.onError(t)
        }
    }
}
private fun decodeBitmap(uri: Uri): Single<Bitmap> {
    return Single.create<Bitmap> { e ->
        val fd = contentResolver.openFileDescriptor(uri, "r")?.fileDescriptor ?: kotlin.run {
            e.onError(RuntimeException("Failed to open fd"))
            return@create
        }
        val bm = BitmapFactory.decodeFileDescriptor(fd) ?: kotlin.run {
            e.onError(RuntimeException("Failed to decode bitmap"))
            return@create
        }
        e.onSuccess(bm)
    }
}

善用「起死回生」

好吧这里并不是真的什么「起死回生」。

有几个方法你可能会忽略,拿 Single 为例子:

More of RxJava

这几个方法分别可以在遇到错误的时候(前提是 onError 能执行),接上一个 Single 流,或者直接发射一个 item。如果你的 onError 和 onSuccess 里的处理很类似的话,那么这些方法将可能帮助你简化 Observer 里处理的代码。

同样地,Observable 和其他也有类似的代码,对于 Observable 来说,可以忽略其中一些 item 的错误,然后转化为正确的或者经过特殊装饰的 item 发射出去。

当然,onErrorxxx 里面也是可以抛异常的,如果在里面抛异常了,那么依然能在 onError 里处理这个异常——略有不同的是,这个异常是 CompositeException ,你需要 getCause() 才能获得真正的异常。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

PHP and MySQL Web Development

PHP and MySQL Web Development

Luke Welling、Laura Thomson / Sams / July 25, 2007 / $49.99

Book Description PHP and MySQL Web Development teaches you to develop dynamic, secure, commerical Web sites. Using the same accessible, popular teaching style of the three previous editions, this b......一起来看看 《PHP and MySQL Web Development》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具