kotlin grpc带有弹簧ð‫
#java #kotlin #spring #grpc

ð�ð»完整列表已使用的内容:

Spring 网络框架
Spring WebFlux 反应性休息服务
gRPC kotlin grpc
gRPC-Spring-Boot-Starter grpc春季启动器
Spring Data R2DBC 使用反应式驱动程序集成SQL数据库的规范
Zipkin 开源,端到端分布的tracing
Spring Cloud Sleuth 分布式跟踪的自动配置
Prometheus 监视和警报
Grafana 用于与Prometheus的所有内容组成可观察性仪表板
Kubernetes 自动部署,扩展和管理集装箱应用程序
Docker 和docker-compose
Helm kubernetes的包装经理
Flywaydb 迁移

您可以在 GitHub repository 中找到源代码。
对于这个项目,让我们使用Spring和Postgresql实施Kotlin Grpc微服务。
GRPC非常适合低潜伏期和高吞吐量通信,因此对于效率至关重要的微服务非常有用。
默认情况下,消息用Protobuf编码。虽然Protobuf有效地发送和接收二进制格式。
春天没有开箱即用的GRPC入门者,我们必须使用社区,最受欢迎的是 yidongnan
LogNet ,两者都很好并且可以使用,
对于这个项目,选择了第一个。
在第一步,我们必须添加 gRPC Kotlin Codegen Plugin for Protobuf Compiler

所有UI接口将在端口上可用:

Swagger UI: http://localhost:8000/webjars/swagger-ui/index.html

Swagger

GOFA:Razeqiaion23

Grafana

Zipkin UI:http://localhost:9411

Zipkin

Prometheus UI:http://localhost:9090

Prometheus

该项目的Docker-Compose文件:

version: "3.9"

services:
  microservices_postgresql:
    image: postgres:latest
    container_name: microservices_postgresql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=bank_accounts
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./docker_data/microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: microservices_node_exporter
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: microservices_grafana
    restart: always
    image: grafana/grafana
    ports:
      - '3000:3000'
    networks: [ "microservices" ]

  zipkin:
    image: openzipkin/zipkin:latest
    restart: always
    container_name: microservices_zipkin
    ports:
      - "9411:9411"
    networks: [ "microservices" ]

networks:
  microservices:
    name: microservices

GRPC消息使用Protobuf序列化,Protobuf是一种有效的二进制消息格式,它在服务器和客户端上序列非常快,
及其序列化导致小消息有效载荷,在有限的带宽方案(例如移动应用程序)中很重要。
用于指定每个服务的RPC定义的接口合同将使用协议缓冲区定义。
每个微服务都将在此处为此定义一个原始文件。
首先,我们必须在proto文件中定义服务并对其进行编译,它最多具有单一方法和一台服务器流:

syntax = "proto3";

package com.example.grpc.bank.service;

import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";

service BankAccountService {
  rpc createBankAccount (CreateBankAccountRequest) returns (CreateBankAccountResponse);
  rpc getBankAccountById (GetBankAccountByIdRequest) returns (GetBankAccountByIdResponse);
  rpc depositBalance (DepositBalanceRequest) returns (DepositBalanceResponse);
  rpc withdrawBalance (WithdrawBalanceRequest) returns (WithdrawBalanceResponse);
  rpc getAllByBalance (GetAllByBalanceRequest) returns (stream GetAllByBalanceResponse);
  rpc getAllByBalanceWithPagination(GetAllByBalanceWithPaginationRequest) returns (GetAllByBalanceWithPaginationResponse);
}

message BankAccountData {
  string id = 1;
  string firstName = 2;
  string lastName = 3;
  string email = 4;
  string address = 5;
  string currency = 6;
  string phone = 7;
  double balance = 8;
  string createdAt = 9;
  string updatedAt = 10;
}

message CreateBankAccountRequest {
  string email = 1;
  string firstName = 2;
  string lastName = 3;
  string address = 4;
  string currency = 5;
  string phone = 6;
  double balance = 7;
}

message CreateBankAccountResponse {
  BankAccountData bankAccount = 1;
}

message GetBankAccountByIdRequest {
  string id = 1;
}

message GetBankAccountByIdResponse {
  BankAccountData bankAccount = 1;
}

message DepositBalanceRequest {
  string id = 1;
  double balance = 2;
}

message DepositBalanceResponse {
  BankAccountData bankAccount = 1;
}

message WithdrawBalanceRequest {
  string id = 1;
  double balance = 2;
}

message WithdrawBalanceResponse {
  BankAccountData bankAccount = 1;
}

message GetAllByBalanceRequest {
  double min = 1;
  double max = 2;
  int32 page = 3;
  int32 size = 4;
}

message GetAllByBalanceResponse {
  BankAccountData bankAccount = 1;
}

message GetAllByBalanceWithPaginationRequest {
  double min = 1;
  double max = 2;
  int32 page = 3;
  int32 size = 4;
}

message GetAllByBalanceWithPaginationResponse {
  repeated BankAccountData bankAccount = 1;
  int32 page = 2;
  int32 size = 3;
  int32 totalElements = 4;
  int32 totalPages = 5;
  bool isFirst = 6;
  bool isLast = 7;
}

GRPC的实际Maven依赖性:

<dependencies>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-kotlin-stub</artifactId>
        <version>${grpc.kotlin.version}</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-protobuf</artifactId>
        <version>${java.grpc.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-kotlin</artifactId>
        <version>${protobuf.version}</version>
    </dependency>
</dependencies>

和Maven Protobuf插件:

<plugin>
    <groupId>org.xolstice.maven.plugins</groupId>
    <artifactId>protobuf-maven-plugin</artifactId>
    <version>0.6.1</version>
    <executions>
        <execution>
            <id>compile</id>
            <goals>
                <goal>compile</goal>
                <goal>compile-custom</goal>
            </goals>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:${protobuf.protoc.version}:exe:${os.detected.classifier}
                </protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>io.grpc:protoc-gen-grpc-java:${java.grpc.version}:exe:${os.detected.classifier}
                </pluginArtifact>
                <protocPlugins>
                    <protocPlugin>
                        <id>grpc-kotlin</id>
                        <groupId>io.grpc</groupId>
                        <artifactId>protoc-gen-grpc-kotlin</artifactId>
                        <version>${grpc.kotlin.version}</version>
                        <classifier>jdk8</classifier>
                        <mainClass>io.grpc.kotlin.generator.GeneratorRunner</mainClass>
                    </protocPlugin>
                </protocPlugins>
            </configuration>
        </execution>
        <execution>
            <id>compile-kt</id>
            <goals>
                <goal>compile-custom</goal>
            </goals>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:${protobuf.protoc.version}:exe:${os.detected.classifier}
                </protocArtifact>
                <outputDirectory>${project.build.directory}/generated-sources/protobuf/kotlin</outputDirectory>
                <pluginId>kotlin</pluginId>
            </configuration>
        </execution>
    </executions>
</plugin>

该插件为您的每个GRPC服务生成一个类。
例如: bankAccountgrpcservicegrpc 其中 bankAccountgrpcservice 是原始文件中GRPC服务的名称。
此类包含客户端存根和服务器 inflybase 您需要扩展。
汇编完成后,我们可以实施GRPC服务。
@grpcservice 允许我们通过针对此服务的拦截器列表,因此我们可以在此处添加 loggrpcintector
为了验证请求,让我们使用 Spring-boot-starter-validation 使用 Hibernate Validator

@GrpcService(interceptors = [LogGrpcInterceptor::class])
class BankAccountGrpcService(
    private val bankAccountService: BankAccountService,
    private val tracer: Tracer,
    private val validator: Validator
) : BankAccountServiceGrpcKt.BankAccountServiceCoroutineImplBase() {


    override suspend fun createBankAccount(request: CreateBankAccountRequest): CreateBankAccountResponse =
        withContext(tracer.asContextElement()) {
            withTimeout(timeOutMillis) {
                val span = tracer.startScopedSpan(CREATE_BANK_ACCOUNT)

                runWithTracing(span) {
                    bankAccountService.createBankAccount(validate(BankAccount.of(request)))
                        .let { CreateBankAccountResponse.newBuilder().setBankAccount(it.toProto()).build() }
                        .also { it ->
                            log.info("created bank account: $it").also { span.tag("account", it.toString()) }
                        }
                }
            }
        }

    override suspend fun getBankAccountById(request: GetBankAccountByIdRequest): GetBankAccountByIdResponse =
        withContext(tracer.asContextElement()) {
            withTimeout(timeOutMillis) {
                val span = tracer.startScopedSpan(GET_BANK_ACCOUNT_BY_ID)

                runWithTracing(span) {
                    bankAccountService.getBankAccountById(UUID.fromString(request.id))
                        .let { GetBankAccountByIdResponse.newBuilder().setBankAccount(it.toProto()).build() }
                        .also { it -> log.info("response: $it").also { span.tag("response", it.toString()) } }
                }
            }
        }

    override suspend fun depositBalance(request: DepositBalanceRequest): DepositBalanceResponse =
        withContext(tracer.asContextElement()) {
            withTimeout(timeOutMillis) {
                val span = tracer.startScopedSpan(DEPOSIT_BALANCE)

                runWithTracing(span) {
                    bankAccountService.depositAmount(UUID.fromString(request.id), BigDecimal.valueOf(request.balance))
                        .let { DepositBalanceResponse.newBuilder().setBankAccount(it.toProto()).build() }
                        .also { it -> log.info("response: $it").also { span.tag("response", it.toString()) } }
                }
            }
        }

    override suspend fun withdrawBalance(request: WithdrawBalanceRequest): WithdrawBalanceResponse =
        withContext(tracer.asContextElement()) {
            withTimeout(timeOutMillis) {
                val span = tracer.startScopedSpan(WITHDRAW_BALANCE)

                runWithTracing(span) {
                    bankAccountService.withdrawAmount(UUID.fromString(request.id), BigDecimal.valueOf(request.balance))
                        .let { WithdrawBalanceResponse.newBuilder().setBankAccount(it.toProto()).build() }
                        .also { it -> log.info("response: $it").also { span.tag("response", it.toString()) } }
                }
            }
        }

    override fun getAllByBalance(request: GetAllByBalanceRequest): Flow<GetAllByBalanceResponse> {
        runWithTracing(tracer, GET_ALL_BY_BALANCE) {
            return bankAccountService.findAllByBalanceBetween(validate(FindByBalanceRequestDto.of(request)))
                .map { GetAllByBalanceResponse.newBuilder().setBankAccount(it.toProto()).build() }
                .flowOn(Dispatchers.IO + tracer.asContextElement())
        }
    }

    override suspend fun getAllByBalanceWithPagination(request: GetAllByBalanceWithPaginationRequest): GetAllByBalanceWithPaginationResponse =
        withContext(tracer.asContextElement()) {
            withTimeout(timeOutMillis) {
                val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE_WITH_PAGINATION)

                runWithTracing(span) {
                    bankAccountService.findByBalanceAmount(validate(FindByBalanceRequestDto.of(request)))
                        .toGetAllByBalanceWithPaginationResponse()
                        .also { log.info("response: $it") }.also { span.tag("response", it.toString()) }
                }
            }

        }


    private fun <T> validate(data: T): T {
        return data.run {
            val errors = validator.validate(data)
            if (errors.isNotEmpty()) throw ConstraintViolationException(errors).also { log.error("validation error: ${it.localizedMessage}") }
            data
        }
    }


    companion object {
        private val log = LoggerFactory.getLogger(BankAccountGrpcService::class.java)
        private const val timeOutMillis = 5000L

        private const val CREATE_BANK_ACCOUNT = "BankAccountGrpcService.createBankAccount"
        private const val GET_BANK_ACCOUNT_BY_ID = "BankAccountGrpcService.getBankAccountById"
        private const val DEPOSIT_BALANCE = "BankAccountGrpcService.depositBalance"
        private const val WITHDRAW_BALANCE = "BankAccountGrpcService.withdrawBalance"
        private const val GET_ALL_BY_BALANCE = "BankAccountGrpcService.getAllByBalance"
        private const val GET_ALL_BY_BALANCE_WITH_PAGINATION = "BankAccountGrpcService.getAllByBalanceWithPagination"
    }
}

fun Page<BankAccount>.toGetAllByBalanceWithPaginationResponse(): GetAllByBalanceWithPaginationResponse {
    return GetAllByBalanceWithPaginationResponse
        .newBuilder()
        .setIsFirst(this.isFirst)
        .setIsLast(this.isLast)
        .setTotalElements(this.totalElements.toInt())
        .setTotalPages(this.totalPages)
        .setPage(this.pageable.pageNumber)
        .setSize(this.pageable.pageSize)
        .addAllBankAccount(this.content.map { it.toProto() })
        .build()
}

Bloom-RPC

拦截器是一个GRPC概念,允许应用程序与传入或发出的GRPC调用交互。
他们提供了一种丰富请求处理管道的方法。
我们可以添加GRPC拦截器,在这里我们实现 logGrpCinterceptor

class LogGrpcInterceptor : ServerInterceptor {

    override fun <ReqT : Any?, RespT : Any?> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        log.info("Service: ${call.methodDescriptor.serviceName}, Method: ${call.methodDescriptor.bareMethodName}, Headers: $headers")
        return next.startCall(call, headers)
    }

    companion object {
        private val log = LoggerFactory.getLogger(LogGrpcInterceptor::class.java)
    }
}

并将其添加到全局 grpcglobalserverspector

@Configuration(proxyBeanMethods = false)
class GlobalInterceptorConfiguration {

    @GrpcGlobalServerInterceptor
    fun logServerInterceptor(): LogGrpcInterceptor? = LogGrpcInterceptor()
}

Zipkin

微服务的服务层有一些方法,例如,使用数据列表,它具有两种方法,
一个返回 pageimpl 用于一元方法响应的一个返回流量用于GRPC流响应方法。
当前的弹簧版本支持@transactional r2dbc
注释 界面和实现如下:

@Service
interface BankAccountService {

    suspend fun createBankAccount(bankAccount: BankAccount): BankAccount

    suspend fun getBankAccountById(id: UUID): BankAccount

    suspend fun depositAmount(id: UUID, amount: BigDecimal): BankAccount

    suspend fun withdrawAmount(id: UUID, amount: BigDecimal): BankAccount

    fun findAllByBalanceBetween(requestDto: FindByBalanceRequestDto): Flow<BankAccount>

    suspend fun findByBalanceAmount(requestDto: FindByBalanceRequestDto): Page<BankAccount>
}
@Service
class BankAccountServiceImpl(
    private val bankRepository: BankRepository,
    private val tracer: Tracer
) : BankAccountService {

    @Transactional
    override suspend fun createBankAccount(@Valid bankAccount: BankAccount): BankAccount =
        withContext(Dispatchers.IO + tracer.asContextElement()) {
            val span = tracer.startScopedSpan(CREATE_BANK_ACCOUNT)

            runWithTracing(span) {
                bankRepository.save(bankAccount).also { span.tag("saved account", it.toString()) }
            }
        }

    @Transactional(readOnly = true)
    override suspend fun getBankAccountById(id: UUID): BankAccount =
        withContext(Dispatchers.IO + tracer.asContextElement()) {
            val span = tracer.startScopedSpan(GET_BANK_ACCOUNT_BY_ID)

            runWithTracing(span) {
                bankRepository.findById(id).also { span.tag("bank account", it.toString()) }
                    ?: throw BankAccountNotFoundException(id.toString())
            }
        }

    @Transactional
    override suspend fun depositAmount(id: UUID, amount: BigDecimal): BankAccount =
        withContext(Dispatchers.IO + tracer.asContextElement()) {
            val span = tracer.startScopedSpan(DEPOSIT_AMOUNT)

            runWithTracing(span) {
                bankRepository.findById(id)
                    ?.let { bankRepository.save(it.depositAmount(amount)) }
                    .also { span.tag("bank account", it.toString()) }
                    ?: throw BankAccountNotFoundException(id.toString())
            }
        }

    @Transactional
    override suspend fun withdrawAmount(id: UUID, amount: BigDecimal): BankAccount =
        withContext(Dispatchers.IO + tracer.asContextElement()) {
            val span = tracer.startScopedSpan(WITHDRAW_AMOUNT)

            runWithTracing(span) {
                bankRepository.findById(id)
                    ?.let { bankRepository.save(it.withdrawAmount(amount)) }
                    .also { span.tag("bank account", it.toString()) }
                    ?: throw BankAccountNotFoundException(id.toString())
            }
        }

    @Transactional(readOnly = true)
    override fun findAllByBalanceBetween(requestDto: FindByBalanceRequestDto): Flow<BankAccount> {
        val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE)

        runWithTracing(span) {
            return bankRepository.findAllByBalanceBetween(
                requestDto.minBalance,
                requestDto.maxBalance,
                requestDto.pageable
            )
        }
    }

    @Transactional(readOnly = true)
    override suspend fun findByBalanceAmount(requestDto: FindByBalanceRequestDto): Page<BankAccount> =
        withContext(Dispatchers.IO + tracer.asContextElement()) {
            val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE_WITH_PAGINATION)

            runWithTracing(span) {
                bankRepository.findByBalanceAmount(requestDto.minBalance, requestDto.maxBalance, requestDto.pageable)
                    .also { span.tag("pagination", it.toString()) }
            }
        }


    companion object {
        private const val CREATE_BANK_ACCOUNT = "BankAccountService.createBankAccount"
        private const val GET_BANK_ACCOUNT_BY_ID = "BankAccountService.getBankAccountById"
        private const val DEPOSIT_AMOUNT = "BankAccountService.depositAmount"
        private const val WITHDRAW_AMOUNT = "BankAccountService.withdrawAmount"
        private const val GET_ALL_BY_BALANCE = "BankAccountService.findAllByBalanceBetween"
        private const val GET_ALL_BY_BALANCE_WITH_PAGINATION = "BankAccountService.findByBalanceAmount"
    }
}

Bloom-RPC

R2DBC 是一种API,可为关系数据库提供反应性,非阻滞API。
使用此功能,您可以在Spring Boot中将反应性API读取并以反应性/同步方式写入数据库。
bankrepository coroutinesortingrepository 来自Spring Data和我们的Custom BankPostgresrepository 实现。
对于我们的自定义bankPostgresrepository在此处使用的实现 r2dbcentityTemplate databaseclient
如果我们想像JPA provide一样具有类似的分页响应,
我们必须手动创建 pageimpl

Zipkin

@Repository
interface BankRepository : CoroutineSortingRepository<BankAccount, UUID>, BankPostgresRepository {
    fun findAllByBalanceBetween(min: BigDecimal, max: BigDecimal, pageable: Pageable): Flow<BankAccount>
}
@Repository
interface BankPostgresRepository {
    suspend fun findByBalanceAmount(min: BigDecimal, max: BigDecimal, pageable: Pageable): Page<BankAccount>
}
@Repository
class BankPostgresRepositoryImpl(
    private val template: R2dbcEntityTemplate,
    private val databaseClient: DatabaseClient,
    private val tracer: Tracer,
) : BankPostgresRepository {

    override suspend fun findByBalanceAmount(min: BigDecimal, max: BigDecimal, pageable: Pageable): Page<BankAccount> =
        withContext(Dispatchers.IO + tracer.asContextElement()) {
            val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE_AMOUNT)
            val query = Query.query(Criteria.where(BALANCE).between(min, max))

            runWithTracing(span) {
                val accountsList = async {
                    template.select(query.with(pageable), BankAccount::class.java)
                        .asFlow()
                        .toList()
                }

                val totalCount = async {
                    databaseClient.sql("SELECT count(bank_account_id) as total FROM microservices.bank_accounts WHERE balance BETWEEN :min AND :max")
                        .bind("min", min)
                        .bind("max", max)
                        .fetch()
                        .one()
                        .awaitFirst()
                }

                PageImpl(accountsList.await(), pageable, totalCount.await()["total"] as Long)
                    .also { span.tag("pagination", it.toString()) }
                    .also { log.debug("pagination: $it") }
            }
        }

    companion object {
        private val log = LoggerFactory.getLogger(BankPostgresRepositoryImpl::class.java)
        private const val GET_ALL_BY_BALANCE_AMOUNT = "BankPostgresRepository.findByBalanceAmount"
    }
}

对于错误处理GRPC启动器的错误提供了 grpcadvice ,标志着要检查的类别的类别以获取异常处理方法,
@grpcexceptionhandler 标记要执行的注释方法,如果抛出了指定的例外,
状态代码很好地描述了here

@GrpcAdvice
class GrpcExceptionAdvice {

    @GrpcExceptionHandler(RuntimeException::class)
    fun handleRuntimeException(ex: RuntimeException): StatusException {
        val status = Status.INTERNAL.withDescription(ex.message).withCause(ex)
        return status.asException().also { log.error("status: $status") }
    }

    @GrpcExceptionHandler(BankAccountNotFoundException::class)
    fun handleBankAccountNotFoundException(ex: BankAccountNotFoundException): StatusException {
        val status = Status.INVALID_ARGUMENT.withDescription(ex.message).withCause(ex)
        return status.asException().also { log.error("status: $status") }
    }

    @GrpcExceptionHandler(MethodArgumentNotValidException::class)
    fun handleMethodArgumentNotValidException(ex: MethodArgumentNotValidException): StatusException {
        val errorMap: MutableMap<String, String> = HashMap()
        ex.bindingResult.fieldErrors.forEach { error -> error.defaultMessage?.let { errorMap[error.field] = it } }
        val status = Status.INVALID_ARGUMENT.withDescription(errorMap.toString()).withCause(ex)
        return status.asException().also { log.error("status: $status") }
    }

    @GrpcExceptionHandler(ConstraintViolationException::class)
    fun handleConstraintViolationException(ex: ConstraintViolationException): StatusException {
        val status = Status.INVALID_ARGUMENT.withDescription(ex.toString()).withCause(ex)
        return status.asException().also { log.error("status: $status") }
    }


    companion object {
        private val log = LoggerFactory.getLogger(GrpcExceptionAdvice::class.java)
    }
}

我们的微服务还具有HTTP控制器:

@Tag(name = "BankAccount", description = "Bank Account REST Controller")
@RestController
@RequestMapping(path = ["/api/v1/bank"])
class BankAccountController(private val bankAccountService: BankAccountService) {

    @PostMapping(produces = [MediaType.APPLICATION_JSON_VALUE])
    @Operation(
        method = "createBankAccount",
        summary = "Create bew bank account",
        operationId = "createBankAccount",
        description = "Create new bank for account for user"
    )
    suspend fun createBankAccount(@Valid @RequestBody req: CreateBankAccountDto) =
        withTimeout(timeOutMillis) {
            ResponseEntity
                .status(HttpStatus.CREATED)
                .body(bankAccountService.createBankAccount(BankAccount.of(req)).toSuccessHttpResponse())
                .also { log.info("created bank account: $it") }
        }

    @PutMapping(path = ["/deposit/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
    @Operation(
        method = "depositBalance",
        summary = "Deposit balance",
        operationId = "depositBalance",
        description = "Deposit given amount to the bank account balance"
    )
    suspend fun depositBalance(
        @PathVariable("id") id: UUID,
        @Valid @RequestBody depositBalanceDto: DepositBalanceDto
    ) = withTimeout(timeOutMillis) {
        ResponseEntity.ok(bankAccountService.depositAmount(id, depositBalanceDto.amount).toSuccessHttpResponse())
            .also { log.info("response: $it") }
    }

    @PutMapping(path = ["/withdraw/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
    @Operation(
        method = "withdrawBalance",
        summary = "Withdraw balance",
        operationId = "withdrawBalance",
        description = "Withdraw given amount from the bank account balance"
    )
    suspend fun withdrawBalance(
        @PathVariable("id") id: UUID,
        @Valid @RequestBody withdrawBalanceDto: WithdrawBalanceDto
    ) = withTimeout(timeOutMillis) {
        ResponseEntity.ok(bankAccountService.depositAmount(id, withdrawBalanceDto.amount).toSuccessHttpResponse())
            .also { log.info("response: $it") }
    }

    @GetMapping(path = ["{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
    @Operation(
        method = "getBankAccountById",
        summary = "Get bank account by id",
        operationId = "getBankAccountById",
        description = "Get user bank account by given id"
    )
    suspend fun getBankAccountById(@PathVariable(required = true) id: UUID) = withTimeout(timeOutMillis) {
        ResponseEntity.ok(bankAccountService.getBankAccountById(id).toSuccessHttpResponse())
            .also { log.info("success get bank account: $it") }
    }


    @GetMapping(path = ["all/balance"], produces = [MediaType.APPLICATION_JSON_VALUE])
    @Operation(
        method = "findAllAccountsByBalance",
        summary = "Find all bank account with given amount range",
        operationId = "findAllAccounts",
        description = "Find all bank accounts for the given balance range with pagination"
    )
    suspend fun findAllAccountsByBalance(
        @RequestParam(name = "min", defaultValue = "0") min: BigDecimal,
        @RequestParam(name = "max", defaultValue = "500000000") max: BigDecimal,
        @RequestParam(name = "page", defaultValue = "0") page: Int,
        @RequestParam(name = "size", defaultValue = "10") size: Int,
    ) = withTimeout(timeOutMillis) {
        ResponseEntity.ok(bankAccountService.findByBalanceAmount(FindByBalanceRequestDto(min, max, PageRequest.of(page, size))))
            .also { log.info("response: $it") }
    }

    @GetMapping(path = ["all/balance/stream"])
    @Operation(
        method = "getAllByBalanceStream",
        summary = "Find all bank account with given amount range returns stream",
        operationId = "getAllByBalanceStream",
        description = "Find all bank accounts for the given balance range"
    )
    fun getAllByBalanceStream(
        @RequestParam(name = "min", defaultValue = "0") min: BigDecimal,
        @RequestParam(name = "max", defaultValue = "500000000") max: BigDecimal,
        @RequestParam(name = "page", defaultValue = "0") page: Int,
        @RequestParam(name = "size", defaultValue = "10") size: Int,
    ): Flow<SuccessBankAccountResponse> {
        return bankAccountService.findAllByBalanceBetween(FindByBalanceRequestDto(min, max, PageRequest.of(page, size)))
            .map { it -> it.toSuccessHttpResponse().also { log.info("response: $it") } }
    }


    companion object {
        private val log = LoggerFactory.getLogger(BankAccountController::class.java)
        private const val timeOutMillis = 5000L
    }
}

用于与GRPC合作的UI客户量很少,个人喜欢使用 BloomRPC
另一个有用的工具是 grpcurl grpcui

下一步让我们将我们的微服务部署到K8,
在此示例中,我们可以使用简单的多阶段文件来构建Docker映像:

Lens

FROM --platform=linux/arm64 azul/zulu-openjdk-alpine:17 as builder
ARG JAR_FILE=target/KotlinSpringGrpc-0.0.1-SNAPSHOT.jar
COPY ${JAR_FILE} application.jar
RUN java -Djarmode=layertools -jar application.jar extract

FROM azul/zulu-openjdk-alpine:17
COPY --from=builder dependencies/ ./
COPY --from=builder snapshot-dependencies/ ./
COPY --from=builder spring-boot-loader/ ./
COPY --from=builder application/ ./
ENTRYPOINT ["java", "org.springframework.boot.loader.JarLauncher", "-XX:MaxRAMPercentage=75", "-XX:+UseG1GC"]

对于使用K8的工作,喜欢使用 Helm ,微服务部署很简单,并且具有部署本身,服务,配置
和ServiceMonitor。
最后一个是因为监视使用 kube-prometheus-stack helm chart

微服务头盔图yaml文件是:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Values.microservice.name }}
  labels:
    app: {{ .Values.microservice.name }}
spec:
  replicas: {{ .Values.microservice.replicas }}
  template:
    metadata:
      name: {{ .Values.microservice.name }}
      labels:
        app: {{ .Values.microservice.name }}
    spec:
      containers:
        - name: {{ .Values.microservice.name }}
          image: {{ .Values.microservice.image }}
          imagePullPolicy: Always
          resources:
            requests:
              memory: {{ .Values.microservice.resources.requests.memory }}
              cpu: {{ .Values.microservice.resources.requests.cpu }}
            limits:
              memory: {{ .Values.microservice.resources.limits.memory }}
              cpu: {{ .Values.microservice.resources.limits.cpu }}
          livenessProbe:
            httpGet:
              port: {{ .Values.microservice.livenessProbe.httpGet.port }}
              path: {{ .Values.microservice.livenessProbe.httpGet.path }}
            initialDelaySeconds: {{ .Values.microservice.livenessProbe.initialDelaySeconds }}
            periodSeconds: {{ .Values.microservice.livenessProbe.periodSeconds }}
          readinessProbe:
            httpGet:
              port: {{ .Values.microservice.readinessProbe.httpGet.port }}
              path: {{ .Values.microservice.readinessProbe.httpGet.path }}
            initialDelaySeconds: {{ .Values.microservice.readinessProbe.initialDelaySeconds }}
            periodSeconds: {{ .Values.microservice.readinessProbe.periodSeconds }}
          ports:
            - containerPort: {{ .Values.microservice.ports.http.containerPort }}
              name: {{ .Values.microservice.ports.http.name }}
            - containerPort: {{ .Values.microservice.ports.grpc.containerPort}}
              name: {{ .Values.microservice.ports.grpc.name }}
          env:
            - name: SPRING_APPLICATION_NAME
              value: microservice_k8s
            - name: JAVA_OPTS
              value: "-XX:+UseG1GC -XX:MaxRAMPercentage=75"
            - name: SERVER_PORT
              valueFrom:
                configMapKeyRef:
                  key: server_port
                  name: {{ .Values.microservice.name }}-config-map
            - name: GRPC_SERVER_PORT
              valueFrom:
                configMapKeyRef:
                  key: grpc_server_port
                  name: {{ .Values.microservice.name }}-config-map
            - name: SPRING_ZIPKIN_BASE_URL
              valueFrom:
                configMapKeyRef:
                  key: zipkin_base_url
                  name: {{ .Values.microservice.name }}-config-map
            - name: SPRING_R2DBC_URL
              valueFrom:
                configMapKeyRef:
                  key: r2dbc_url
                  name: {{ .Values.microservice.name }}-config-map
            - name: SPRING_FLYWAY_URL
              valueFrom:
                configMapKeyRef:
                  key: flyway_url
                  name: {{ .Values.microservice.name }}-config-map
      restartPolicy: Always
      terminationGracePeriodSeconds: {{ .Values.microservice.terminationGracePeriodSeconds }}
  selector:
    matchLabels:
      app: {{ .Values.microservice.name }}

---

apiVersion: v1
kind: Service
metadata:
  name: {{ .Values.microservice.name }}-service
  labels:
    app: {{ .Values.microservice.name }}
spec:
  selector:
    app: {{ .Values.microservice.name }}
  ports:
    - port: {{ .Values.microservice.service.httpPort }}
      name: http
      protocol: TCP
      targetPort: http
    - port: {{ .Values.microservice.service.grpcPort }}
      name: grpc
      protocol: TCP
      targetPort: grpc
  type: ClusterIP

---

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  labels:
    release: monitoring
  name: {{ .Values.microservice.name }}-service-monitor
  namespace: default
spec:
  selector:
    matchLabels:
      app: {{ .Values.microservice.name }}
  endpoints:
    - interval: 10s
      port: http
      path: /actuator/prometheus
  namespaceSelector:
    matchNames:
      - default

---

apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ .Values.microservice.name }}-config-map
data:
  server_port: "8080"
  grpc_server_port: "8000"
  zipkin_base_url: zipkin:9411
  r2dbc_url: "r2dbc:postgresql://postgres:5432/bank_accounts"
  flyway_url: "jdbc:postgresql://postgres:5432/bank_accounts"

values.yaml 文件:

microservice:
  name: kotlin-spring-microservice
  image: alexanderbryksin/kotlin_spring_grpc_microservice:latest
  replicas: 1
  livenessProbe:
    httpGet:
      port: 8080
      path: /actuator/health/liveness
    initialDelaySeconds: 60
    periodSeconds: 5
  readinessProbe:
    httpGet:
      port: 8080
      path: /actuator/health/readiness
    initialDelaySeconds: 60
    periodSeconds: 5
  ports:
    http:
      name: http
      containerPort: 8080
    grpc:
      name: grpc
      containerPort: 8000
  terminationGracePeriodSeconds: 20
  service:
    httpPort: 8080
    grpcPort: 8000
  resources:
    requests:
      memory: '6000Mi'
      cpu: "3000m"
    limits:
      memory: '6000Mi'
      cpu: "3000m"

Lens

作为用于使用K8的UI工具,个人喜欢使用 Lens

您可以找到 GitHub repository here 的更多详细信息和源代码,
当然,总是在现实世界中,业务逻辑和基础架构代码要复杂得多,我们必须实施更多必要的功能。
我希望这篇文章有用和有益,并乐于收到任何反馈或问题,请随意 contact me email 或任何< strong> messengers :)