Coroutines不仅与并发有关
#kotlin #android #并发性 #coroutines

每当您听到有关Kotlin Coroutines的信息时,您可能会考虑一个简单,简洁且性能的解决方案,用于处理异步任务,例如网络请求。但这是他们的唯一目的吗?让我们考虑一下Kotlin Coroutines的使用范围以外的并发。

Kotlin Coroutines原语

让我们从理解,基础机制的基础如何起作用。如果我们看一下Kotlin Coroutines的原语,那只是几个课程和功能:

  • Continuation
  • CoroutineContext
  • suspendCoroutine
  • createCoroutine
  • startCoroutine

这就是我们在kotlin-stdlib中拥有的全部。但是他们用什么?让我们更深入地了解Kotlin Coroutines如何设计工作。

Continuation

延续只是两个成员的接口:contextresumeWith

public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}

它做什么,其目的是什么? Continuation实际上是 coroutine 。这就像一个抛光回调,具有传播其他信息并为Coroutine执行的有用结果。

我们尚未谈论CoroutineContext,现在只考虑resumeWith

resumeWith

像其他任何回调一样,这是Coroutine完成其工作时所谓的功能。它使用kotlin.Result以安全的方式传播Coroutine内部发生的任何例外。

因此,我们可以在下一个方法中使用Continuation从字面上创建我们的并发逻辑:

suspend fun executeNetworkRequest(): String = 
    suspendCoroutine { continuation ->
        thread {
            continuation.resumeWith(someBlockingRequest())
        }
    }

suspendCoroutine是Kotlin Coroutines代码和非coroutine代码之间的桥梁。

或使用现有的异步(伪代码):

suspend fun executeNetworkRequest(): String =
    suspendCoroutine { continuation ->
        apiService.getSomething()
            // onSuccess & onFailure is a callback
            .onSuccess(continuation::resume)
            .onFailure(continuation::resumeWithException)
            .executeAsync()
    }

Continuation<in T>.resume(..)是避免每次通过kotlin.Result的扩展。

因此,我们不仅可以实现我们的并发逻辑,还可以使用现有并与Kotlin Coroutines一起使用。

startCoroutine

此外,我们可以使用startCoroutine开始暂停从非悬浮上下文中暂停功能。在Kotlin中,如果您的main函数为suspend

,它总是在最后使用。

kotlinx.coroutines还使用它来运行coroutines,但是那里的机制当然要困难得多。

import kotlin.coroutines.*

fun main() {
    val operation: suspend () -> Unit = ::a
    operation.startCoroutine(
        object : Continuation<Unit> {
            // we will talk about it lower
            override val coroutineContext = EmptyCoroutineContext

            // called when coroutine finished its work
            override fun resumeWith(result: Result<Unit>) {
                println(
                    if(result.isSuccess) 
                        "Successfully done" 
                    else "Error happenned: ${result.exceptionOrNull()}"
                )
            }
        }
    )
}

suspend fun a() {...}

但是,当然,您不能仅在以这种方式执行Coroutines时从kotlinx.coroutines调用暂停功能。

CoroutineContext

现在,我们来到了Continuation CoroutineContext的另一名成员。这是什么?

在编程中,我们经常立即处理多个任务,并且有效地管理它们可能具有挑战性。 Kotlin的CoroutineContext有助于应对这一挑战。这是一个提供者,可以传播到Coroutine。在现实世界中,通常是关于将参数传递到复杂的Coroutines链中。

更清楚,kotlinx.coroutines中的Coroutinecontext代表structured concurrency

简单示例

让我们从以前的startCoroutine代码示例中创建,能够从CoroutineContext检索值:

import kotlin.coroutines.*

// define our container for data we need inside coroutine
// it should always inherit 'CoroutineContext.Element'
data class ExecutionContext(val someInt: Int) : CoroutineContext.Element {
    override val key = EXECUTION_CONTEXT_KEY
}

// define type-safe key using which we will get our value
val EXECUTION_CONTEXT_KEY = object : CoroutineContext.Key<ExecutionContext> {}

// define coroutine context that we will pass into Continuation
private val myCoroutineContext = object : CoroutineContext {
    private val values = mapOf<CoroutineContext.Key<*>, CoroutineContext.Element>(
        EXECUTION_CONTEXT_KEY to ExecutionContext(10000)
    )

    override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
        return values[key] as? E
    }

    // .. we omit other functions for simplicity
}

suspend fun a() {
    // here we retrieve value from coroutine context
    // coroutineContext is compiler intrinsic, can be called
    // only from suspend world
    val executionContext = coroutineContext[EXECUTION_CONTEXT_KEY]!!
    println(executionContext.someInt!!)
}

fun main() {
    val operation: suspend () -> Unit = ::a
    operation.startCoroutine(
        object : Continuation<Unit> {
            override val context: CoroutineContext = myCoroutineContext

            override fun resumeWith(result: Result<Unit>) {
                println(
                    if(result.isSuccess) 
                        "Successfully done" 
                    else "Error happenned: ${result.exceptionOrNull()}"
                )
            }
        }
    )
}

CoroutineContext.Element是用于存储CoroutineContext

内存储元素的摘要

CoroutineContext.KeyCoroutineContext.Element的标识符。

您可以使用此代码here

真实项目示例

让我们想象我们拥有API服务。通常,我们需要拥有一定的授权,所以让我们考虑下一个示例(至于这样,我服用了GRPC):

// let's define an element that will persist in `CoroutineContext`
data class AuthorizationContext(
    val accessHash: String?,
    val provider: AuthorizationProvider,
) : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<AuthorizationContext>

    override val key: CoroutineContext.Key<*> = Key
}

// now we define our interceptor
class AuthorizationInterceptor(
    private val authorizationProvider: AuthorizationProvider,
) : CoroutineContextServerInterceptor() {
    companion object {
        private val ACCESS_TOKEN_METADATA_KEY: Metadata.Key<String> =
            Metadata.Key.of("access-token", Metadata.ASCII_STRING_MARSHALLER)
    }

    override fun coroutineContext(call: ServerCall<*, *>, headers: Metadata): CoroutineContext {
        return AuthorizationContext(
            accessHash = headers.get(ACCESS_TOKEN_METADATA_KEY),
            provider = authorizationProvider,
        )
    }
}

kotlinx.coroutines2中的CoroutineContext实际上只是一个Map<K : CoroutineContext.Key, V : CoroutineContext.Element>(例如,更准确,是jvm上的ConcurrentHashMap),与上面的示例相同。但是,如果我们谈论kotlinx.coroutines,它将传播给所需的Coroutine中的所有孩子(我们没有这样的机制)。

所以,现在我们可以在孩子们的情况下得到它:

suspend inline fun provideAuthorization(block: (UserId) -> Unit) {
    val authContext = coroutineContext[AuthorizationContext]
    authContext.accessHash ?: throw StatusException(Status.UNAUTHORIZED)

    val userId = authContext.provider.provide(authContext.accessHash)
    return block(userId)
}

有趣的事实:coroutineContext是Kotlin中唯一具有suspend修饰符的属性。 ð

对于GRPC,我们还需要注册我们的拦截器并编写我们的RPC。但是,对于GRPC,这种解决方案的想法是简单的 - 逻辑并简化开发人员的体验。

对于Java,GRPC使用ThreadLocal,因此我们也可以将CoroutineContext视为ThreadLocal的替代方案。我们不能在Coroutines中使用ThreadLocal,因为通常不会将Coroutine链接到特定线程(尤其是在谈论withContext时)。 Coroutines更有可能在另一个线程上恢复(此外,不同的Coroutines可以在一个线程上运行)。

但是,这是否意味着Coroutines存在的唯一原因是并发?让我解释一下。

Sequence

最常见的例子之一是kotlin.sequences.Sequence<T>。简而言之,这就像一个懒惰的收藏,只有在您开始消费时才能迭代。您可以阅读更多有关它们的here

如果您查看了SequenceScope的来源,它使用引擎盖下的悬挂功能:

@RestrictSuspension
public abstract class SequenceScope<in T> internal constructor() {
    /**
     * Yields a value to the [Iterator] being built and suspends
     * until the next value is requested.
     */
    public abstract suspend fun yield(value: T)

    // ... other
}

@RestrictSuspension拒绝消费者呼叫非会员暂停功能。

因此,这个想法是懒惰的元素。您可以将它们用作常规收藏,并利用懒惰的迭代。

但是它如何在引擎盖下工作?让我们看一下实施资源:

private typealias State = Int  

private const val State_NotReady: State = 0  
private const val State_ManyNotReady: State = 1  
private const val State_ManyReady: State = 2  
private const val State_Ready: State = 3  
private const val State_Done: State = 4  
private const val State_Failed: State = 5  

private class SequenceBuilderIterator<T> : SequenceScope<T>(), Iterator<T>, Continuation<Unit> {  
    private var state = State_NotReady  
    private var nextValue: T? = null  
    private var nextIterator: Iterator<T>? = null  
    var nextStep: Continuation<Unit>? = null  

    override fun hasNext(): Boolean {  
        while (true) {  
            when (state) {  
                State_NotReady -> {}  
                State_ManyNotReady ->  
                if (nextIterator!!.hasNext()) {  
                    state = State_ManyReady  
                    return true  
                } else {  
                    nextIterator = null  
                }  
                State_Done -> return false  
                State_Ready, State_ManyReady -> return true  
                else -> throw exceptionalState()  
            }  

            state = State_Failed  
            val step = nextStep!!  
            nextStep = null  
            step.resume(Unit)  
        }  
    }  

    override fun next(): T {  
        when (state) {  
            State_NotReady, State_ManyNotReady -> return nextNotReady()  
            State_ManyReady -> {  
                state = State_ManyNotReady  
                return nextIterator!!.next()  
            }  
            State_Ready -> {  
                state = State_NotReady  
                @Suppress("UNCHECKED_CAST")  
                val result = nextValue as T  
                nextValue = null  
                return result  
            }  
            else -> throw exceptionalState()  
        }  
    }  

    private fun nextNotReady(): T {  
        if (!hasNext()) throw NoSuchElementException() else return next()  
    }  

    private fun exceptionalState(): Throwable = when (state) {  
        State_Done -> NoSuchElementException()  
        State_Failed -> IllegalStateException("Iterator has failed.")  
        else -> IllegalStateException("Unexpected state of the iterator: $state")  
    }  


    override suspend fun yield(value: T) {  
        nextValue = value  
        state = State_Ready  
        return suspendCoroutineUninterceptedOrReturn { c ->  
            nextStep = c  
            COROUTINE_SUSPENDED  
        }  
    }    
    override fun resumeWith(result: Result<Unit>) {  
        result.getOrThrow() // just rethrow exception if it is there  
        state = State_Done  
    }  

    override val context: CoroutineContext  
        get() = EmptyCoroutineContext  
}

COROUTINE_SUSPENDED是Kotlin编译器内部使用的特殊常数,用于管理Coroutine悬架和恢复。它不是开发人员通常直接与之交互的东西,而是它充当Coroutine机械中的内部信号。

看起来很难阅读,不是吗?让我们逐步进行:

  1. 首先,我们从国家开始。我们将有下一个州,让我们简要地谈论它们:
    • state_notready :迭代器尚未准备好现在提供项目。它可能正在等待操作或进一步处理以使物品可用。
    • state_manynotready :迭代器准备提供多个项目,但尚未立即准备好。它正在等待一个信号表明物品已经准备好消费(基本上是等待终端操作员)。
    • state_manyready :迭代器现在准备提供多个项目。它可以立即从序列中给出下一个项目。
    • state_ready :迭代器有一个准备提供的单个项目。
    • 时,它设置为立即提供该物品。
    • state_done :迭代器没有更多的项目可提供。它已经完成了从序列产生元素的工作。当我们离开SequenceBuilder时,我们达到了这种状态
    • state_failed :发生了一些意外的事情,迭代器遇到了一个问题。通常,这不应该发生。
  2. 基于状态的hasNext,在准备消耗时返回值或一组值。
  3. next功能根据其当前状态从迭代器中检索下一个项目(与hasNext相同)。如果下一个没有hasNext,则可以到达nextNotReady()。在其他情况下,它将仅返回值。
  4. yield功能只是更改序列迭代器实现的状态。当添加新元素时,它将更改为State_Ready。使用suspendCoroutineUninterceptedOrReturn暂停Coroutine(执行)并稍后恢复。它将在以前的Coroutine(悬浮点)完成时开始。基本上,它是由迭代器处理的,但是使其更容易,就像我们从另一个回调了一个回调一样:
   fun foo(callback: (Result<Foo>) -> Unit) {
       // pseudo-code
       executor.execute { result -> callback(result) }
   }

   suspend fun bar(): Result<Foo> {
       // same if was like that
       val result = executor.execute() // suspend point
       return result
   }

最后,它看起来并不复杂,我对吗?

DeepRecursiveScope

现在,让我们讨论Kotlin Coroutines DeepRecursiveScope的另一个案例。如您所知,通常在特定函数呼叫本身时,我们有可能遇到StackOverflowError的概率,因为每个呼叫都将其贡献到我们的堆栈中。

例如,出于相同的目的,也存在tailrec语言构建。不同之处在于,tailrec无法通过调用其他功能进行分支(条件检查)。

您可以阅读更多here

因此,DeepRecursiveScope不依赖传统的堆栈流,而是使用Coroutines提供的所有功能。

我们不会停止使用DeepRecursiveScope的精确实施详细信息(您可以看一下here),因为它具有与Sequence相同的想法,并具有其他行为以支持提供的机制,但让我们讨论Kotlin Coroutines如何解决此问题。特定问题。

在内部进行Coroutines

它如何准确解决问题?正如我之前提到的,Coroutines受到CPS (Continuation Passing Style)的启发,但这并不是Kotlin编译器在处理有效的Coroutines所做的。

Kotlin编译器使用优化的组合来有效地管理Coroutines堆栈和执行。让我们检查到底有什么作用:

  • 编译器转换:Kotlin编译器生成状态机,与我们在Sequences的实现细节中看到的相同。它不会摆脱所有堆栈电话,而是将其减少到不遇到StackOverflowError
  • 连续性堆分配:在传统的回调链中,每个功能呼叫都会将数据推向呼叫堆栈。如果链条深处,这可能会消耗大量堆栈空间。在Coroutine方法中,当悬挂Coroutine时,其继续将其作为对象存储在堆上。此延续对象拥有必要的信息(堆栈,调度程序等),以恢复Coroutine的执行。这个堆的存储空间允许更大的能力处理深层呼叫链,而不会冒堆叠溢出的风险。

下一个Coroutines的确切机制:

  1. 序列化:悬挂的Coroutine的堆栈状态保存在一个堆分配的延续对象中。
  2. 恢复:准备恢复时,该框架为模拟捕获状态设置了本地堆栈。
  3. 内存复制:序列化堆栈状态从持续对象复制到本机堆栈。
  4. 上下文配置:执行上下文配置为匹配原始状态。
  5. 程序计数器:程序计数器设置为保存的值以进行正确的说明。
  6. Invocation :使用CPS调用连续代码,恢复执行。

堆栈还原还可以帮助我们恢复不同的线程,因为他们对我们的Coroutine堆栈一无所知。

所以,从现在开始,我们可以理解Coroutines在内部的工作方式。让我们继续使用其他示例,其中使用Kotlin Coroutines超出并发。

JetPack组成

如果您曾经与Compose合作,例如处理指针事件,您可能已经提到过,有一些来自Coroutines的骇客来聆听更新:

@RestrictsSuspension // <---
@JvmDefaultWithCompatibility
interface AwaitPointerEventScope : Density {
    // ...

    suspend fun awaitPointerEvent(
        pass: PointerEventPass = PointerEventPass.Main
    ): PointerEvent

    suspend fun <T> withTimeoutOrNull(
        timeMillis: Long,
        block: suspend AwaitPointerEventScope.() -> T
    ): T? = block()

    suspend fun <T> withTimeout(
        timeMillis: Long,
        block: suspend AwaitPointerEventScope.() -> T
    ): T = block()

    // ...
}

因此,如您所见,用来处理指针事件的范围用@RestrictsSuspension标记。如果我们来提供文档,我们将看到下一步:

This is a restricted suspension scope. Code in this scope is
always called un-dispatched and may only suspend for calls 
to [awaitPointerEvent]. These functions resume synchronously and the caller may mutate the result
**before** the next await call to affect the next stage of the input processing pipeline.

awaitPointerEvent是使用kotlin原始素处理的,没有kotlinx.coroutines。与在这种情况下一样

Android SDK

由于我们可以在需要回调的所有情况下使用suspend功能,因此还有其他有用的情况,我们可以应用此类逻辑。例如,在请求权限时:

val result = Warden
    .with(this)
    .requestPermission(Manifest.permission.CALL_PHONE)

when (result) {
    is PermissionState.Denied -> dialNumber(phoneNumber)
    PermissionState.Granted -> startCall(phoneNumber)
}

例如,我们正在使用Warden库。

结论

总而言之,本文探讨了Kotlin Coroutines的各个方面,强调了它们的多功能性,而不是传统的并发任务。我们已经深入研究了Coroutines原语的内部运作,讨论了它们在序列和复杂的解决问题的方案(例如深层递归)中的使用,并研究了展示其广泛适用性的现实世界示例。标题“ Coroutines不仅与并发有关”,恰当地反映了Kotlin Coroutines在现代软件开发中提供的多种功能。

随时在水冷却器聊天中随意放下您的专业知识!