ð�ð»完整列表已使用的内容:
Spring 网络框架
Spring WebFlux 反应性休息服务
gRPC kotlin grpc
gRPC-Spring-Boot-Starter grpc春季启动器
Spring Data R2DBC 使用反应式驱动程序集成SQL数据库的规范
Zipkin 开源,端到端分布的tracing
Spring Cloud Sleuth 分布式跟踪的自动配置
Prometheus 监视和警报
Grafana 用于与Prometheus的所有内容组成可观察性仪表板
Kubernetes 自动部署,扩展和管理集装箱应用程序
Docker 和docker-compose
Helm kubernetes的包装经理
Flywaydb 迁移
您可以在 GitHub repository 中找到源代码。
对于这个项目,让我们使用Spring和Postgresql实施Kotlin Grpc微服务。
GRPC非常适合低潜伏期和高吞吐量通信,因此对于效率至关重要的微服务非常有用。
默认情况下,消息用Protobuf编码。虽然Protobuf有效地发送和接收二进制格式。
春天没有开箱即用的GRPC入门者,我们必须使用社区,最受欢迎的是 yidongnan
和 LogNet ,两者都很好并且可以使用,
对于这个项目,选择了第一个。
在第一步,我们必须添加 gRPC Kotlin Codegen Plugin for Protobuf Compiler 。
所有UI接口将在端口上可用:
Swagger UI: http://localhost:8000/webjars/swagger-ui/index.html
GOFA:Razeqiaion23
Zipkin UI:http://localhost:9411
Prometheus UI:http://localhost:9090
该项目的Docker-Compose文件:
version: "3.9"
services:
microservices_postgresql:
image: postgres:latest
container_name: microservices_postgresql
expose:
- "5432"
ports:
- "5432:5432"
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=bank_accounts
- POSTGRES_HOST=5432
command: -p 5432
volumes:
- ./docker_data/microservices_pgdata:/var/lib/postgresql/data
networks: [ "microservices" ]
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
command:
- --config.file=/etc/prometheus/prometheus.yml
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
networks: [ "microservices" ]
node_exporter:
container_name: microservices_node_exporter
restart: always
image: prom/node-exporter
ports:
- '9101:9100'
networks: [ "microservices" ]
grafana:
container_name: microservices_grafana
restart: always
image: grafana/grafana
ports:
- '3000:3000'
networks: [ "microservices" ]
zipkin:
image: openzipkin/zipkin:latest
restart: always
container_name: microservices_zipkin
ports:
- "9411:9411"
networks: [ "microservices" ]
networks:
microservices:
name: microservices
GRPC消息使用Protobuf序列化,Protobuf是一种有效的二进制消息格式,它在服务器和客户端上序列非常快,
及其序列化导致小消息有效载荷,在有限的带宽方案(例如移动应用程序)中很重要。
用于指定每个服务的RPC定义的接口合同将使用协议缓冲区定义。
每个微服务都将在此处为此定义一个原始文件。
首先,我们必须在proto文件中定义服务并对其进行编译,它最多具有单一方法和一台服务器流:
syntax = "proto3";
package com.example.grpc.bank.service;
import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";
service BankAccountService {
rpc createBankAccount (CreateBankAccountRequest) returns (CreateBankAccountResponse);
rpc getBankAccountById (GetBankAccountByIdRequest) returns (GetBankAccountByIdResponse);
rpc depositBalance (DepositBalanceRequest) returns (DepositBalanceResponse);
rpc withdrawBalance (WithdrawBalanceRequest) returns (WithdrawBalanceResponse);
rpc getAllByBalance (GetAllByBalanceRequest) returns (stream GetAllByBalanceResponse);
rpc getAllByBalanceWithPagination(GetAllByBalanceWithPaginationRequest) returns (GetAllByBalanceWithPaginationResponse);
}
message BankAccountData {
string id = 1;
string firstName = 2;
string lastName = 3;
string email = 4;
string address = 5;
string currency = 6;
string phone = 7;
double balance = 8;
string createdAt = 9;
string updatedAt = 10;
}
message CreateBankAccountRequest {
string email = 1;
string firstName = 2;
string lastName = 3;
string address = 4;
string currency = 5;
string phone = 6;
double balance = 7;
}
message CreateBankAccountResponse {
BankAccountData bankAccount = 1;
}
message GetBankAccountByIdRequest {
string id = 1;
}
message GetBankAccountByIdResponse {
BankAccountData bankAccount = 1;
}
message DepositBalanceRequest {
string id = 1;
double balance = 2;
}
message DepositBalanceResponse {
BankAccountData bankAccount = 1;
}
message WithdrawBalanceRequest {
string id = 1;
double balance = 2;
}
message WithdrawBalanceResponse {
BankAccountData bankAccount = 1;
}
message GetAllByBalanceRequest {
double min = 1;
double max = 2;
int32 page = 3;
int32 size = 4;
}
message GetAllByBalanceResponse {
BankAccountData bankAccount = 1;
}
message GetAllByBalanceWithPaginationRequest {
double min = 1;
double max = 2;
int32 page = 3;
int32 size = 4;
}
message GetAllByBalanceWithPaginationResponse {
repeated BankAccountData bankAccount = 1;
int32 page = 2;
int32 size = 3;
int32 totalElements = 4;
int32 totalPages = 5;
bool isFirst = 6;
bool isLast = 7;
}
GRPC的实际Maven依赖性:
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-kotlin-stub</artifactId>
<version>${grpc.kotlin.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${java.grpc.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-kotlin</artifactId>
<version>${protobuf.version}</version>
</dependency>
</dependencies>
和Maven Protobuf插件:
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.protoc.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${java.grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
<protocPlugins>
<protocPlugin>
<id>grpc-kotlin</id>
<groupId>io.grpc</groupId>
<artifactId>protoc-gen-grpc-kotlin</artifactId>
<version>${grpc.kotlin.version}</version>
<classifier>jdk8</classifier>
<mainClass>io.grpc.kotlin.generator.GeneratorRunner</mainClass>
</protocPlugin>
</protocPlugins>
</configuration>
</execution>
<execution>
<id>compile-kt</id>
<goals>
<goal>compile-custom</goal>
</goals>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.protoc.version}:exe:${os.detected.classifier}
</protocArtifact>
<outputDirectory>${project.build.directory}/generated-sources/protobuf/kotlin</outputDirectory>
<pluginId>kotlin</pluginId>
</configuration>
</execution>
</executions>
</plugin>
该插件为您的每个GRPC服务生成一个类。
例如: bankAccountgrpcservicegrpc 其中 bankAccountgrpcservice 是原始文件中GRPC服务的名称。
此类包含客户端存根和服务器 inflybase 您需要扩展。
汇编完成后,我们可以实施GRPC服务。
@grpcservice 允许我们通过针对此服务的拦截器列表,因此我们可以在此处添加 loggrpcintector 。
为了验证请求,让我们使用 Spring-boot-starter-validation 使用 Hibernate Validator
@GrpcService(interceptors = [LogGrpcInterceptor::class])
class BankAccountGrpcService(
private val bankAccountService: BankAccountService,
private val tracer: Tracer,
private val validator: Validator
) : BankAccountServiceGrpcKt.BankAccountServiceCoroutineImplBase() {
override suspend fun createBankAccount(request: CreateBankAccountRequest): CreateBankAccountResponse =
withContext(tracer.asContextElement()) {
withTimeout(timeOutMillis) {
val span = tracer.startScopedSpan(CREATE_BANK_ACCOUNT)
runWithTracing(span) {
bankAccountService.createBankAccount(validate(BankAccount.of(request)))
.let { CreateBankAccountResponse.newBuilder().setBankAccount(it.toProto()).build() }
.also { it ->
log.info("created bank account: $it").also { span.tag("account", it.toString()) }
}
}
}
}
override suspend fun getBankAccountById(request: GetBankAccountByIdRequest): GetBankAccountByIdResponse =
withContext(tracer.asContextElement()) {
withTimeout(timeOutMillis) {
val span = tracer.startScopedSpan(GET_BANK_ACCOUNT_BY_ID)
runWithTracing(span) {
bankAccountService.getBankAccountById(UUID.fromString(request.id))
.let { GetBankAccountByIdResponse.newBuilder().setBankAccount(it.toProto()).build() }
.also { it -> log.info("response: $it").also { span.tag("response", it.toString()) } }
}
}
}
override suspend fun depositBalance(request: DepositBalanceRequest): DepositBalanceResponse =
withContext(tracer.asContextElement()) {
withTimeout(timeOutMillis) {
val span = tracer.startScopedSpan(DEPOSIT_BALANCE)
runWithTracing(span) {
bankAccountService.depositAmount(UUID.fromString(request.id), BigDecimal.valueOf(request.balance))
.let { DepositBalanceResponse.newBuilder().setBankAccount(it.toProto()).build() }
.also { it -> log.info("response: $it").also { span.tag("response", it.toString()) } }
}
}
}
override suspend fun withdrawBalance(request: WithdrawBalanceRequest): WithdrawBalanceResponse =
withContext(tracer.asContextElement()) {
withTimeout(timeOutMillis) {
val span = tracer.startScopedSpan(WITHDRAW_BALANCE)
runWithTracing(span) {
bankAccountService.withdrawAmount(UUID.fromString(request.id), BigDecimal.valueOf(request.balance))
.let { WithdrawBalanceResponse.newBuilder().setBankAccount(it.toProto()).build() }
.also { it -> log.info("response: $it").also { span.tag("response", it.toString()) } }
}
}
}
override fun getAllByBalance(request: GetAllByBalanceRequest): Flow<GetAllByBalanceResponse> {
runWithTracing(tracer, GET_ALL_BY_BALANCE) {
return bankAccountService.findAllByBalanceBetween(validate(FindByBalanceRequestDto.of(request)))
.map { GetAllByBalanceResponse.newBuilder().setBankAccount(it.toProto()).build() }
.flowOn(Dispatchers.IO + tracer.asContextElement())
}
}
override suspend fun getAllByBalanceWithPagination(request: GetAllByBalanceWithPaginationRequest): GetAllByBalanceWithPaginationResponse =
withContext(tracer.asContextElement()) {
withTimeout(timeOutMillis) {
val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE_WITH_PAGINATION)
runWithTracing(span) {
bankAccountService.findByBalanceAmount(validate(FindByBalanceRequestDto.of(request)))
.toGetAllByBalanceWithPaginationResponse()
.also { log.info("response: $it") }.also { span.tag("response", it.toString()) }
}
}
}
private fun <T> validate(data: T): T {
return data.run {
val errors = validator.validate(data)
if (errors.isNotEmpty()) throw ConstraintViolationException(errors).also { log.error("validation error: ${it.localizedMessage}") }
data
}
}
companion object {
private val log = LoggerFactory.getLogger(BankAccountGrpcService::class.java)
private const val timeOutMillis = 5000L
private const val CREATE_BANK_ACCOUNT = "BankAccountGrpcService.createBankAccount"
private const val GET_BANK_ACCOUNT_BY_ID = "BankAccountGrpcService.getBankAccountById"
private const val DEPOSIT_BALANCE = "BankAccountGrpcService.depositBalance"
private const val WITHDRAW_BALANCE = "BankAccountGrpcService.withdrawBalance"
private const val GET_ALL_BY_BALANCE = "BankAccountGrpcService.getAllByBalance"
private const val GET_ALL_BY_BALANCE_WITH_PAGINATION = "BankAccountGrpcService.getAllByBalanceWithPagination"
}
}
fun Page<BankAccount>.toGetAllByBalanceWithPaginationResponse(): GetAllByBalanceWithPaginationResponse {
return GetAllByBalanceWithPaginationResponse
.newBuilder()
.setIsFirst(this.isFirst)
.setIsLast(this.isLast)
.setTotalElements(this.totalElements.toInt())
.setTotalPages(this.totalPages)
.setPage(this.pageable.pageNumber)
.setSize(this.pageable.pageSize)
.addAllBankAccount(this.content.map { it.toProto() })
.build()
}
拦截器是一个GRPC概念,允许应用程序与传入或发出的GRPC调用交互。
他们提供了一种丰富请求处理管道的方法。
我们可以添加GRPC拦截器,在这里我们实现 logGrpCinterceptor :
class LogGrpcInterceptor : ServerInterceptor {
override fun <ReqT : Any?, RespT : Any?> interceptCall(
call: ServerCall<ReqT, RespT>,
headers: Metadata,
next: ServerCallHandler<ReqT, RespT>
): ServerCall.Listener<ReqT> {
log.info("Service: ${call.methodDescriptor.serviceName}, Method: ${call.methodDescriptor.bareMethodName}, Headers: $headers")
return next.startCall(call, headers)
}
companion object {
private val log = LoggerFactory.getLogger(LogGrpcInterceptor::class.java)
}
}
并将其添加到全局 grpcglobalserverspector :
@Configuration(proxyBeanMethods = false)
class GlobalInterceptorConfiguration {
@GrpcGlobalServerInterceptor
fun logServerInterceptor(): LogGrpcInterceptor? = LogGrpcInterceptor()
}
微服务的服务层有一些方法,例如,使用数据列表,它具有两种方法,
一个返回 pageimpl 用于一元方法响应的一个返回流量用于GRPC流响应方法。
当前的弹簧版本支持@transactional r2dbc
注释
界面和实现如下:
@Service
interface BankAccountService {
suspend fun createBankAccount(bankAccount: BankAccount): BankAccount
suspend fun getBankAccountById(id: UUID): BankAccount
suspend fun depositAmount(id: UUID, amount: BigDecimal): BankAccount
suspend fun withdrawAmount(id: UUID, amount: BigDecimal): BankAccount
fun findAllByBalanceBetween(requestDto: FindByBalanceRequestDto): Flow<BankAccount>
suspend fun findByBalanceAmount(requestDto: FindByBalanceRequestDto): Page<BankAccount>
}
@Service
class BankAccountServiceImpl(
private val bankRepository: BankRepository,
private val tracer: Tracer
) : BankAccountService {
@Transactional
override suspend fun createBankAccount(@Valid bankAccount: BankAccount): BankAccount =
withContext(Dispatchers.IO + tracer.asContextElement()) {
val span = tracer.startScopedSpan(CREATE_BANK_ACCOUNT)
runWithTracing(span) {
bankRepository.save(bankAccount).also { span.tag("saved account", it.toString()) }
}
}
@Transactional(readOnly = true)
override suspend fun getBankAccountById(id: UUID): BankAccount =
withContext(Dispatchers.IO + tracer.asContextElement()) {
val span = tracer.startScopedSpan(GET_BANK_ACCOUNT_BY_ID)
runWithTracing(span) {
bankRepository.findById(id).also { span.tag("bank account", it.toString()) }
?: throw BankAccountNotFoundException(id.toString())
}
}
@Transactional
override suspend fun depositAmount(id: UUID, amount: BigDecimal): BankAccount =
withContext(Dispatchers.IO + tracer.asContextElement()) {
val span = tracer.startScopedSpan(DEPOSIT_AMOUNT)
runWithTracing(span) {
bankRepository.findById(id)
?.let { bankRepository.save(it.depositAmount(amount)) }
.also { span.tag("bank account", it.toString()) }
?: throw BankAccountNotFoundException(id.toString())
}
}
@Transactional
override suspend fun withdrawAmount(id: UUID, amount: BigDecimal): BankAccount =
withContext(Dispatchers.IO + tracer.asContextElement()) {
val span = tracer.startScopedSpan(WITHDRAW_AMOUNT)
runWithTracing(span) {
bankRepository.findById(id)
?.let { bankRepository.save(it.withdrawAmount(amount)) }
.also { span.tag("bank account", it.toString()) }
?: throw BankAccountNotFoundException(id.toString())
}
}
@Transactional(readOnly = true)
override fun findAllByBalanceBetween(requestDto: FindByBalanceRequestDto): Flow<BankAccount> {
val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE)
runWithTracing(span) {
return bankRepository.findAllByBalanceBetween(
requestDto.minBalance,
requestDto.maxBalance,
requestDto.pageable
)
}
}
@Transactional(readOnly = true)
override suspend fun findByBalanceAmount(requestDto: FindByBalanceRequestDto): Page<BankAccount> =
withContext(Dispatchers.IO + tracer.asContextElement()) {
val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE_WITH_PAGINATION)
runWithTracing(span) {
bankRepository.findByBalanceAmount(requestDto.minBalance, requestDto.maxBalance, requestDto.pageable)
.also { span.tag("pagination", it.toString()) }
}
}
companion object {
private const val CREATE_BANK_ACCOUNT = "BankAccountService.createBankAccount"
private const val GET_BANK_ACCOUNT_BY_ID = "BankAccountService.getBankAccountById"
private const val DEPOSIT_AMOUNT = "BankAccountService.depositAmount"
private const val WITHDRAW_AMOUNT = "BankAccountService.withdrawAmount"
private const val GET_ALL_BY_BALANCE = "BankAccountService.findAllByBalanceBetween"
private const val GET_ALL_BY_BALANCE_WITH_PAGINATION = "BankAccountService.findByBalanceAmount"
}
}
R2DBC 是一种API,可为关系数据库提供反应性,非阻滞API。
使用此功能,您可以在Spring Boot中将反应性API读取并以反应性/同步方式写入数据库。
bankrepository 是 coroutinesortingrepository 来自Spring Data和我们的Custom BankPostgresrepository 实现。
对于我们的自定义bankPostgresrepository在此处使用的实现 r2dbcentityTemplate 和 databaseclient 。
如果我们想像JPA provide一样具有类似的分页响应,
我们必须手动创建 pageimpl 。
@Repository
interface BankRepository : CoroutineSortingRepository<BankAccount, UUID>, BankPostgresRepository {
fun findAllByBalanceBetween(min: BigDecimal, max: BigDecimal, pageable: Pageable): Flow<BankAccount>
}
@Repository
interface BankPostgresRepository {
suspend fun findByBalanceAmount(min: BigDecimal, max: BigDecimal, pageable: Pageable): Page<BankAccount>
}
@Repository
class BankPostgresRepositoryImpl(
private val template: R2dbcEntityTemplate,
private val databaseClient: DatabaseClient,
private val tracer: Tracer,
) : BankPostgresRepository {
override suspend fun findByBalanceAmount(min: BigDecimal, max: BigDecimal, pageable: Pageable): Page<BankAccount> =
withContext(Dispatchers.IO + tracer.asContextElement()) {
val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE_AMOUNT)
val query = Query.query(Criteria.where(BALANCE).between(min, max))
runWithTracing(span) {
val accountsList = async {
template.select(query.with(pageable), BankAccount::class.java)
.asFlow()
.toList()
}
val totalCount = async {
databaseClient.sql("SELECT count(bank_account_id) as total FROM microservices.bank_accounts WHERE balance BETWEEN :min AND :max")
.bind("min", min)
.bind("max", max)
.fetch()
.one()
.awaitFirst()
}
PageImpl(accountsList.await(), pageable, totalCount.await()["total"] as Long)
.also { span.tag("pagination", it.toString()) }
.also { log.debug("pagination: $it") }
}
}
companion object {
private val log = LoggerFactory.getLogger(BankPostgresRepositoryImpl::class.java)
private const val GET_ALL_BY_BALANCE_AMOUNT = "BankPostgresRepository.findByBalanceAmount"
}
}
对于错误处理GRPC启动器的错误提供了 grpcadvice ,标志着要检查的类别的类别以获取异常处理方法,
@grpcexceptionhandler 标记要执行的注释方法,如果抛出了指定的例外,
状态代码很好地描述了here
@GrpcAdvice
class GrpcExceptionAdvice {
@GrpcExceptionHandler(RuntimeException::class)
fun handleRuntimeException(ex: RuntimeException): StatusException {
val status = Status.INTERNAL.withDescription(ex.message).withCause(ex)
return status.asException().also { log.error("status: $status") }
}
@GrpcExceptionHandler(BankAccountNotFoundException::class)
fun handleBankAccountNotFoundException(ex: BankAccountNotFoundException): StatusException {
val status = Status.INVALID_ARGUMENT.withDescription(ex.message).withCause(ex)
return status.asException().also { log.error("status: $status") }
}
@GrpcExceptionHandler(MethodArgumentNotValidException::class)
fun handleMethodArgumentNotValidException(ex: MethodArgumentNotValidException): StatusException {
val errorMap: MutableMap<String, String> = HashMap()
ex.bindingResult.fieldErrors.forEach { error -> error.defaultMessage?.let { errorMap[error.field] = it } }
val status = Status.INVALID_ARGUMENT.withDescription(errorMap.toString()).withCause(ex)
return status.asException().also { log.error("status: $status") }
}
@GrpcExceptionHandler(ConstraintViolationException::class)
fun handleConstraintViolationException(ex: ConstraintViolationException): StatusException {
val status = Status.INVALID_ARGUMENT.withDescription(ex.toString()).withCause(ex)
return status.asException().also { log.error("status: $status") }
}
companion object {
private val log = LoggerFactory.getLogger(GrpcExceptionAdvice::class.java)
}
}
我们的微服务还具有HTTP控制器:
@Tag(name = "BankAccount", description = "Bank Account REST Controller")
@RestController
@RequestMapping(path = ["/api/v1/bank"])
class BankAccountController(private val bankAccountService: BankAccountService) {
@PostMapping(produces = [MediaType.APPLICATION_JSON_VALUE])
@Operation(
method = "createBankAccount",
summary = "Create bew bank account",
operationId = "createBankAccount",
description = "Create new bank for account for user"
)
suspend fun createBankAccount(@Valid @RequestBody req: CreateBankAccountDto) =
withTimeout(timeOutMillis) {
ResponseEntity
.status(HttpStatus.CREATED)
.body(bankAccountService.createBankAccount(BankAccount.of(req)).toSuccessHttpResponse())
.also { log.info("created bank account: $it") }
}
@PutMapping(path = ["/deposit/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
@Operation(
method = "depositBalance",
summary = "Deposit balance",
operationId = "depositBalance",
description = "Deposit given amount to the bank account balance"
)
suspend fun depositBalance(
@PathVariable("id") id: UUID,
@Valid @RequestBody depositBalanceDto: DepositBalanceDto
) = withTimeout(timeOutMillis) {
ResponseEntity.ok(bankAccountService.depositAmount(id, depositBalanceDto.amount).toSuccessHttpResponse())
.also { log.info("response: $it") }
}
@PutMapping(path = ["/withdraw/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
@Operation(
method = "withdrawBalance",
summary = "Withdraw balance",
operationId = "withdrawBalance",
description = "Withdraw given amount from the bank account balance"
)
suspend fun withdrawBalance(
@PathVariable("id") id: UUID,
@Valid @RequestBody withdrawBalanceDto: WithdrawBalanceDto
) = withTimeout(timeOutMillis) {
ResponseEntity.ok(bankAccountService.depositAmount(id, withdrawBalanceDto.amount).toSuccessHttpResponse())
.also { log.info("response: $it") }
}
@GetMapping(path = ["{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
@Operation(
method = "getBankAccountById",
summary = "Get bank account by id",
operationId = "getBankAccountById",
description = "Get user bank account by given id"
)
suspend fun getBankAccountById(@PathVariable(required = true) id: UUID) = withTimeout(timeOutMillis) {
ResponseEntity.ok(bankAccountService.getBankAccountById(id).toSuccessHttpResponse())
.also { log.info("success get bank account: $it") }
}
@GetMapping(path = ["all/balance"], produces = [MediaType.APPLICATION_JSON_VALUE])
@Operation(
method = "findAllAccountsByBalance",
summary = "Find all bank account with given amount range",
operationId = "findAllAccounts",
description = "Find all bank accounts for the given balance range with pagination"
)
suspend fun findAllAccountsByBalance(
@RequestParam(name = "min", defaultValue = "0") min: BigDecimal,
@RequestParam(name = "max", defaultValue = "500000000") max: BigDecimal,
@RequestParam(name = "page", defaultValue = "0") page: Int,
@RequestParam(name = "size", defaultValue = "10") size: Int,
) = withTimeout(timeOutMillis) {
ResponseEntity.ok(bankAccountService.findByBalanceAmount(FindByBalanceRequestDto(min, max, PageRequest.of(page, size))))
.also { log.info("response: $it") }
}
@GetMapping(path = ["all/balance/stream"])
@Operation(
method = "getAllByBalanceStream",
summary = "Find all bank account with given amount range returns stream",
operationId = "getAllByBalanceStream",
description = "Find all bank accounts for the given balance range"
)
fun getAllByBalanceStream(
@RequestParam(name = "min", defaultValue = "0") min: BigDecimal,
@RequestParam(name = "max", defaultValue = "500000000") max: BigDecimal,
@RequestParam(name = "page", defaultValue = "0") page: Int,
@RequestParam(name = "size", defaultValue = "10") size: Int,
): Flow<SuccessBankAccountResponse> {
return bankAccountService.findAllByBalanceBetween(FindByBalanceRequestDto(min, max, PageRequest.of(page, size)))
.map { it -> it.toSuccessHttpResponse().also { log.info("response: $it") } }
}
companion object {
private val log = LoggerFactory.getLogger(BankAccountController::class.java)
private const val timeOutMillis = 5000L
}
}
用于与GRPC合作的UI客户量很少,个人喜欢使用 BloomRPC ,
另一个有用的工具是 grpcurl 和 grpcui 。
下一步让我们将我们的微服务部署到K8,
在此示例中,我们可以使用简单的多阶段文件来构建Docker映像:
FROM --platform=linux/arm64 azul/zulu-openjdk-alpine:17 as builder
ARG JAR_FILE=target/KotlinSpringGrpc-0.0.1-SNAPSHOT.jar
COPY ${JAR_FILE} application.jar
RUN java -Djarmode=layertools -jar application.jar extract
FROM azul/zulu-openjdk-alpine:17
COPY --from=builder dependencies/ ./
COPY --from=builder snapshot-dependencies/ ./
COPY --from=builder spring-boot-loader/ ./
COPY --from=builder application/ ./
ENTRYPOINT ["java", "org.springframework.boot.loader.JarLauncher", "-XX:MaxRAMPercentage=75", "-XX:+UseG1GC"]
对于使用K8的工作,喜欢使用 Helm ,微服务部署很简单,并且具有部署本身,服务,配置
和ServiceMonitor。
最后一个是因为监视使用 kube-prometheus-stack helm chart
微服务头盔图yaml文件是:
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.microservice.name }}
labels:
app: {{ .Values.microservice.name }}
spec:
replicas: {{ .Values.microservice.replicas }}
template:
metadata:
name: {{ .Values.microservice.name }}
labels:
app: {{ .Values.microservice.name }}
spec:
containers:
- name: {{ .Values.microservice.name }}
image: {{ .Values.microservice.image }}
imagePullPolicy: Always
resources:
requests:
memory: {{ .Values.microservice.resources.requests.memory }}
cpu: {{ .Values.microservice.resources.requests.cpu }}
limits:
memory: {{ .Values.microservice.resources.limits.memory }}
cpu: {{ .Values.microservice.resources.limits.cpu }}
livenessProbe:
httpGet:
port: {{ .Values.microservice.livenessProbe.httpGet.port }}
path: {{ .Values.microservice.livenessProbe.httpGet.path }}
initialDelaySeconds: {{ .Values.microservice.livenessProbe.initialDelaySeconds }}
periodSeconds: {{ .Values.microservice.livenessProbe.periodSeconds }}
readinessProbe:
httpGet:
port: {{ .Values.microservice.readinessProbe.httpGet.port }}
path: {{ .Values.microservice.readinessProbe.httpGet.path }}
initialDelaySeconds: {{ .Values.microservice.readinessProbe.initialDelaySeconds }}
periodSeconds: {{ .Values.microservice.readinessProbe.periodSeconds }}
ports:
- containerPort: {{ .Values.microservice.ports.http.containerPort }}
name: {{ .Values.microservice.ports.http.name }}
- containerPort: {{ .Values.microservice.ports.grpc.containerPort}}
name: {{ .Values.microservice.ports.grpc.name }}
env:
- name: SPRING_APPLICATION_NAME
value: microservice_k8s
- name: JAVA_OPTS
value: "-XX:+UseG1GC -XX:MaxRAMPercentage=75"
- name: SERVER_PORT
valueFrom:
configMapKeyRef:
key: server_port
name: {{ .Values.microservice.name }}-config-map
- name: GRPC_SERVER_PORT
valueFrom:
configMapKeyRef:
key: grpc_server_port
name: {{ .Values.microservice.name }}-config-map
- name: SPRING_ZIPKIN_BASE_URL
valueFrom:
configMapKeyRef:
key: zipkin_base_url
name: {{ .Values.microservice.name }}-config-map
- name: SPRING_R2DBC_URL
valueFrom:
configMapKeyRef:
key: r2dbc_url
name: {{ .Values.microservice.name }}-config-map
- name: SPRING_FLYWAY_URL
valueFrom:
configMapKeyRef:
key: flyway_url
name: {{ .Values.microservice.name }}-config-map
restartPolicy: Always
terminationGracePeriodSeconds: {{ .Values.microservice.terminationGracePeriodSeconds }}
selector:
matchLabels:
app: {{ .Values.microservice.name }}
---
apiVersion: v1
kind: Service
metadata:
name: {{ .Values.microservice.name }}-service
labels:
app: {{ .Values.microservice.name }}
spec:
selector:
app: {{ .Values.microservice.name }}
ports:
- port: {{ .Values.microservice.service.httpPort }}
name: http
protocol: TCP
targetPort: http
- port: {{ .Values.microservice.service.grpcPort }}
name: grpc
protocol: TCP
targetPort: grpc
type: ClusterIP
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
labels:
release: monitoring
name: {{ .Values.microservice.name }}-service-monitor
namespace: default
spec:
selector:
matchLabels:
app: {{ .Values.microservice.name }}
endpoints:
- interval: 10s
port: http
path: /actuator/prometheus
namespaceSelector:
matchNames:
- default
---
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ .Values.microservice.name }}-config-map
data:
server_port: "8080"
grpc_server_port: "8000"
zipkin_base_url: zipkin:9411
r2dbc_url: "r2dbc:postgresql://postgres:5432/bank_accounts"
flyway_url: "jdbc:postgresql://postgres:5432/bank_accounts"
和 values.yaml 文件:
microservice:
name: kotlin-spring-microservice
image: alexanderbryksin/kotlin_spring_grpc_microservice:latest
replicas: 1
livenessProbe:
httpGet:
port: 8080
path: /actuator/health/liveness
initialDelaySeconds: 60
periodSeconds: 5
readinessProbe:
httpGet:
port: 8080
path: /actuator/health/readiness
initialDelaySeconds: 60
periodSeconds: 5
ports:
http:
name: http
containerPort: 8080
grpc:
name: grpc
containerPort: 8000
terminationGracePeriodSeconds: 20
service:
httpPort: 8080
grpcPort: 8000
resources:
requests:
memory: '6000Mi'
cpu: "3000m"
limits:
memory: '6000Mi'
cpu: "3000m"
作为用于使用K8的UI工具,个人喜欢使用 Lens 。
您可以找到 GitHub repository here 的更多详细信息和源代码,
当然,总是在现实世界中,业务逻辑和基础架构代码要复杂得多,我们必须实施更多必要的功能。
我希望这篇文章有用和有益,并乐于收到任何反馈或问题,请随意 contact me email 或任何< strong> messengers :)