本文最初由Mehmet Akif Tütüncü和Léo Schneider发表。
春季框架是Web应用程序最受欢迎的选择之一。它具有出色的生态系统,工具和支持。春季应用主要用Java编写。尽管它们在许多不同的域和用例中都可以很好地服务,但它们可能不适合需要低延迟和高通量的现代应用。这是反应性编程范式可以提供帮助的地方,因为该范式旨在通过其非阻滞性质解决这些问题。春季已经支持通过Project Reactor的反应性编程。
这将是两部分的文章。在第一部分中,让我们从反应性编程简介开始。
1.反应性编程简介
反应性编程是一个范式,范围侧重于任务的非阻滞和异步处理。 JVM上反应性编程的一组规格/抽象称为Reactive Streams。项目反应堆是反应流的消息驱动,类型安全和功能实现,Spring(通过spring-webflux模块)使用它来启用反应性Web应用程序。反应流将数据处理模型为一端,产生值并消耗它们的一端。
有一些您应该熟悉的基本构建块。
org.reactivestreams.Publisher
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
这是发出值并订阅这些值的接口。它代表反应流的价值产生端。
org.reactivestreams.Subscriber
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
这是定义反应流的进度的接口。可以有新的值,错误或完成信号。它代表反应流的价值末端。
org.reactivestreams.Subscription
public interface Subscription {
public void request(long n);
public void cancel();
}
这是订阅出版商的订户的接口。它允许请求多个消息。这样,反应流通过设计支持back-pressure。它还允许取消资源使用(不需要在不需要的情况下消耗消息)。在订阅中调用request
之前,流中不会流动数据。
reactor.core.publisher.Mono<T>
这是一个反应流的实现,其中可以存在T
类型的0或1个元素。它提供了许多不同的操作员来转换和组合流以获得所需的结果。
reactor.core.publisher.Flux<T>
这与Mono<T>
非常相似,但可以发射0或更多元素(不限于1)。
2.入门
在进行前进之前,值得花一点时间思考用于将应用程序转换为反应性的时间和精力。这是传统方法和反应性方法之间的总体观点和比较:
主题 | 传统 MVC应用程序 | 反应性应用 | |
---|---|---|---|
资源利用率 | 可能会阻止在利用率下引起的资源 | 更好地利用由于非阻滞性质而利用资源 | |
可伸缩性 | 受操作系统级线程界限 | 即使有限的资源 | ,更好的可伸缩性和性能 |
开发人员经验 | 更容易学习,教导和维护 | 需要一些习惯于反应性和功能性编程 | |
调试/工具 | 更容易使用更好的工具进行调试 | 目前很难进行调试和工具受到限制 |
要遵循本文的其余部分和演示目的,让我们首先创建传统的MVC风格的春季申请,以供天气信息。然后,我们将将此应用程序更新为反应性应用程序。
创建春季网络项目
您可以使用start.spring.io使用Java 17,Gradle,Spring Web,Spring Data JPA和H2数据库生成项目。这是我们项目的构建文件应该看起来像:
plugins {
java
id("org.springframework.boot") version "3.0.0"
id("io.spring.dependency-management") version "1.1.0"
}
group = "com.iodigital"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_17
repositories {
mavenCentral()
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
implementation("org.springframework.boot:spring-boot-starter-web")
runtimeOnly("com.h2database:h2")
testImplementation("org.springframework.boot:spring-boot-starter-test")
}
tasks.withType<Test> {
useJUnitPlatform()
}
让我们创建我们的天气实体:
package com.iodigital.weather;
import jakarta.persistence.*;
import java.time.LocalDate;
import java.util.StringJoiner;
@Entity
public class WeatherInfo {
@Id
@GeneratedValue
private Long id;
private String region;
private String country;
private String state;
private String city;
private LocalDate localDate;
private String avgTemperature;
// Constructors, getters, setters here
}
然后是一个存储库:
package com.iodigital.weather;
import org.springframework.data.repository.ListCrudRepository;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public interface WeatherRepository extends ListCrudRepository<WeatherInfo, Long> {
List<WeatherInfo> findAllByCityIgnoreCase(final String city);
}
然后服务:
package com.iodigital.weather;
import com.iodigital.weather.api.WeatherAPIClient;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class WeatherService {
private final WeatherRepository repository;
public WeatherService(final WeatherRepository repository) {
this.repository = repository;
}
public List<WeatherInfo> getAll() {
return repository.findAll();
}
public List<WeatherInfo> getForCity(final String city) {
return List.of(); // TODO: Will implement later
}
}
最后一个控制器:
package com.iodigital.weather;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping("/weather")
public class WeatherController {
private final WeatherService service;
public WeatherController(final WeatherService service) {
this.service = service;
}
@GetMapping
public List<WeatherInfo> getAll() {
return service.getAll();
}
@GetMapping("/city/{city}")
public List<WeatherInfo> getForCity(@PathVariable final String city) {
return service.getForCity(city);
}
}
现在让我们在终端运行我们的应用程序
gradle bootRun --console=plain
并发送请求进行测试。
# Gets nothing because DB is empty
curl localhost:8080/weather
[]
为了使此应用程序有用,我们还将与第三方天气数据提供商集成。我为此选择了weatherapi.com。在进行前进之前,注册并获取API密钥。
让我们添加匹配其API响应的模型(作为单独的文件):
// api/Location.java
package com.iodigital.weather.api;
import com.fasterxml.jackson.annotation.JsonProperty;
public record Location(@JsonProperty("name") String city, String region, String country) {}
// ---
// api/Temperature.java
package com.iodigital.weather.api;
import com.fasterxml.jackson.annotation.JsonProperty;
public record Temperature(@JsonProperty("avgtemp_f") double avgF) {}
// ---
// api/ForecastDay.java
package com.iodigital.weather.api;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.LocalDate;
public record ForecastDay(LocalDate date, @JsonProperty("day") Temperature temperature) {}
// ---
// api/Forecast.java
package com.iodigital.weather.api;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
public record Forecast(@JsonProperty("forecastday") List<ForecastDay> days) {}
// ---
// api/WeatherAPIResponse.java
package com.iodigital.weather.api;
import com.iodigital.weather.WeatherInfo;
import java.util.List;
public record WeatherAPIResponse(Location location, Forecast forecast) {
public List<WeatherInfo> toWeatherInfoList() {
final var region = location.region();
final var country = location.country();
final var city = location.city();
return forecast.days().stream().map(f ->
new WeatherInfo(
null, // id is null because this will be a new entity
region,
country,
"",
city,
f.date(),
"%.2f".formatted(f.temperature().avgF())
)
).toList();
}
}
然后,让我们将我们的凭据添加到application.properties
文件:
weatherapi.host=https://api.weatherapi.com
weatherapi.api-key=your-api-key
由于我们将发送HTTP请求,因此我们将使用Spring的RestTemplate
。让我们更新我们的应用程序类,以添加Bean
的定义。
package com.iodigital.weather;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
@SpringBootApplication
public class WeatherApplication {
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
public static void main(String[] args) {
SpringApplication.run(WeatherApplication.class, args);
}
}
现在我们可以添加一个可以与外部API交谈的客户端:
package com.iodigital.weather.api;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
@Component
public class WeatherAPIClient {
private final RestTemplate http;
private final String host;
private final String apiKey;
public WeatherAPIClient(
final RestTemplate http,
@Value("${weatherapi.host}") final String host,
@Value("${weatherapi.api-key}") final String apiKey
) {
this.http = http;
this.host = host;
this.apiKey = apiKey;
}
public WeatherAPIResponse getWeather(final String city) {
return http
.getForObject(
"%s/v1/forecast.json?key=%s&q=%s&days=7".formatted(host, apiKey, city),
WeatherAPIResponse.class
);
}
}
现在我们可以为我们的WeatherService
添加一些逻辑:
package com.iodigital.weather;
import com.iodigital.weather.api.WeatherAPIClient;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class WeatherService {
private final WeatherAPIClient api;
private final WeatherRepository repository;
public WeatherService(final WeatherAPIClient api, final WeatherRepository repository) {
this.api = api;
this.repository = repository;
}
public List<WeatherInfo> getAll() {
return repository.findAll();
}
public List<WeatherInfo> getForCity(final String city) {
final var weatherForCity = repository.findAllByCityIgnoreCase(city);
if (!weatherForCity.isEmpty()) {
return weatherForCity;
}
final var apiResponse = api.getWeather(city);
return repository.saveAll(apiResponse.toWeatherInfoList());
}
}
最后,我们可以运行应用程序并再次测试:
# Gets nothing because DB is empty
curl localhost:8080/weather
[]
# Gets nothing from DB, then fetches from 3rd party, saves and returns some data
curl localhost:8080/weather/city/Amsterdam
[
{
"avgTemperature": "47,20",
"city": "Amsterdam",
"country": "Netherlands",
"id": 1,
"localDate": "2022-12-22",
"region": "North Holland",
"state": ""
},
...
]
# Gets data from DB directly
curl localhost:8080/weather
[
{
"avgTemperature": "47,20",
"city": "Amsterdam",
"country": "Netherlands",
"id": 1,
"localDate": "2022-12-22",
"region": "North Holland",
"state": ""
},
...
]
我们终于在那里。我们的应用程序有用。在下一节中,我们将反应!
使用春季webflux进行反应
现在我们有了一个应用程序,让我们将其变成一个反应性应用程序。为此,我们将用Spring WebFlux,Spring Data JPA替换Spring Web依赖性,并使用Spring Data R2DBC替换。我们还将为我们的H2数据库添加R2DBC依赖关系。 R2DBC与反应驱动程序一起使用,因此它将与我们的其余应用程序完美地集成,以使我们以非阻滞方式访问数据库。
让我们首先更新构建文件的dependencies
部分如下:
dependencies {
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
implementation("org.springframework.boot:spring-boot-starter-webflux")
runtimeOnly("com.h2database:h2")
runtimeOnly("io.r2dbc:r2dbc-h2")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.projectreactor:reactor-test")
}
现在,我们需要分别调整控制器,服务和存储库层,直到我们的应用程序再次编译并使用反应性组件和类型。
作为经验法则,我们将用Mono<A>
替换A
的单个值,用Flux<A>
替换多个值List<A>
。
更新控制器很简单。我们的WeatherController
变为:
package com.iodigital.weather;
// ...
import reactor.core.publisher.Flux;
@RestController
@RequestMapping("/weather")
public class WeatherController {
// ...
@GetMapping
public Flux<WeatherInfo> getAll() {
return service.getAll();
}
@GetMapping("/city/{city}")
public Flux<WeatherInfo> getForCity(@PathVariable final String city) {
return service.getForCity(city);
}
}
由于服务代码将需要更改以利用Mono
和Flux
ES,所以让我们首先处理其余的更改,然后将服务留在结束时。
WeatherInfo
实体变为(请注意注释,不再有雅加达/JPA注释):
package com.iodigital.weather;
import org.springframework.data.annotation.Id;
// ...
public class WeatherInfo {
@Id
private Long id;
// ...
}
WeatherRepository
现在扩展了ReactiveCrudRepository
,因此db互动是非阻滞(使用r2dbc):
package com.iodigital.weather;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
@Repository
public interface WeatherRepository extends ReactiveCrudRepository<WeatherInfo, Long> {
Flux<WeatherInfo> findAllByCityIgnoreCase(final String city);
}
由于我们不再使用JPA/Hibernate,因此我们需要自己处理DB结构。幸运的是,对于像我们这样的简单情况,Springs直接支持DB模式初始化(现实世界应用程序可能会使用Flyway这样的工具为此目的)。让我们在资源文件夹中创建一个schema.sql
:
CREATE TABLE IF NOT EXISTS WEATHER_INFO(
id IDENTITY NOT NULL,
region VARCHAR(255) NOT NULL,
country VARCHAR(255) NOT NULL,
state VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
local_date DATE NOT NULL,
avg_temperature VARCHAR(255) NOT NULL
);
Web客户端也需要反应性,因此WeatherAPIClient
变为(使用WebClient
代替RestTemplate
):
package com.iodigital.weather.api;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@Component
public class WeatherAPIClient {
private static final Logger log = LoggerFactory.getLogger(WeatherAPIClient.class);
private final WebClient http;
private final String host;
private final String apiKey;
public WeatherAPIClient(
final WebClient http,
@Value("${weatherapi.host}") final String host,
@Value("${weatherapi.api-key}") final String apiKey
) {
this.http = http;
this.host = host;
this.apiKey = apiKey;
}
public Mono<WeatherAPIResponse> getWeather(final String city) {
return http
.get()
.uri("%s/v1/forecast.json?key=%s&q=%s&days=7".formatted(host, apiKey, city))
.exchangeToMono(response -> response.bodyToMono(WeatherAPIResponse.class))
.doFirst(() -> log.info("Getting weather forecast for city {}", city))
.doOnError(e -> log.error("Cannot get weather forecast for %s".formatted(city), e))
.doOnSuccess(response -> log.info("Weather forecast for city {}: {}", city, response));
}
}
这需要在应用程序中更新Bean
的定义:
package com.iodigital.weather;
// ...
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.web.reactive.function.client.WebClient;
// ...
@EnableR2dbcRepositories
@SpringBootApplication
public class WeatherApplication {
@Bean
public ReactorResourceFactory resourceFactory() {
return new ReactorResourceFactory();
}
@Bean
public WebClient webClient() {
return WebClient
.builder()
.clientConnector( // Some defafult configuration for WebClient
new ReactorClientHttpConnector(
resourceFactory(),
client -> client.responseTimeout(Duration.ofSeconds(10))
)
)
.build();
}
// ...
}
最后,让我们调整我们的服务层,以便WeatherService
变为:
package com.iodigital.weather;
// ...
import reactor.core.publisher.Flux;
@Service
public class WeatherService {
// ...
public Flux<WeatherInfo> getAll() {
return repository.findAll();
}
public Flux<WeatherInfo> getForCity(final String city) {
return repository
.findAllByCityIgnoreCase(city)
.switchIfEmpty(
api
.getWeather(city)
.flatMapMany(apiResponse ->
repository.saveAll(apiResponse.toWeatherInfoList())
)
);
}
}
最后,我们可以运行应用程序并测试:
# Gets nothing because DB is empty
curl localhost:8080/weather
[]
# Gets nothing from DB, then fetches from 3rd party, saves and returns some data
curl localhost:8080/weather/city/Amsterdam
[
{
"avgTemperature": "47,20",
"city": "Amsterdam",
"country": "Netherlands",
"id": 1,
"localDate": "2022-12-22",
"region": "North Holland",
"state": ""
},
...
]
# Gets data from DB directly
curl localhost:8080/weather
[
{
"avgTemperature": "47,20",
"city": "Amsterdam",
"country": "Netherlands",
"id": 1,
"localDate": "2022-12-22",
"region": "North Holland",
"state": ""
},
...
]
它有效!现在,我们有一个反应性应用程序,在每一层中都不块。
反应性应用程序的关键点是使用所定义的反应性类型和操作员来实现我们想要的结果。这也遵循功能编程的原则,因为我们构建的(不可变的)值实际上是我们程序的描述。这意味着当我们构建自己的流时,什么都没有运行。我们构建小的构建块,并将它们组合成更大的程序。当我们的流订阅时,整个程序最终会运行。对于此应用程序,它是由Spring Webflux进行的。
验证我们的应用程序实际上没有阻止
有一个名为Blockhound的很棒的工具,我们可以用来检测应用程序中是否有阻止调用。这样,我们可以确保在开发新功能时错误地破坏应用程序的非阻滞性质。设置它非常简单。
在build.gradle.kts
中的dependencies
下添加以下内容:
testImplementation("io.projectreactor.tools:blockhound:1.0.6.RELEASE")
然后在WeatherApplication
中静态安装:
package com.iodigital.weather;
// ...
import reactor.blockhound.BlockHound;
@EnableR2dbcRepositories
@SpringBootApplication
public class WeatherApplication {
static {
BlockHound
.builder()
// .allowBlockingCallsInside("Class", "method")
.install();
}
// ...
}
就是这样。现在,如果检测到任何地方的封锁调用,则阻止猎犬将引发异常。
3.下一步
为了回顾一下本文的第一部分,我们从传统的MVC风格的弹簧启动应用程序开始,然后将其转换为现代,反应式的逐步。
您可以在github.com/iodigital-com/reactive-kotlin-weather-api上找到此应用程序的源代码。
在the second part中,我们将通过将应用程序转换为Kotlin并添加Coroutines支持。