每当您听到有关Kotlin Coroutines的信息时,您可能会考虑一个简单,简洁且性能的解决方案,用于处理异步任务,例如网络请求。但这是他们的唯一目的吗?让我们考虑一下Kotlin Coroutines的使用范围以外的并发。
Kotlin Coroutines原语
让我们从理解,基础机制的基础如何起作用。如果我们看一下Kotlin Coroutines的原语,那只是几个课程和功能:
Continuation
CoroutineContext
suspendCoroutine
createCoroutine
startCoroutine
这就是我们在kotlin-stdlib
中拥有的全部。但是他们用什么?让我们更深入地了解Kotlin Coroutines如何设计工作。
Continuation
延续只是两个成员的接口:context
和resumeWith
:
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
它做什么,其目的是什么? Continuation
实际上是 coroutine 。这就像一个抛光回调,具有传播其他信息并为Coroutine执行的有用结果。
我们尚未谈论CoroutineContext
,现在只考虑resumeWith
。
resumeWith
像其他任何回调一样,这是Coroutine完成其工作时所谓的功能。它使用kotlin.Result
以安全的方式传播Coroutine内部发生的任何例外。
因此,我们可以在下一个方法中使用Continuation
从字面上创建我们的并发逻辑:
suspend fun executeNetworkRequest(): String =
suspendCoroutine { continuation ->
thread {
continuation.resumeWith(someBlockingRequest())
}
}
suspendCoroutine
是Kotlin Coroutines代码和非coroutine代码之间的桥梁。
或使用现有的异步(伪代码):
suspend fun executeNetworkRequest(): String =
suspendCoroutine { continuation ->
apiService.getSomething()
// onSuccess & onFailure is a callback
.onSuccess(continuation::resume)
.onFailure(continuation::resumeWithException)
.executeAsync()
}
Continuation<in T>.resume(..)
是避免每次通过kotlin.Result
的扩展。
因此,我们不仅可以实现我们的并发逻辑,还可以使用现有并与Kotlin Coroutines一起使用。
startCoroutine
此外,我们可以使用startCoroutine
开始暂停从非悬浮上下文中暂停功能。在Kotlin中,如果您的main
函数为suspend
。
kotlinx.coroutines
还使用它来运行coroutines,但是那里的机制当然要困难得多。
import kotlin.coroutines.*
fun main() {
val operation: suspend () -> Unit = ::a
operation.startCoroutine(
object : Continuation<Unit> {
// we will talk about it lower
override val coroutineContext = EmptyCoroutineContext
// called when coroutine finished its work
override fun resumeWith(result: Result<Unit>) {
println(
if(result.isSuccess)
"Successfully done"
else "Error happenned: ${result.exceptionOrNull()}"
)
}
}
)
}
suspend fun a() {...}
但是,当然,您不能仅在以这种方式执行Coroutines时从
kotlinx.coroutines
调用暂停功能。
CoroutineContext
现在,我们来到了Continuation
CoroutineContext
的另一名成员。这是什么?
在编程中,我们经常立即处理多个任务,并且有效地管理它们可能具有挑战性。 Kotlin的CoroutineContext
有助于应对这一挑战。这是一个提供者,可以传播到Coroutine。在现实世界中,通常是关于将参数传递到复杂的Coroutines链中。
更清楚,
kotlinx.coroutines
中的Coroutinecontext代表structured concurrency。
简单示例
让我们从以前的startCoroutine
代码示例中创建,能够从CoroutineContext
检索值:
import kotlin.coroutines.*
// define our container for data we need inside coroutine
// it should always inherit 'CoroutineContext.Element'
data class ExecutionContext(val someInt: Int) : CoroutineContext.Element {
override val key = EXECUTION_CONTEXT_KEY
}
// define type-safe key using which we will get our value
val EXECUTION_CONTEXT_KEY = object : CoroutineContext.Key<ExecutionContext> {}
// define coroutine context that we will pass into Continuation
private val myCoroutineContext = object : CoroutineContext {
private val values = mapOf<CoroutineContext.Key<*>, CoroutineContext.Element>(
EXECUTION_CONTEXT_KEY to ExecutionContext(10000)
)
override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
return values[key] as? E
}
// .. we omit other functions for simplicity
}
suspend fun a() {
// here we retrieve value from coroutine context
// coroutineContext is compiler intrinsic, can be called
// only from suspend world
val executionContext = coroutineContext[EXECUTION_CONTEXT_KEY]!!
println(executionContext.someInt!!)
}
fun main() {
val operation: suspend () -> Unit = ::a
operation.startCoroutine(
object : Continuation<Unit> {
override val context: CoroutineContext = myCoroutineContext
override fun resumeWith(result: Result<Unit>) {
println(
if(result.isSuccess)
"Successfully done"
else "Error happenned: ${result.exceptionOrNull()}"
)
}
}
)
}
内存储元素的摘要
CoroutineContext.Element
是用于存储CoroutineContext
CoroutineContext.Key
是CoroutineContext.Element
的标识符。
您可以使用此代码here。
真实项目示例
让我们想象我们拥有API服务。通常,我们需要拥有一定的授权,所以让我们考虑下一个示例(至于这样,我服用了GRPC):
// let's define an element that will persist in `CoroutineContext`
data class AuthorizationContext(
val accessHash: String?,
val provider: AuthorizationProvider,
) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<AuthorizationContext>
override val key: CoroutineContext.Key<*> = Key
}
// now we define our interceptor
class AuthorizationInterceptor(
private val authorizationProvider: AuthorizationProvider,
) : CoroutineContextServerInterceptor() {
companion object {
private val ACCESS_TOKEN_METADATA_KEY: Metadata.Key<String> =
Metadata.Key.of("access-token", Metadata.ASCII_STRING_MARSHALLER)
}
override fun coroutineContext(call: ServerCall<*, *>, headers: Metadata): CoroutineContext {
return AuthorizationContext(
accessHash = headers.get(ACCESS_TOKEN_METADATA_KEY),
provider = authorizationProvider,
)
}
}
kotlinx.coroutines
2中的CoroutineContext
实际上只是一个Map<K : CoroutineContext.Key, V : CoroutineContext.Element>
(例如,更准确,是jvm上的ConcurrentHashMap),与上面的示例相同。但是,如果我们谈论kotlinx.coroutines
,它将传播给所需的Coroutine中的所有孩子(我们没有这样的机制)。
所以,现在我们可以在孩子们的情况下得到它:
suspend inline fun provideAuthorization(block: (UserId) -> Unit) {
val authContext = coroutineContext[AuthorizationContext]
authContext.accessHash ?: throw StatusException(Status.UNAUTHORIZED)
val userId = authContext.provider.provide(authContext.accessHash)
return block(userId)
}
有趣的事实:coroutineContext是Kotlin中唯一具有
suspend
修饰符的属性。 ð
对于GRPC,我们还需要注册我们的拦截器并编写我们的RPC。但是,对于GRPC,这种解决方案的想法是简单的 - 逻辑并简化开发人员的体验。
对于Java,GRPC使用ThreadLocal,因此我们也可以将
CoroutineContext
视为ThreadLocal
的替代方案。我们不能在Coroutines中使用ThreadLocal
,因为通常不会将Coroutine链接到特定线程(尤其是在谈论withContext
时)。 Coroutines更有可能在另一个线程上恢复(此外,不同的Coroutines可以在一个线程上运行)。
但是,这是否意味着Coroutines存在的唯一原因是并发?让我解释一下。
Sequence
最常见的例子之一是kotlin.sequences.Sequence<T>
。简而言之,这就像一个懒惰的收藏,只有在您开始消费时才能迭代。您可以阅读更多有关它们的here。
如果您查看了SequenceScope
的来源,它使用引擎盖下的悬挂功能:
@RestrictSuspension
public abstract class SequenceScope<in T> internal constructor() {
/**
* Yields a value to the [Iterator] being built and suspends
* until the next value is requested.
*/
public abstract suspend fun yield(value: T)
// ... other
}
@RestrictSuspension
拒绝消费者呼叫非会员暂停功能。
因此,这个想法是懒惰的元素。您可以将它们用作常规收藏,并利用懒惰的迭代。
但是它如何在引擎盖下工作?让我们看一下实施资源:
private typealias State = Int
private const val State_NotReady: State = 0
private const val State_ManyNotReady: State = 1
private const val State_ManyReady: State = 2
private const val State_Ready: State = 3
private const val State_Done: State = 4
private const val State_Failed: State = 5
private class SequenceBuilderIterator<T> : SequenceScope<T>(), Iterator<T>, Continuation<Unit> {
private var state = State_NotReady
private var nextValue: T? = null
private var nextIterator: Iterator<T>? = null
var nextStep: Continuation<Unit>? = null
override fun hasNext(): Boolean {
while (true) {
when (state) {
State_NotReady -> {}
State_ManyNotReady ->
if (nextIterator!!.hasNext()) {
state = State_ManyReady
return true
} else {
nextIterator = null
}
State_Done -> return false
State_Ready, State_ManyReady -> return true
else -> throw exceptionalState()
}
state = State_Failed
val step = nextStep!!
nextStep = null
step.resume(Unit)
}
}
override fun next(): T {
when (state) {
State_NotReady, State_ManyNotReady -> return nextNotReady()
State_ManyReady -> {
state = State_ManyNotReady
return nextIterator!!.next()
}
State_Ready -> {
state = State_NotReady
@Suppress("UNCHECKED_CAST")
val result = nextValue as T
nextValue = null
return result
}
else -> throw exceptionalState()
}
}
private fun nextNotReady(): T {
if (!hasNext()) throw NoSuchElementException() else return next()
}
private fun exceptionalState(): Throwable = when (state) {
State_Done -> NoSuchElementException()
State_Failed -> IllegalStateException("Iterator has failed.")
else -> IllegalStateException("Unexpected state of the iterator: $state")
}
override suspend fun yield(value: T) {
nextValue = value
state = State_Ready
return suspendCoroutineUninterceptedOrReturn { c ->
nextStep = c
COROUTINE_SUSPENDED
}
}
override fun resumeWith(result: Result<Unit>) {
result.getOrThrow() // just rethrow exception if it is there
state = State_Done
}
override val context: CoroutineContext
get() = EmptyCoroutineContext
}
COROUTINE_SUSPENDED
是Kotlin编译器内部使用的特殊常数,用于管理Coroutine悬架和恢复。它不是开发人员通常直接与之交互的东西,而是它充当Coroutine机械中的内部信号。
看起来很难阅读,不是吗?让我们逐步进行:
- 首先,我们从国家开始。我们将有下一个州,让我们简要地谈论它们:
- state_notready :迭代器尚未准备好现在提供项目。它可能正在等待操作或进一步处理以使物品可用。
- state_manynotready :迭代器准备提供多个项目,但尚未立即准备好。它正在等待一个信号表明物品已经准备好消费(基本上是等待终端操作员)。
- state_manyready :迭代器现在准备提供多个项目。它可以立即从序列中给出下一个项目。
- state_ready :迭代器有一个准备提供的单个项目。 时,它设置为立即提供该物品。
-
state_done :迭代器没有更多的项目可提供。它已经完成了从序列产生元素的工作。当我们离开
SequenceBuilder
时,我们达到了这种状态 - state_failed :发生了一些意外的事情,迭代器遇到了一个问题。通常,这不应该发生。
-
基于状态的
hasNext
,在准备消耗时返回值或一组值。 -
next
功能根据其当前状态从迭代器中检索下一个项目(与hasNext
相同)。如果下一个没有hasNext
,则可以到达nextNotReady()
。在其他情况下,它将仅返回值。 -
yield
功能只是更改序列迭代器实现的状态。当添加新元素时,它将更改为State_Ready
。使用suspendCoroutineUninterceptedOrReturn
暂停Coroutine(执行)并稍后恢复。它将在以前的Coroutine(悬浮点)完成时开始。基本上,它是由迭代器处理的,但是使其更容易,就像我们从另一个回调了一个回调一样:
fun foo(callback: (Result<Foo>) -> Unit) {
// pseudo-code
executor.execute { result -> callback(result) }
}
suspend fun bar(): Result<Foo> {
// same if was like that
val result = executor.execute() // suspend point
return result
}
最后,它看起来并不复杂,我对吗?
DeepRecursiveScope
现在,让我们讨论Kotlin Coroutines DeepRecursiveScope
的另一个案例。如您所知,通常在特定函数呼叫本身时,我们有可能遇到StackOverflowError
的概率,因为每个呼叫都将其贡献到我们的堆栈中。
例如,出于相同的目的,也存在tailrec
语言构建。不同之处在于,tailrec
无法通过调用其他功能进行分支(条件检查)。您可以阅读更多here。
因此,DeepRecursiveScope
不依赖传统的堆栈流,而是使用Coroutines提供的所有功能。
我们不会停止使用DeepRecursiveScope
的精确实施详细信息(您可以看一下here),因为它具有与Sequence
相同的想法,并具有其他行为以支持提供的机制,但让我们讨论Kotlin Coroutines如何解决此问题。特定问题。
在内部进行Coroutines
它如何准确解决问题?正如我之前提到的,Coroutines受到CPS (Continuation Passing Style)的启发,但这并不是Kotlin编译器在处理有效的Coroutines所做的。
Kotlin编译器使用优化的组合来有效地管理Coroutines堆栈和执行。让我们检查到底有什么作用:
-
编译器转换:Kotlin编译器生成状态机,与我们在Sequences的实现细节中看到的相同。它不会摆脱所有堆栈电话,而是将其减少到不遇到
StackOverflowError
。 - 连续性堆分配:在传统的回调链中,每个功能呼叫都会将数据推向呼叫堆栈。如果链条深处,这可能会消耗大量堆栈空间。在Coroutine方法中,当悬挂Coroutine时,其继续将其作为对象存储在堆上。此延续对象拥有必要的信息(堆栈,调度程序等),以恢复Coroutine的执行。这个堆的存储空间允许更大的能力处理深层呼叫链,而不会冒堆叠溢出的风险。
下一个Coroutines的确切机制:
- 序列化:悬挂的Coroutine的堆栈状态保存在一个堆分配的延续对象中。
- 恢复:准备恢复时,该框架为模拟捕获状态设置了本地堆栈。
- 内存复制:序列化堆栈状态从持续对象复制到本机堆栈。
- 上下文配置:执行上下文配置为匹配原始状态。
- 程序计数器:程序计数器设置为保存的值以进行正确的说明。
- Invocation :使用CPS调用连续代码,恢复执行。
堆栈还原还可以帮助我们恢复不同的线程,因为他们对我们的Coroutine堆栈一无所知。
所以,从现在开始,我们可以理解Coroutines在内部的工作方式。让我们继续使用其他示例,其中使用Kotlin Coroutines超出并发。
JetPack组成
如果您曾经与Compose合作,例如处理指针事件,您可能已经提到过,有一些来自Coroutines的骇客来聆听更新:
@RestrictsSuspension // <---
@JvmDefaultWithCompatibility
interface AwaitPointerEventScope : Density {
// ...
suspend fun awaitPointerEvent(
pass: PointerEventPass = PointerEventPass.Main
): PointerEvent
suspend fun <T> withTimeoutOrNull(
timeMillis: Long,
block: suspend AwaitPointerEventScope.() -> T
): T? = block()
suspend fun <T> withTimeout(
timeMillis: Long,
block: suspend AwaitPointerEventScope.() -> T
): T = block()
// ...
}
因此,如您所见,用来处理指针事件的范围用@RestrictsSuspension
标记。如果我们来提供文档,我们将看到下一步:
This is a restricted suspension scope. Code in this scope is
always called un-dispatched and may only suspend for calls
to [awaitPointerEvent]. These functions resume synchronously and the caller may mutate the result
**before** the next await call to affect the next stage of the input processing pipeline.
awaitPointerEvent
是使用kotlin原始素处理的,没有kotlinx.coroutines
。与在这种情况下一样
Android SDK
由于我们可以在需要回调的所有情况下使用suspend
功能,因此还有其他有用的情况,我们可以应用此类逻辑。例如,在请求权限时:
val result = Warden
.with(this)
.requestPermission(Manifest.permission.CALL_PHONE)
when (result) {
is PermissionState.Denied -> dialNumber(phoneNumber)
PermissionState.Granted -> startCall(phoneNumber)
}
例如,我们正在使用Warden库。
结论
总而言之,本文探讨了Kotlin Coroutines的各个方面,强调了它们的多功能性,而不是传统的并发任务。我们已经深入研究了Coroutines原语的内部运作,讨论了它们在序列和复杂的解决问题的方案(例如深层递归)中的使用,并研究了展示其广泛适用性的现实世界示例。标题“ Coroutines不仅与并发有关”,恰当地反映了Kotlin Coroutines在现代软件开发中提供的多种功能。
随时在水冷却器聊天中随意放下您的专业知识!