内容简介:前面提到 Coroutine 是轻量级的线程,并不意味着就不消耗系统资源。 当异步操作比较耗时的时候,或者当异步操作出现错误的时候,需要把这个 Coroutine 取消掉来释放系统资源。在 Android 环境中,通常每个界面(Activity、Fragment 等)启动的 Coroutine 只在该界面有意义,如果用户在等待 Coroutine 执行的时候退出了这个界面,则再继续执行这个 Coroutine 可能是没必要的。另外 Coroutine 也需要在适当的 context 中执行,否则会出现错误,
前面提到 Coroutine 是轻量级的线程,并不意味着就不消耗系统资源。 当异步操作比较耗时的时候,或者当异步操作出现错误的时候,需要把这个 Coroutine 取消掉来释放系统资源。在 Android 环境中,通常每个界面(Activity、Fragment 等)启动的 Coroutine 只在该界面有意义,如果用户在等待 Coroutine 执行的时候退出了这个界面,则再继续执行这个 Coroutine 可能是没必要的。另外 Coroutine 也需要在适当的 context 中执行,否则会出现错误,比如在非 UI 线程去访问 View。 所以 Coroutine 在设计的时候,要求在一个范围(Scope)内执行,这样当这个 Scope 取消的时候,里面所有的子 Coroutine 也自动取消。所以要使用 Coroutine 必须要先创建一个对应的 CoroutineScope
。
在前一节示例中使用的是 GlobalScope 来创建 Coroutine,后面会详细解释。
CoroutineScope 接口
CoroutineScope 是一个接口,要是查看这个接口的源代码的话就发现这个接口里面只定义了一个属性 CoroutineContext
:
// 这个接口定义的简单的不像话!
public interface CoroutineScope {
// Scope 的 Context
public val coroutineContext: CoroutineContext
}
所以 CoroutineScope 只是定义了一个新 Coroutine 的执行 Scope。每个 coroutine builder
都是 CoroutineScope 的扩展函数,并且自动的继承了当前 Scope 的 coroutineContext
和取消操作。
每个 coroutine builder
和 scope 函数(withContext、coroutineScope 等)都使用自己的 Scope 和 自己管理的 Job 来运行提供给这些函数的代码块。并且也会等待该代码块中所有子 Coroutine 执行,当所有子 Coroutine 执行完毕并且返回的时候, 该代码块才执行完毕,这种行为被称之为 “structured concurrency”。
一般而言,在应用中具有生命周期的组件应该实现 CoroutineScope 接口,并负责该组件内 Coroutine 的创建和管理。例如对于 Android 应用来说,可以在 Activity 中实现 CoroutineScope 接口, 例如:
class ScopedActivity : Activity(), CoroutineScope {
lateinit var job: Job
// CoroutineScope 的实现
override val coroutineContext: CoroutineContext
get() = Dispatchers.Main + job
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
job = Job()
}
override fun onDestroy() {
super.onDestroy()
// 当 Activity 销毁的时候取消该 Scope 管理的 job。
// 这样在该 Scope 内创建的子 Coroutine 都会被自动的取消。
job.cancel()
}
/*
* 注意 coroutine builder 的 scope, 如果 activity 被销毁了或者该函数内创建的 Coroutine
* 抛出异常了,则所有子 Coroutines 都会被自动取消。不需要手工去取消。
*/
fun loadDataFromUI() = launch { // <- 自动继承当前 activity 的 scope context,所以在 UI 线程执行
val ioData = async(Dispatchers.IO) { // <- launch scope 的扩展函数,指定了 IO dispatcher,所以在 IO 线程运行
// 在这里执行阻塞的 I/O 耗时操作
}
// 和上面的并非 I/O 同时执行的其他操作
val data = ioData.await() // 等待阻塞 I/O 操作的返回结果
draw(data) // 在 UI 线程显示执行的结果
}
}
由于所有的 Coroutine 都需要一个 CoroutineScope,所以为了方便创建 Coroutine,在 CoroutineScope 上有很多扩展函数,比如 launch、async、actor、cancel 等。
GlobalScope
而 GlobalScope 是 CoroutineScope 的一个单例实现,其代码也是非常简单的:
public object GlobalScope : CoroutineScope {
/**
* Returns [EmptyCoroutineContext].
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
在 Kotlin 中单例对象是通过 object 关键字实现的,上面的实现说明 GlobalScope 对象是 CoroutineScope 的一个实例,该实例所用的 CoroutineContext 是一个 EmptyCoroutineContext 实例(这也是一个单例 object 对象)。由于 GlobalScope 对象没有和应用生命周期组件相关联,需要自己管理 GlobalScope 所创建的 Coroutine,所以一般而言我们不直接使用 GlobalScope 来创建 Coroutine。前一节中使用了 GlobalScope 来创建 Coroutine 就是一种不推荐的做法。
在 Android 中会经常需要实现这个 CoroutineScope,所以为了方便开发者使用, 标准库中定义了一个 MainScope()
函数,该函数定义了一个使用 SupervisorJob
和 Dispatchers.Main
为 Scope context 的实现。所以上面的代码可以简化为:
@ExperimentalCoroutinesApi
// 由于 MainScope() 还没有正式发布,所以添加这个注解来避免 IDE 的警告
class ScopedActivity : Activity(),
CoroutineScope by MainScope(){ // 使用 by 指定代理实现
override fun onDestroy() {
super.onDestroy()
cancel() // 调用 CoroutineScope 的 cancel 函数
}
CoroutineScope 接口定义的虽然很简单,但是 CoroutineScope 上面定义了很多扩展函数是使用 Coroutine 的基础,所以下面来继续看看一些常用的扩展函数。
CoroutineScope.newCoroutineContext 扩展函数
这个扩展函数的实现大概如下:
fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
combined + Dispatchers.Default else combined
}
如果看实际的代码会比上面的复杂一点, 是因为里面包含了调试用的功能。
newCoroutineContext 这个函数就干了两件事,把参数 context 和当前的 coroutineContext 合并,然后判断合并后的 CoroutineContext 对象是否设定的有 dispatcher 或者 ContinuationInterceptor ,如果都没有的话,就在 CoroutineContext 对象上设定一个 Dispatchers.Default dispatcher。
CoroutineScope.launch 扩展函数
launch 扩展函数用来创建一个不阻塞当前线程的 Coroutine,返回一个 Job 对象来管理这个 Coroutine 实例。调用 Job.cancel 函数可以取消这个 Coroutine 的执行。
这个函数一共有三个参数,函数定义如下:
fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job
第一个参数 context,默认 launch 所创建的 Coroutine 会自动继承当前 Coroutine 的 context,如果有额外的 conetxt 需要传递给所创建的 Coroutine 则可以通过第一个参数来设置。
第二个参数 start 为 CoroutineStart 枚举类型,用来指定 Coroutine 启动的选项。有如下几个取值:
– DEFAULT (默认值)立刻安排执行该Coroutine实例
– LAZY 延迟执行,只有当用到的时候才执行
– ATOMIC 类似 DEFAULT,区别是当Coroutine还没有开始执行的时候无法取消
– UNDISPATCHED 如果使用 Dispatchers.Unconfined dispatcher,则立刻在当前线程执行直到遇到第一个suspension point 。然后当 Coroutine 恢复的时候,在继续在 suspension的 context 中设置的 CoroutineDispatcher 中执行。
第三个参数 block 为一个 suspending function,这个就是 Coroutine 中要执行的代码块,在实际使用过程中通常使用 lambda 表达式,也称之为 Coroutine 代码块。需要注意的是,这个 block 函数定义为 CoroutineScope 的扩展函数,所以在代码块中可以直接访问 CoroutineScope 对象(也就是 this 对象)
下面的示例演示了 start 为 LAZY 时候的行为,当调用 job.start() 后这个 Coroutine 才开始执行:
@ObsoleteCoroutinesApi
val BG = newSingleThreadContext("Background")
@ExperimentalCoroutinesApi
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
lateinit var job: Job;
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 设置 start 参数为 LAZY,这样 Coroutine 不会自动安排执行,当点击 FAB 的时候,调用 job.start 后才会执行
job = launch(start = CoroutineStart.LAZY) {
Log.d(TAG, "launch coroutine : ${Thread.currentThread().name}")
background()
textView.text = Date().toLocaleString()
}
fab.setOnClickListener { view ->
// 启动 job 这个 Coroutine 实例
job.start()
}
}
suspend fun background() {
withContext(BG) {
Thread.sleep(3000)
Log.d(TAG, "background() called ${Thread.currentThread().name}")
}
}
override fun onDestroy() {
super.onDestroy()
cancel()
}
如果把上面的
job = launch(start = CoroutineStart.LAZY)
修改为
launch(Dispatchers.Unconfined, CoroutineStart.UNDISPATCHED)
则该Coroutine 会立刻在当前线程(Main 线程)执行,然后当遇到 background 函数后被暂停,由于 background 函数内切换到了 BG
这个 context 来继续执行(从 Main 线程切换到了后台线程),所以当 Coroutine 从 background 恢复的时候,继续使用 BG
这个线程执行,所以这个时候在非 UI 线程不能访问 textView,程序执行到这里会导致应用 Crash, 并抛出如下异常:
android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views.
如果在Coroutine(或者子Coroutine)里面发生了未捕获异常(Uncaught exceptions )则这个Coroutine(或者父Coroutine)默认会被取消。所以出现异常后面的代码就不会执行了。
Job
CoroutineScope.launch 函数返回一个 Job 对象,该对象代表了这个刚刚创建的 Coroutine实例,job 对象有不同的状态(刚创建的状态、活跃的状态、执行完毕的状态、取消状态等),通过这个 job 对象可以控制这个 Coroutine 实例,比如调用 cancel 函数可以取消执行。 后面会再详细介绍 Job 对象,目前我们只需要知道这个代表 launch 创建的一个 Coroutine 实例即可。
launch 函数创建的 Coroutine 并没法返回 Coroutine 返回的值,如果遇到需要使用 Coroutine 返回值的情况应该如何办呢? 这种情况就需要使用另外一个 coroutine builder
函数了。
CoroutineScope.async
async 函数也是三个参数,参数类型和 launch 一样,唯一的区别是第三个block参数会返回一个值,而 async 函数的返回值为 Deferred
所以 async 和 launch 函数的区别就是, async 执行的代码块有返回值,通过 Deferred 对象可以获取到这个代码块异步执行后的结果。
Deferred 继承自 Job,所以通过 Deferred 也可以和 Job 一样来控制这个 Coroutine。 Deferred 类似于 Java 里面的 future。 通过调用 deferred 对象的 await() 函数来等待异步结果的返回。
CoroutineScope.produce
async 函数只能返回一个结果,如果遇到返回一系列数据的情况,则就需要使用 produce 这个 coroutine builder
函数了。
由于在编程世界中,生产者-消费者(producer-consumer )模式是如此的常见,所以 Coroutine 提供了 produce 这个函数。这个函数的定义和上面两个有很大的区别:
fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E>
第一个参数一样还是 CoroutineContext,而
第二个参数为生产者和消费者之间传输数据的通道(Channel)的缓冲数量,默认为0 表示没有缓冲,
第三个参数为生产者代码块,这是一个带有 suspend 标识符的异步代码块。注意这个代码块是 ProducerScope 的一个扩展函数,ProducerScope 继承了 CoroutineScope 和 SendChannel 两个接口, SendChannel 接口定义了生产者往通道(Channel)发送数据的send(e:E) 函数。
produce 的返回值是一个 ReceiveChannel 对象,通过 ReceiveChannel的 receive() 函数可以从通道中接收生产者发送的数据。
为了方便接收并处理数据,标准库中也提供了一些 ReceiveChannel 的扩展函数,比如 consume、consumeEachIndexed 等。
下面是一个使用 produce 的示例:
@ObsoleteCoroutinesApi
val BG = newSingleThreadContext("Background")
@ExperimentalCoroutinesApi
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val capacity = 0
// 修改这个值为非零,可以通过log看到缓冲的作用
val p = produce(BG, capacity) {
for (x in 1..10) {
Log.d(TAG, "onCreate() called $x")
// 用来模拟生产者和消费者产生数据不一致的情况,
// 这样可以看出 capacity 参数的作用
delay(1000)
send(x * x)
}
}
launch {
// 通过consumeEachIndexed 扩展函数来接收通道中的数据
p.consumeEachIndexed {
delay(3000)
Log.e(TAG, "onCreate: consume ${it.index}")
textView.text = "Index ${it.index}, Value ${it.value}"
}
}
fab.setOnClickListener { view ->
// 点击 FAB 按钮可以取消这个 produce
p.cancel()
}
}
通过 consume 或者 consumeEach 扩展函数处理数据的用法如下:
p.consume {
for (e in this) textView.text = "Value $e"
}
p.consumeEach {
textView.text = "Value $it"
}
需要注意的是,Coroutine 目前只有一些基础的 API 是稳定发布的,比如 launch,这个 produce 函数目前被标记为 @ExperimentalCoroutinesApi,所以这些 api 细节以后可能还会出现变化,大家具体使用的时候,可以参考其 API 文档了解这些变化。 使用 @ExperimentalCoroutinesApi api的时候, IDE 会给出明显的提示,后面介绍的其他实验性的 API 就不再特殊说明了。
prduce 函数的返回值 ReceiveChannel 只能有一个消费者来接收数据,对于有多个消费者同时可以接受数据的情况,就需要使用 broadcast 这个函数了。
CoroutineScope.broadcast
broadcast 这个函数的定义如下:
fun <E> CoroutineScope.broadcast( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY, onCompletion: CompletionHandler? = null, block: suspend ProducerScope<E>.() -> Unit ): BroadcastChannel<E>
这个函数和前面的几个函数都不太一样, 参数 capacity 同样为通道的缓冲大小,但是这个值不可以设置为0. 参数 onCompletion 可以用来监听生产者是否完成,当生产者Coroutine执行完毕,则会回调这个函数对象。 block 代码块是 ProducerScope 的扩展函数,所以在这个代码块中可以直接调用 ProducerScope 的函数来像通道中发送数据。 这个函数的返回值是 BroadcastChannel 对象,通过该对象来操作这个 Coroutine。
注意 start 参数的默认值为 LAZY,所以默认情况下,只有当有消费者通过 BroadcastChannel.openSubscription 函数订阅数据的时候, block 代码块才开始执行。 openSubscription() 函数返回一个 ReceiveChannel 对象,这样就可以通过该对象来接收数据了。 下面是一个示例:
@ExperimentalCoroutinesApi
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val capacity = 5 // broadcast 中 capacity 不可以为0
val onComplete: CompletionHandler = { Log.e(TAG, "CompletionHandler: $it") }
val p = broadcast(BG, capacity, onCompletion = onComplete) {
for (x in 1..10) {
Log.d(TAG, "onCreate() called $x")
// 用来模拟生产者和消费者产生数据不一致的情况,
// 这样可以看出 capacity 参数的作用
delay(1000)
send(x * x)
}
}
// 下面两个消费者通过 openSubscription 来订阅数据
launch {
// 通过consumeEachIndexed 扩展函数来接收通道中的数据
p.openSubscription().consumeEachIndexed {
delay(3000)
Log.e(TAG, "onCreate: consume ${it.value} , index ${it.index}")
textView.text = "Value ${it.value}"
}
}
launch {
p.openSubscription().consumeEach {
Log.e(TAG, "onCreate: consumeEach $it")
}
}
fab.setOnClickListener { view ->
p.cancel()
}
}
下面是上面代码执行的 Log,注意看 onComplete 和 capacity 的作用:
03-02 13:51:46.972 16352-16376 D onCreate() called 1 03-02 13:51:47.975 16352-16376 D onCreate() called 2 03-02 13:51:47.976 16352-16352 E onCreate: consumeEach 1 03-02 13:51:48.976 16352-16376 D onCreate() called 3 03-02 13:51:48.978 16352-16352 E onCreate: consumeEach 4 03-02 13:51:49.978 16352-16352 E onCreate: consumeEach 9 03-02 13:51:49.978 16352-16376 D onCreate() called 4 03-02 13:51:50.978 16352-16352 E onCreate: consume 1 , index 0 03-02 13:51:50.979 16352-16376 D onCreate() called 5 03-02 13:51:50.981 16352-16352 E onCreate: consumeEach 16 03-02 13:51:51.980 16352-16376 D onCreate() called 6 03-02 13:51:51.981 16352-16352 E onCreate: consumeEach 25 03-02 13:51:52.982 16352-16376 D onCreate() called 7 03-02 13:51:52.983 16352-16352 E onCreate: consumeEach 36 03-02 13:51:53.982 16352-16352 E onCreate: consume 4 , index 1 03-02 13:51:53.984 16352-16352 E onCreate: consumeEach 49 03-02 13:51:53.984 16352-16376 D onCreate() called 8 03-02 13:51:54.985 16352-16376 D onCreate() called 9 03-02 13:51:54.986 16352-16352 E onCreate: consumeEach 64 03-02 13:51:56.985 16352-16352 E onCreate: consume 9 , index 2 03-02 13:51:56.987 16352-16352 E onCreate: consumeEach 81 03-02 13:51:56.988 16352-16376 D onCreate() called 10 03-02 13:51:59.990 16352-16352 E onCreate: consume 16 , index 3 03-02 13:51:59.992 16352-16352 E onCreate: consumeEach 100 03-02 13:51:59.993 16352-16376 E CompletionHandler: null 03-02 13:52:02.994 16352-16352 E onCreate: consume 25 , index 4 03-02 13:52:05.999 16352-16352 E onCreate: consume 36 , index 5 03-02 13:52:09.000 16352-16352 E onCreate: consume 49 , index 6 03-02 13:52:12.003 16352-16352 E onCreate: consume 64 , index 7 03-02 13:52:15.007 16352-16352 E onCreate: consume 81 , index 8 03-02 13:52:18.012 16352-16352 E onCreate: consume 100 , index 9
除了上面这几个分别用作不同场景的函数以外, CoroutineScope 还有另外一个扩展函数 actor, 这个函数是用来在 Coroutine 内处理外部发送过来的消息的,这个函数只能应用到 Java 环境,另外还有一个只能应用到 JavaScript环境的 promise 扩展函数,返回的是一个 Promise 对象。 这两个函数我们暂时先不介绍,等后面遇到了再详细说说。
总结
在 Android 环境中使用 Coroutine 的话, 为了保证Coroutine所占用的资源能够及时回收,需要在 CoroutineScope 中来启动其他的 Coroutine,然后在 UI 合适的生命周期中取消 Coroutine 来释放资源。
所以在 Android 的 Activity 或者 Fragment 以及其他具有生命周期的组件中使用 Coroutine的时候,一般使用下面的模式:
@ExperimentalCoroutinesApi
// 由于 MainScope() 还没有正式发布,所以添加这个注解来避免 IDE 的警告
class ScopedActivity : Activity(),
CoroutineScope by MainScope(){ // 使用 by 指定代理实现
override fun onDestroy() {
super.onDestroy()
cancel() // 调用 CoroutineScope 的 cancel 函数
}
这样就可以直接使用上面介绍的各种类型的 coroutine builder
函数来创建各种用途的 Coroutine 了。
launch
、 produce
、 async
、 broadcast
这四个 coroutine builder
将会是经常使用的函数。
下面是 CoroutineScope API DOC地址
CoroutineScope 接口 API 文档地址: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
GlobalScope 接口 API 文档地址: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-global-scope/index.html
以上所述就是小编给大家介绍的《掌握Kotlin Coroutine之 CoroutineScope》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Head First Web Design
Ethan Watrall、Jeff Siarto / O’Reilly Media, Inc. / 2009-01-02 / USD 49.99
Want to know how to make your pages look beautiful, communicate your message effectively, guide visitors through your website with ease, and get everything approved by the accessibility and usability ......一起来看看 《Head First Web Design》 这本书的介绍吧!