本文最初由Léo Schneider和Mehmet Akif Tütüncü发表。
介绍
在文章的第二部分中,我们将证明我们可以使用Kotlin Coroutines在直接样式编写代码时具有反应性应用程序。
我们希望使用java编写的反应堆从弹簧启动反应式项目开始,请参见part 1。
在第一部分中,我们将专注于Kotlin并将项目适应该语言。在第二部分中,我们将探索Coroutines,以及如何使用它们。
用kotlin反应
到目前为止,我们在整个链中都有一个反应性应用。
我们甚至有一个保障措施来检测阻止呼叫,以验证运行时代码的非阻滞性质。
我们可以在这里停止所有内容,但是目前有一个可能的升级。
即使这意味着我们必须更改编程语言!
您在标题中阅读了它,这当然是Kotlin
。
Kotlin概述
有趣的是,Kotlin
是一种乘法语言,最初用于Android开发,但现在越来越多地用作后端语言。
编译的代码将是正常的Java字节代码,因此甚至可以在同一项目中同时将Kotlin
和Java
文件同时使用。
这也意味着我们可以保留Java框架和库,因此不需要其他更改。
Kotlin功能
kotlin具有许多有趣的功能,其中有些甚至在最新版本中被Java复制。
空安全
一个主要功能是每种类型都有变化nullable
和not nullable
。
编译器可以在编译时知道是否可能发生NullPointerException
,如果没有处理此类情况,将拒绝编译。
不变性
默认情况下,kotlin中的一切都是不变的:除非您另行指定,否则收藏是不变的。
您可以使用val
将变量标记为常数引用。
知道什么可以和什么不能改变,它有助于减少产生的副作用数量。
扩展功能
kotlin提供了在不继承类的情况下从接收方创建功能的能力。
然后,您可以扩展您不拥有的类的功能。
标准库
如果我们将其与Java进行比较,该语言的标准库具有许多方便的功能。
例如,您无需在您想从收藏中做某事时使用stream()
。
多亏了扩展功能,您甚至可以在Java类型上获得其他方法。
将项目迁移到Kotlin
下一步是将我们的项目升级到Kotlin。我们有两种方法:
1.使用IDE自动工具
虽然它适用于大多数方法,但它将将这些类型限制在IDE可以看到的类型中。
例如:
@get:GetMapping
val allWeather: Flux<WeatherInfo>
get() = weatherService.allWeather
这种方法从get
开始,IDE得出结论,这是一个称为allWeather
的属性。
它创建了一个带有getter的属性来替换它。
但是我们人类可以看到这是一种方法,而是必须手动调整它:
fun getAllWeather(): Flux<WeatherInfo> {
return weatherService.getAllWeather()
}
2.手动更改文件
由于IDE自动生成的代码并不理想拥有一些可读代码,
最好是手动重写它,或者至少要自动生成的所有代码。
这听起来像是一项非常耗时的任务,但是使用某些Regexes,查找/替换和多镜头帮助
要快速更改某些文件。
几乎每个功能都会从该模式中改变:
public Foo foo(Bar bar) {}
对此:
fun foo(bar: Bar): Foo {}
迁移到数据类
java限制了我们每个文件只有一个类。
有时我们有非常简单的课程,在这种情况下,我们可以将它们迁移为data class
,它在Java中与Record
相等。
Kotlin的附加值是我们可以将它们全部放入一个文件中。所得文件可以包含像这样的小模块的整个域。
package com.io.reactivecoroutines.weather.api
import com.fasterxml.jackson.annotation.JsonProperty
import java.time.LocalDate
data class Forecast(
@JsonProperty("forecastday") val days: List<ForecastDay>
)
data class ForecastDay(
val date: LocalDate,
@JsonProperty("day") val temperature: Temperature,
)
data class Location(
@JsonProperty("name") val city: String,
val region: String,
val country: String,
)
data class Temperature(
@JsonProperty("maxtemp_c") val maxC: Double,
@JsonProperty("maxtemp_f") val maxF: Double,
@JsonProperty("mintemp_c") val minC: Double,
@JsonProperty("mintemp_f") val minF: Double,
@JsonProperty("avgtemp_c") val avgC: Double,
@JsonProperty("avgtemp_f") val avgF: Double,
)
测试
如果您使用的是Mockito
,则可以意识到函数when()
现在无效,因为它是Kotlin中的保留关键字。
Mockito具有库的Kotlin变体,您可以获得依赖性:org.mockito.kotlin:mockito-kotlin
比Java One获得更好的支持。
when
被称为whenever
,但是除此之外,几乎所有东西都应该相似。
现在我们有了Kotlin的整个项目,我们可以立即进入我们要寻找的东西:Coroutines
共同点
Coroutines首次在1963年出版的Design of a Separable Transition-Diagram Compiler引入。
它代表了可分开的程序的一部分,可以暂停并恢复其执行。
这意味着,如果我们具有可以暂停的功能,我们可以具有非阻滞行为和异步调用。
通过进行更多的调整,我们可以同时发生并启用并行性。
与实例化新线程相比,使用Coroutines启动新的执行流程非常轻巧,因此我们可以同时拥有许多Coroutines。
这说明了冠端的强大程度以及为什么它们对我们很有趣,尤其是对于非障碍的场景。
kotlin具有该语言支持并由图书馆实施的Coroutines(该实施仍然取决于平台):
kotlinx.coroutines
并且可以使用沙盒环境here尝试一下。
作为语言的一部分,共核
kotlin中的coroutines允许我们以直接样式编写代码,但与回调一起运行。
当您使用修饰符suspend
更改功能时,您指出此功能可以暂停其执行
因此,它只能在Coroutine上下文中运行 。编译器将用Continuation
有效包装返回类型。
在直接样式中,我们通常会写这样的代码:
fun postItem(item: Item) {
val token = createToken()
val response = client.post(item, token)
handleResponse(response)
}
如果我们链接有关流的延续的回调,那将是这样很难阅读的事情:
fun postItem(item: Item) {
createToken { token ->
client.post(item, token) { response ->
handleResponse(response)
}
}
}
只需将suspend
关键字添加到直接样式代码中,我们就可以通过持续回调运行!
我们现在可以两全其美。
suspend fun postItem(item: Item) {
/*Suspend*/ val token = createToken()
/*Suspend*/ val response = client.post(item, token)
handleResponse(response)
}
与项目反应堆集成
如果您想按照项目反应堆的方式保留代码并从其特定API中受益,
然后,您可以简单地将所有内容包裹在Kotlin代码中,就像您在Java中使用的方式一样。
由于库之间的互操作性,您可以使用asFlow()
和asFlux()
更改代码。
如果您打算进行一些迁移,这尤其有价值。因为在这种情况下,您可以逐步进行。
您可以从Kotlin提供的生活质量中受益,并保持您的反应堆特定代码。
但这不是迁移整个项目的好理由。
很有趣的是,知道这是可能的,但是我们真正想要的是使用Coroutines,因为它将大大降低您阅读的代码的复杂性。
Coroutines迁移
在我们的项目中,我们意识到我们需要暂停从控制器到存储库中的链中的所有功能。
最好的开始是从诸如存储库之类的外部图层开始,然后逐层移动。
1.存储库层
在存储库中,我们以前有一个反应性存储库:
@Repository
interface WeatherRepository : ReactiveCrudRepository<WeatherInfo, String>
我们可以将其更改为CoroutineCrudRepository
,我们还可以得到一个支持排序的一个:CoroutineSortingCrudRepository
:
这些接口来自依赖关系spring-boot-starter-data-r2dbc
@Repository
interface WeatherRepository : CoroutineCrudRepository<WeatherInfo, String>
如果我们检查函数findById(id: Long)
,我们可以看到存储库提供了以下内容:
suspend fun findById(id: Long): WeatherInfo?
这是惊人的部分!此功能正在暂停,该存储库是非障碍的,但是我们不必处理反应性类型!
无需再包装!
如果我们看一下findAll()
方法,我们可以看到它不会暂停并返回反应类型:
fun findAll(): Flow<T>
要摆脱Flow
类型,我们需要收集它,否则,我们可以将其传递到更高的水平。
- 如果我们以页面的方式工作,那么我们可以简单地使用
toList()
等待整个列表 - 如果我们想逐步流式传输数据,那么我们可以通过
flow
直到服务器响应的主体。
2. WebFlux的Webclients
大多数真实的应用程序不仅具有数据库,还具有网络上的CRUD操作。
这就是为什么我们在应用程序中有一些反应性网络元素的原因。
它们是使用反应器API以功能风格编写的。
他们看起来像这样:
val forecast: WeatherApiResponse = webClient
.get()
.uri("$host/v1/forecast.json?key=$apiKey&q=$query&days=7")
.exchangeToMono<WeatherAPIResponse?> { it.bodyToMono() }
.doFirst {
log.info("Getting weather forecast for {}", query)
}
.doOnError {
log.error("Cannot get weather forecast for $query", it)
Exceptions.propagate(it)
}
.doOnSuccess { response ->
log.info(
"Weather forecast for query {}: {}",
query,
response
)
}
我们可以使用coroutine方法:我们标记了这样的悬挂电话:/*S*/
val forecastResponse: ClientResponse = webClient
.get()
.uri("$host/v1/forecast.json?key=$apiKey&q=$query&days=7")
/*S*/ .awaitExchange { it }
val statusCode = forecastResponse.statusCode()
return when {
/*S*/ statusCode.is2xxSuccessful -> forecastResponse.awaitBody()
/*S*/ statusCode.is4xxClientError -> throw IllegalArgumentException(forecastResponse.createExceptionAndAwait())
/*S*/ statusCode.is5xxServerError -> throw forecastResponse.createExceptionAndAwait()
/*S*/ else -> throw forecastResponse.createExceptionAndAwait()
}
现在,我们回到了直接的样式,我们对可以使用的代码类型有更多控制。
我们最终使用不同的功能:转换表看起来像:
-
exchangeToMono()
->awaitExchange()
及其无效的变体awaitExchangeOrNull()
-
exchangeToFlux()
->exchangeToFlow()
-
bodyToMono()
->awaitBody()
及其无效的变体awaitBodyOrNull()
- Cold41-> Cold42 ...
您可以看到,可以等待零或一个物体的身体直接检索,
流动可以在以后等待或一一处理。
3.服务层
在服务层中,我们可以将功能标记为suspend
,并将类型从其反应类型更改为“未包装”类型:
fun findById(id: Long): Mono<WeatherInfo> {}
fun findAll(): Flux<WeatherInfo> {}
fun streamAll(): Flux<WeatherInfo> {}
都应该变得如此:
suspend fun findById(id: Long): WeatherInfo {}
suspend fun findAll(): List<WeatherInfo> {}
suspend fun streamAll(): Flow<WeatherInfo> {}
4.控制器和请求处理程序
如果您正在为控制器使用@RestController
注释,
然后,无论您拥有哪种类型
但是,当您调用暂停功能下线时,您也需要更改控制器以暂停。
@GetMapping("/{id}")
fun getById(@PathVariable id: Long): Mono<WeatherInfo> {}
将成为:
@GetMapping("/{id}")
suspend fun getById(@PathVariable id: Long): WeatherInfo {}
推出Coroutines
您可能遇到的一个重要问题是:但是我们从哪里开始Coroutines?
我在代码中的任何地方都看不到任何launch {}
。
与在反应堆中调用subscribe()
的方式相同,您很可能不需要。
这是因为如果您的控制器功能正在悬挂,那么Spring
将为您启动它们。
如果使用路由器样式方法,则可以使用coRouter
而不是普通的方法。
您的路由器将从此更改:
@Bean
fun route(weatherSearchHandler: WeatherSearchHandler): RouterFunction<ServerResponse> {
return RouterFunctions
.route(
POST("/weather-infos/search/")
.and(accept(MediaType.APPLICATION_JSON)), weatherSearchHandler::searchByExample
)
}
对此:
@Bean
fun route(weatherSearchHandler: WeatherSearchHandler) = coRouter {
POST("/weather-infos/search/", weatherSearchHandler::searchByExample).apply {
accept(MediaType.APPLICATION_JSON)
}
}
当我们到达这一点时,我们终于迁移了一切,
我们可以在保持通常的写作风格的同时进行反应性应用。
要点
- 反应性库是可互操作的,因为它们与相同的抽象合作
- Kotlin是一种很好的语言,可以在一个反应性项目上使用,但不仅是
- 可以逐步完成此类项目
- coroutines允许我们保持应用的反应性,但没有功能代码样式限制
- 在使用悬浮功能时,我们可以使用普通类型,而不是反应性
额外资源:
- Kotlin channels用于流数据
- abaoqian35 by nicolasbrofänkel
- Concurrency is not Parallelism作者Rob Pike
- 罗马伊丽莎白的Deep Dive into Coroutines on JVM
- Java sockets I/O: blocking, non-blocking and asynchronous by Aliaksandr liakh