带有Spring Boot,Kotlin和Coroutines的反应性后端应用(第2部分)
#java #kotlin #springboot #reactive

本文最初由Léo SchneiderMehmet Akif Tütüncü发表。

介绍

在文章的第二部分中,我们将证明我们可以使用Kotlin Coroutines在直接样式编写代码时具有反应性应用程序。

我们希望使用java编写的反应堆从弹簧启动反应式项目开始,请参见part 1

在第一部分中,我们将专注于Kotlin并将项目适应该语言。在第二部分中,我们将探索Coroutines,以及如何使用它们。

用kotlin反应

到目前为止,我们在整个链中都有一个反应性应用。
我们甚至有一个保障措施来检测阻止呼叫,以验证运行时代码的非阻滞性质。

我们可以在这里停止所有内容,但是目前有一个可能的升级。
即使这意味着我们必须更改编程语言!

您在标题中阅读了它,这当然是Kotlin

Kotlin概述

有趣的是,Kotlin是一种乘法语言,最初用于Android开发,但现在越来越多地用作后端语言。
编译的代码将是正常的Java字节代码,因此甚至可以在同一项目中同时将KotlinJava文件同时使用。
这也意味着我们可以保留Java框架和库,因此不需要其他更改。

Kotlin功能

kotlin具有许多有趣的功能,其中有些甚至在最新版本中被Java复制。

空安全

一个主要功能是每种类型都有变化nullablenot nullable
编译器可以在编译时知道是否可能发生NullPointerException,如果没有处理此类情况,将拒绝编译。

不变性

默认情况下,kotlin中的一切都是不变的:除非您另行指定,否则收藏是不变的。
您可以使用val将变量标记为常数引用。
知道什么可以和什么不能改变,它有助于减少产生的副作用数量。

扩展功能

kotlin提供了在不继承类的情况下从接收方创建功能的能力。
然后,您可以扩展您不拥有的类的功能。

标准库

如果我们将其与Java进行比较,该语言的标准库具有许多方便的功能。
例如,您无需在您想从收藏中做某事时使用stream()
多亏了扩展功能,您甚至可以在Java类型上获得其他方法。

将项目迁移到Kotlin

下一步是将我们的项目升级到Kotlin。我们有两种方法:

1.使用IDE自动工具

Image description
虽然它适用于大多数方法,但它将将这些类型限制在IDE可以看到的类型中。
例如:

@get:GetMapping
val allWeather: Flux<WeatherInfo>
    get() = weatherService.allWeather

这种方法从get开始,IDE得出结论,这是一个称为allWeather的属性。
它创建了一个带有getter的属性来替换它。
但是我们人类可以看到这是一种方法,而是必须手动调整它:

fun getAllWeather(): Flux<WeatherInfo> {
    return weatherService.getAllWeather()
}

2.手动更改文件

由于IDE自动生成的代码并不理想拥有一些可读代码,
最好是手动重写它,或者至少要自动生成的所有代码。

这听起来像是一项非常耗时的任务,但是使用某些Regexes,查找/替换和多镜头帮助
要快速更改某些文件。

几乎每个功能都会从该模式中改变:

public Foo foo(Bar bar) {}

对此:

fun foo(bar: Bar): Foo {}

迁移到数据类

java限制了我们每个文件只有一个类。
有时我们有非常简单的课程,在这种情况下,我们可以将它们迁移为data class,它在Java中与Record相等。
Kotlin的附加值是我们可以将它们全部放入一个文件中。所得文件可以包含像这样的小模块的整个域。

package com.io.reactivecoroutines.weather.api

import com.fasterxml.jackson.annotation.JsonProperty
import java.time.LocalDate

data class Forecast(
    @JsonProperty("forecastday") val days: List<ForecastDay>
)

data class ForecastDay(
    val date: LocalDate,
    @JsonProperty("day") val temperature: Temperature,
)

data class Location(
    @JsonProperty("name") val city: String,
    val region: String,
    val country: String,
)

data class Temperature(
    @JsonProperty("maxtemp_c") val maxC: Double,
    @JsonProperty("maxtemp_f") val maxF: Double,
    @JsonProperty("mintemp_c") val minC: Double,
    @JsonProperty("mintemp_f") val minF: Double,
    @JsonProperty("avgtemp_c") val avgC: Double,
    @JsonProperty("avgtemp_f") val avgF: Double,
)

测试

如果您使用的是Mockito,则可以意识到函数when()现在无效,因为它是Kotlin中的保留关键字。
Mockito具有库的Kotlin变体,您可以获得依赖性:org.mockito.kotlin:mockito-kotlin比Java One获得更好的支持。
when被称为whenever,但是除此之外,几乎所有东西都应该相似。

现在我们有了Kotlin的整个项目,我们可以立即进入我们要寻找的东西:Coroutines

共同点

Coroutines首次在1963年出版的Design of a Separable Transition-Diagram Compiler引入。
它代表了可分开的程序的一部分,可以暂停并恢复其执行。

这意味着,如果我们具有可以暂停的功能,我们可以具有非阻滞行为和异步调用。
通过进行更多的调整,我们可以同时发生并启用并行性。

与实例化新线程相比,使用Coroutines启动新的执行流程非常轻巧,因此我们可以同时拥有许多Coroutines。
这说明了冠端的强大程度以及为什么它们对我们很有趣,尤其是对于非障碍的场景。

kotlin具有该语言支持并由图书馆实施的Coroutines(该实施仍然取决于平台):
kotlinx.coroutines
并且可以使用沙盒环境here尝试一下。

作为语言的一部分,共核

kotlin中的coroutines允许我们以直接样式编写代码,但与回调一起运行。
当您使用修饰符suspend更改功能时,您指出此功能可以暂停其执行
因此,它只能在Coroutine上下文中运行 。编译器将用Continuation有效包装返回类型。

在直接样式中,我们通常会写这样的代码:

fun postItem(item: Item) {
    val token = createToken()
    val response = client.post(item, token)
    handleResponse(response)
}

如果我们链接有关流的延续的回调,那将是这样很难阅读的事情:

fun postItem(item: Item) {
    createToken { token ->
        client.post(item, token) { response ->
            handleResponse(response)
        }
    }
}

只需将suspend关键字添加到直接样式代码中,我们就可以通过持续回调运行!
我们现在可以两全其美。

            suspend fun postItem(item: Item) {
/*Suspend*/     val token = createToken()
/*Suspend*/     val response = client.post(item, token)
                handleResponse(response)
            }

与项目反应堆集成

如果您想按照项目反应堆的方式保留代码并从其特定API中受益,
然后,您可以简单地将所有内容包裹在Kotlin代码中,就像您在Java中使用的方式一样。

由于库之间的互操作性,您可以使用asFlow()asFlux()更改代码。
如果您打算进行一些迁移,这尤其有价值。因为在这种情况下,您可以逐步进行。

您可以从Kotlin提供的生活质量中受益,并保持您的反应堆特定代码。
但这不是迁移整个项目的好理由。

很有趣的是,知道这是可能的,但是我们真正想要的是使用Coroutines,因为它将大大降低您阅读的代码的复杂性。

Coroutines迁移

在我们的项目中,我们意识到我们需要暂停从控制器到存储库中的链中的所有功能。
最好的开始是从诸如存储库之类的外部图层开始,然后逐层移动。

1.存储库层

在存储库中,我们以前有一个反应性存储库:

@Repository
interface WeatherRepository : ReactiveCrudRepository<WeatherInfo, String>

我们可以将其更改为CoroutineCrudRepository,我们还可以得到一个支持排序的一个:CoroutineSortingCrudRepository
这些接口来自依赖关系spring-boot-starter-data-r2dbc

@Repository
interface WeatherRepository : CoroutineCrudRepository<WeatherInfo, String>

如果我们检查函数findById(id: Long),我们可以看到存储库提供了以下内容:

suspend fun findById(id: Long): WeatherInfo?

这是惊人的部分!此功能正在暂停,该存储库是非障碍的,但是我们不必处理反应性类型!
无需再包装!

如果我们看一下findAll()方法,我们可以看到它不会暂停并返回反应类型:

fun findAll(): Flow<T>

要摆脱Flow类型,我们需要收集它,否则,我们可以将其传递到更高的水平。

  • 如果我们以页面的方式工作,那么我们可以简单地使用toList()等待整个列表
  • 如果我们想逐步流式传输数据,那么我们可以通过flow直到服务器响应的主体。

2. WebFlux的Webclients

大多数真实的应用程序不仅具有数据库,还具有网络上的CRUD操作。
这就是为什么我们在应用程序中有一些反应性网络元素的原因。 它们是使用反应器API以功能风格编写的。

他们看起来像这样:

val forecast: WeatherApiResponse = webClient
    .get()
    .uri("$host/v1/forecast.json?key=$apiKey&q=$query&days=7")
    .exchangeToMono<WeatherAPIResponse?> { it.bodyToMono() }
    .doFirst {
        log.info("Getting weather forecast for {}", query)
    }
    .doOnError {
        log.error("Cannot get weather forecast for $query", it)
        Exceptions.propagate(it)
    }
    .doOnSuccess { response ->
        log.info(
            "Weather forecast for query {}: {}",
            query,
            response
        )
    }

我们可以使用coroutine方法:我们标记了这样的悬挂电话:/*S*/

        val forecastResponse: ClientResponse = webClient
            .get()
            .uri("$host/v1/forecast.json?key=$apiKey&q=$query&days=7")
/*S*/       .awaitExchange { it }

        val statusCode = forecastResponse.statusCode()

        return when {
/*S*/       statusCode.is2xxSuccessful -> forecastResponse.awaitBody()
/*S*/       statusCode.is4xxClientError -> throw IllegalArgumentException(forecastResponse.createExceptionAndAwait())
/*S*/       statusCode.is5xxServerError -> throw forecastResponse.createExceptionAndAwait()
/*S*/       else -> throw forecastResponse.createExceptionAndAwait()
        }

现在,我们回到了直接的样式,我们对可以使用的代码类型有更多控制。

我们最终使用不同的功能:转换表看起来像:

  1. exchangeToMono()-> awaitExchange()及其无效的变体awaitExchangeOrNull()
  2. exchangeToFlux()-> exchangeToFlow()
  3. bodyToMono()-> awaitBody()及其无效的变体awaitBodyOrNull()
  4. Cold41-> Cold42 ...

您可以看到,可以等待零或一个物体的身体直接检索,
流动可以在以后等待或一一处理。

3.服务层

在服务层中,我们可以将功能标记为suspend,并将类型从其反应类型更改为“未包装”类型:

fun findById(id: Long): Mono<WeatherInfo> {}
fun findAll(): Flux<WeatherInfo> {}
fun streamAll(): Flux<WeatherInfo> {}

都应该变得如此:

suspend fun findById(id: Long): WeatherInfo {}
suspend fun findAll(): List<WeatherInfo> {}
suspend fun streamAll(): Flow<WeatherInfo> {}

4.控制器和请求处理程序

如果您正在为控制器使用@RestController注释,
然后,无论您拥有哪种类型 但是,当您调用暂停功能下线时,您也需要更改控制器以暂停。

@GetMapping("/{id}")
fun getById(@PathVariable id: Long): Mono<WeatherInfo> {}

将成为:

@GetMapping("/{id}")
suspend fun getById(@PathVariable id: Long): WeatherInfo {}

推出Coroutines

您可能遇到的一个重要问题是:但是我们从哪里开始Coroutines?
我在代码中的任何地方都看不到任何launch {}

Image description

与在反应堆中调用subscribe()的方式相同,您很可能不需要。
这是因为如果您的控制器功能正在悬挂,那么Spring将为您启动它们。
如果使用路由器样式方法,则可以使用coRouter而不是普通的方法。

您的路由器将从此更改:

@Bean
fun route(weatherSearchHandler: WeatherSearchHandler): RouterFunction<ServerResponse> {
    return RouterFunctions
        .route(
            POST("/weather-infos/search/")
                .and(accept(MediaType.APPLICATION_JSON)), weatherSearchHandler::searchByExample
        )
}

对此:

@Bean
fun route(weatherSearchHandler: WeatherSearchHandler) = coRouter {
        POST("/weather-infos/search/", weatherSearchHandler::searchByExample).apply {
            accept(MediaType.APPLICATION_JSON)
        }
}

当我们到达这一点时,我们终于迁移了一切,
我们可以在保持通常的写作风格的同时进行反应性应用。

要点

  1. 反应性库是可互操作的,因为它们与相同的抽象合作
  2. Kotlin是一种很好的语言,可以在一个反应​​性项目上使用,但不仅是
  3. 可以逐步完成此类项目
  4. coroutines允许我们保持应用的反应性,但没有功能代码样式限制
  5. 在使用悬浮功能时,我们可以使用普通类型,而不是反应性

额外资源: