Kuflow如何为我们的流程作为Worfkows引擎的时间支持?
#编程 #showdev #java #temporal

在这样一个多元化的世界中,有一种做事的方式将是无聊的。这就是为什么在Kuflow上我们支持实施流程和任务逻辑的不同方法。在这篇文章中,我们将谈论其中一个,即通过Temporal的编排,这为我们提供了一种管理工作流程的强大方法。

我们不会详细介绍我们平台中处理的概念,因为您可以咨询documentation或其他blog entries,但是作为一般的想法,我们可以遵循以下内容。 Kuflow是一个平台,可让您建模涉及一系列任务以实现其成就的业务流程。这些任务可以由人类或系统执行,并且基本上是执行操作和/或收集信息的活动。我们还需要建立一种方法来控制这些任务的执行,它们的顺序,异步或非同步性质或它们从错误中恢复的能力。换句话说,我们需要一个工作流程来协调我们的流程。在Kuflow,我们称​​ Workflow引擎用于我们支持的不同类型的实现。在这种情况下,时间是工作流引擎

为什么要在kuflow过程中使用时间?

使用时间为我们带来了一些好处,主要的是能够以舒适和可靠的方式实施运营的弹性。例如,让我们想象一下,我们的一个过程的任务可以呼吁第三方API。如果此API在呼叫时没有可用,则任务失败,我们的过程可能会被阻止。在这种情况下,我们希望我们的流程的工作流程以后重新进行,并且必须忘记这样做所需的所有脚手架。另外,如果我们已经向其他外部API发布了修改请求,我们希望在恢复过程时,这些API的消耗将是确定性的,并且不会失败,因为我们已经消耗了两次。我们通过暂时实施工作流来实现这一切。

时间如何与Kuflow集成?

为了尽可能简化客户的基础架构,在Kuflow中,我们有自己的颞云,避免了客户部署此基础架构的需求。拥有颞云不是我们的核心业务,将来我们还可以提供使用其他云(例如Temporal Cloud或任何其他云)的可能性。

要在使用时间段内实现一个过程中的工作流程,我们需要开发称为Worker的Kuflow中的内容。工人无非就是在客户的服务器上运行并与kuflow API和我们云中可用的时间API进行交互的应用程序。这种方法允许使用混合云体系结构,该架构在访问您要在工人中消耗的不同服务时提供灵活性。因此,例如,部署在客户服务器上的工人将以安全可靠的方式访问客户的私人服务,以提取数据并根据他们的需求做出决定。

为促进该工人的发展,提供了通过时间活动来促进Kuflow API消费的图书馆。在大多数情况下,与Permulal提供的不同SDK一起,工人的建设是儿童的游戏。提供这些库的各种语言经常更新。

安全

对我们来说最重要的方面之一就是与安全有关的一切。因此,除了在客户资源上部署的功能强大的RBAC系统外,我们还使用一系列身份验证机制来访问我们提供的不同API。为了促进对这些API的访问和凭据的管理,Kuflow定义了“应用程序”的概念。根据定义,“应用程序”由凭证和访问证书组成。反过来,这些申请被用作候选人或“校长”,以授予您想要的资源的角色和权限。

时间API如何实现身份验证和授权?

示意性,为了进行身份验证,我们将使用相互的TLS并获得授权,我们将使用JWT携带者令牌。从Kuflow应用程序中获得证书和凭据,以简单的方式完成,而无需手动创建和上传证书。使用这些数据,我们现在可以将工人连接到Kuflow Cloud。身份验证机制遵循以下流程:一方面,有必要获得一个允许我们在时间上授权的JWT携带者令牌,因为此请求已发给Kuflow API REST(BASICAUTH已验证),以返回令牌。有了这个令牌,除了使用证书加密通信并使用此API实现身份验证外,还进行了与Temalal API GRPC的通信。

authentication

有关更多信息,请参阅documentation

要处理诸如载体令牌续订等问题,并使开发人员的经验尽可能舒适,我们在我们提供的图书馆中实施了这种机制,作为对工人的支持。在此代码中可以看到从

public class KuFlowAuthorizationTokenSupplier implements AuthorizationTokenSupplier {

    private static final Logger LOGGER = LoggerFactory.getLogger(KuFlowAuthorizationTokenSupplier.class);

    private static final double EXPIRE_PERCENTAGE = 0.1;

    private static final Duration EXPIRE_MAX_DURATION = Duration.ofMinutes(10);

    private final AuthenticationApi authenticationApi;

    private volatile String token;

    private volatile Instant tokenExpireAt;

    public KuFlowAuthorizationTokenSupplier(AuthenticationApi authenticationApi) {
        this.authenticationApi = authenticationApi;
    }

    @Override
    public String supply() {
        String token = this.requestToken();

        return "Bearer " + token;
    }

    private String requestToken() {
        String token = this.token;
        Instant tokenExpireAt = this.tokenExpireAt;
        if (isTokenNonExpired(token, tokenExpireAt)) {
            return token;
        }

        synchronized (this) {
            token = this.token;
            tokenExpireAt = this.tokenExpireAt;
            if (isTokenNonExpired(token, tokenExpireAt)) {
                return token;
            }

            AuthenticationResource authentication = new AuthenticationResource();
            authentication.setType(AuthenticationTypeResource.ENGINE);
            authentication = this.authenticationApi.createAuthentication(authentication);

            Duration expireDuration = Duration.between(Instant.now(), authentication.getExpiredAt());
            expireDuration = Duration.ofSeconds((long) (expireDuration.getSeconds() * EXPIRE_PERCENTAGE));
            if (expireDuration.compareTo(EXPIRE_MAX_DURATION) > 0) {
                expireDuration = EXPIRE_MAX_DURATION;
            }

            this.token = token = authentication.getToken();
            this.tokenExpireAt = tokenExpireAt = Instant.now().plus(expireDuration);

            LOGGER.debug("Regenerated JWT Temporal authorization token. Expired at: {}", tokenExpireAt);

            return token;
        }
    }

    private static boolean isTokenNonExpired(String token, Instant tokenExpireAt) {
        return token != null && tokenExpireAt != null && Instant.now().isBefore(tokenExpireAt);
    }
}

我们在这里所做的是实现临时SDK提供的授权tokensupplier 接口,用于使用我们的kuflow API客户端库在需要时协商一个令牌。以同样的方式,我们实现了 grpcmetadataprovider 接口,目的是添加为metadata,以前的实现提供的令牌:

public class AuthorizationGrpcMetadataProvider implements GrpcMetadataProvider {
  public static final Metadata.Key<String> AUTHORIZATION_HEADER_KEY =
      Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);

  private final AuthorizationTokenSupplier authorizationTokenSupplier;

  public AuthorizationGrpcMetadataProvider(AuthorizationTokenSupplier authorizationTokenSupplier) {
    this.authorizationTokenSupplier = authorizationTokenSupplier;
  }

  @Override
  public Metadata getMetadata() {
    Metadata metadata = new Metadata();
    metadata.put(AUTHORIZATION_HEADER_KEY, authorizationTokenSupplier.supply());
    return metadata;
  }
}

grpc连接到时间的元数据提供商类将是我们以后在工作中配置 workflowservicestub 的元素。

WorkflowServiceStubsOptions.newBuilder().addGrpcMetadataProvider(
    new AuthorizationGrpcMetadataProvider(new KuFlowAuthorizationTokenSupplier(this.authenticationApi))
);

作为成功连接到时间的最后一步,我们需要创建一个SSL上下文,该上下文将在与MutualTLS协商中使用。为此,我们需要扩展SDK的SSLContext类。实现取决于您的需求,但是可以在GitHub上的示例存储库中可用的以下代码段中找到一个示例。

 private SslContext createSslContext() {
        MutualTlsProperties mutualTls = this.applicationProperties.getTemporal().getMutualTls();
        if (StringUtils.isBlank(mutualTls.getCert()) && StringUtils.isBlank(mutualTls.getCertData())) {
            return null;
        }

        if (
            StringUtils.isNotBlank(mutualTls.getCert()) &&
            (StringUtils.isBlank(mutualTls.getKey()) || StringUtils.isBlank(mutualTls.getCa()))
        ) {
            throw new KuFlowEngineClientException("key and ca are required");
        }

        if (
            StringUtils.isNotBlank(mutualTls.getCertData()) &&
            (StringUtils.isBlank(mutualTls.getKeyData()) || StringUtils.isBlank(mutualTls.getCaData()))
        ) {
            throw new KuFlowEngineClientException("keyData or caData are required");
        }

        try (
            InputStream certInputStream = this.openInputStream(mutualTls.getCert(), mutualTls.getCertData());
            InputStream keyInputStream = this.openInputStream(mutualTls.getKey(), mutualTls.getKeyData());
            InputStream caInputStream = this.openInputStream(mutualTls.getCa(), mutualTls.getCaData())
        ) {
            KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
            trustStore.load(null, null);
            X509Certificate certificate = (X509Certificate) CertificateFactory.getInstance("X509").generateCertificate(caInputStream);
            trustStore.setCertificateEntry("temporal-ca", certificate);

            TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            trustManagerFactory.init(trustStore);
            TrustManager trustManager = trustManagerFactory.getTrustManagers()[0];

            return SimpleSslContextBuilder.forPKCS8(certInputStream, keyInputStream).setTrustManager(trustManager).build();
        } catch (KeyStoreException | NoSuchAlgorithmException | CertificateException | IOException e) {
            throw new KuFlowEngineClientException("Unable to configure mTLS", e);
        }
    }

最后,像GRPC元数据提供商一样,我们将此SSL上下文添加到我们的WorkflowservicEstubs选项中。

public WorkflowServiceStubs workflowServiceStubs() {
    Builder builder = WorkflowServiceStubsOptions.newBuilder();
    builder.setTarget(this.applicationProperties.getTemporal().getTarget());
    builder.setSslContext(this.createSslContext());
    builder.addGrpcMetadataProvider(
        new AuthorizationGrpcMetadataProvider(new KuFlowAuthorizationTokenSupplier(this.authenticationApi))
    );

    WorkflowServiceStubsOptions options = builder.validateAndBuildWithDefaults();

    return WorkflowServiceStubs.newServiceStubs(options);
}

在这一点上,请记住,在Kuflow应用中配置颞上支持的过程时,可以下载使用这些连接配置机制的即用的工作模板,因此您有一个工作示例只需单击几下。同样,可以在我们的GitHub存储库和我们的documentation中找到更多信息和集成示例。

process-definition

在Kuflow中部署的颞云上的注释

如本文前面所述,提供时间部署的目的是为我们的客户促进集成。在这一点上,重要的是要突出一些问题:我们的时间部署是官方暂时存储库中可用版本的一个稍微修改的版本。基本上,已添加了安全性和授权修改。结果,某些API方法不可用。例如,您将无法使用我们的时间部署来执行未从Kuflow流程启动的工作流,这是避免滥用的必要条件。或以同样的方式,如果不是从Kuflow应用程序中,也无法取消工作流程。与普通的时间安装不同,其他API可能无法使用。但是,所有保证工人正确功能的API均已启用。如果您需要任何当前禁用的API,则可以通过与我们联系来请求它们。

进行其他一些修改允许您从应用程序中知道您的流程当前是否有任何工作人员。

worker-status

以相同的方式,我们尝试保持时间部署尽可能最新,如果您对此有任何疑问,请与我们联系。

结论

在本文中,我们表面上探讨了将时间段用作KUFLOW流程的工作流引擎。该解决方案为我们提供了比我们支持的其他工作流引擎(例如访问私人服务)的许多优势,在实施工人时可以更大的灵活性。以同样的方式,暂时提供的弹性和可伸缩性使其成为当流程高度复杂并需要协调不同外部API时最建议的选择。下表显示了每个支持的工作流程引擎的主要功能的摘要:

workflow-engine-matrix

免费尝试 kuflow 免费,creating your own free account并开始自动化您的公司。

最初出版于KuFlow's Blog - How KuFlow supports Temporal as a worfkows engine for our processes?