开始进行活动采购和Eventercing.backbone
#redis #kafka #eventsourcing #cqrs

开始进行活动采购和Eventercing.backbone

这篇文章是有关事件采购的系列的第一篇,是一个令人兴奋的框架,称为 eventsourcing.backbone

在这篇文章中,我们将探讨事件采购的基本原理和框架的Hello World样本。

了解事件采购

事件采购捕获并坚持状态随着事件的顺序而变化的建筑模式。它提供了可用于在上以任何给定时间点的应用程序重建当前状态的事件的历史日志。这种方法提供了各种好处,例如可唤起性,可伸缩性和建立复杂的工作流程。

事件采购与命令查询责任隔离( cqrs )模式相结合时,提供了更多优势。 cqrs 分开 读取写入应用程序的关注点,使能够针对特定的读取或写入需求进行优化的专用数据库生成。这种关注点的分离允许更敏捷和灵活的数据库架构设计,因为它们不对于提前设置至关重要。

通过利用 eventsourcing.backbone ,开发人员可以将事件采购和CQR一起实现,从而实现了一个强大的体系结构,可促进可扩展性,灵活性和可维护性。

您可以检查this article,以了解有关事件采购的更多信息。

引入EventSourcing.backbone

eventsourcing.backbone 的一个值得注意的方面是其独特的方法用于事件采购,而不是发明新事件源数据库, eventsourcing.backbone 利用现有消息流的组合,例如kafka,redis流或类似技术,以及键值数据库或诸如redis之类的服务。

此体系结构可以带来一些好处。消息流虽然非常适合处理事件序列和确保可靠的消息传递,但对于繁重的有效载荷可能不是最佳的。通过将键值数据库与消息流相结合, eventsourcing.backbone 允许将消息有效负载存储在键值数据库中,而流则保存序列和元数据。这种方法提高性能,并通过允许将消息的个人数据方面分配到不同的键中,从而促进 gdpr 标准。

值得注意的是, eventsourcing.backbone 当前为.NET生态系统提供了SDK。尽管该框架可能将来扩展到其他编程语言和框架,但目前,它专门针对.NET平台。

利用现有的基础架构

eventsourcing.backbone 的主要优点之一是它与广泛采用的消息流平台(如 kafka或redis stream )的兼容性。这些平台提供了强大的消息传递保证和高吞吐量,使其非常适合大规模处理事件流。

此外, eventsourcing.backbone 与流行的键值数据库(例如 redis,couchbase或amazon dynamodb )无缝集成。这种集成使开发人员能够利用这些数据库的优势来有效地存储和检索与事件有关的辅助数据。

幕后

eventsourcing.backbone 正在采用有些不同的方法。它通过接口定义消息合同(而不是数据类)。这种方法对版本作业很友好,并且更容易处理多种消息类型。它还可以通过将个人数据隔离为不同的方法参数来帮助GDPR。简而言之,以对开发人员透明的方式构建每个方法参数的消息类型。

每个生产者方法 call get 序列化作为键值基于方法参数(默认情况下)和存储的键值存储中(在我们的情况下是Redis Hash)。然后,消息的元数据被推入 stream (在我们的情况下redis流)。

消费者 重建 /strong>直接进入订户接口。

该框架能够处理不同的消息模式和不同的策略,因此将在以后的帖子中发布更多。

让我们品尝一下。

[Taken by Asaf Cohen](https://www.carmel-law.co.il/)

先决条件
为了运行样本,您需要

重新启动并运行:

docker run -p 6379:6379 -it --rm --name redis-event-source-sample redislabs/rejson:latest

创建一个使用3个项目(生产者,消费者,常见抽象)的解决方案。或克隆以下存储库作为起点:

git clone https://github.com/bnayae/HelloEventSourcing

定义诸如普通合约之类的相互利益代码:

将以下Nuget软件包添加到公共抽象的项目中:

第一步是定义事件模式
EventSourcing.Backbone正在采用定义事件数据结构的接口的方法。

将以下代码放入事件抽象项目。

using EventSourcing.Backbone;
namespace EventSourcing.Demo;

[EventsContract(EventsContractType.Producer)]
[EventsContract(EventsContractType.Consumer)]
public interface IShipmentTracking
{
    ValueTask OrderPlacedAsync(User user, Product product, DateTimeOffset time);
    ValueTask PackingAsync(string email, int productId, DateTimeOffset time);
    ValueTask OnDeliveryAsync(string email, int productId, DateTimeOffset time);
    ValueTask OnReceivedAsync(string email, int productId, DateTimeOffset time);
}

eventscontract 属性是魔术开始的地方。
通过用 event Contart _a 编译源源生成器将接口进行装饰,将生成生成和消费事件所需的代码(您不必编写样板代码)。

因此,将生成生产者的_ishipmentTrackingProducer
接口的代码,并为消费者提供

注意:所有方法均应返回 valueTask (这是约定),但是 void _ Will _ WILL会很好,但会导致_valuetask _on _on生成版本的接口(生产者/消费者)。此外,所有生成的方法都将具有_async 后缀。

创建生产者:

添加以下nuget用于使用 redis的生产者(使用redis stream + redis hash)。

using EventSourcing.Backbone;
using EventSourcing.Demo;

IShipmentTrackingProducer producer = RedisProducerBuilder.Create()
                                    .Uri("hello.event-sourcing")
                                    .BuildShipmentTrackingProducer();

User user = new(1, "someone@gmail.com", "someone");
var product = new Product(1234, "Something you must have", 10000);
await producer.OrderPlacedAsync(user, product, DateTimeOffset.UtcNow);

这就是您需要的生产者。

看一下 buildshiptrackingproducer ,这是从 isHipmentTracking 界面生成的扩展方法,并返回 iShipmentTrackingProducer

创建消费者:

添加以下nuget用于使用 redis的消费者(使用redis stream + redis hash)。

添加一个实现消费者(生成)接口的类:

using EventSourcing.Demo;

class Subscription : IShipmentTrackingConsumer
{
    public static readonly Subscription Instance = new Subscription();

    public ValueTask OrderPlacedAsync(
                        User user, Product product, DateTimeOffset time)
    {
        Console.WriteLine($"Order Placed: {user.name}, {product.name}, {product.id}");
            return ValueTask.CompletedTask;
    }

    public ValueTask PackingAsync(string email, int productId, DateTimeOffset time)
    {
        Console.WriteLine($"Packing: {email}, {productId}, {time}");
        return ValueTask.CompletedTask;
    }

    public ValueTask OnDeliveryAsync(string email, int productId, DateTimeOffset time)
    {
        Console.WriteLine($"On Delivery: {email}, {productId}, {time}");
        return ValueTask.CompletedTask;    
    }

    public ValueTask OnReceivedAsync(string email, int productId, DateTimeOffset time)
    {
        Console.WriteLine($"On Received: {email}, {productId}, {time}");
        return ValueTask.CompletedTask;   
    }
}

最后一步是将此类附加到事件流。

using EventSourcing.Demo;
using EventSourcing.Backbone;

IConsumerLifetime subscription = RedisConsumerBuilder.Create()
                                          .Uri(URIs.Default)
                                          .SubscribeShipmentTrackingConsumer(Subscription.Instance);

await subscription.Completion;

如果您还记得制片人正在发送 orderplacedasync 的单个事件,
为了完成图片,我们将添加消费者和生产商的组合,这将改变沿运输流的状态。

创建一个新项目,并同时添加 redis的生产者和消费者nugets:

添加 isHipmentTrackingConsumer 的实现。

using EventSourcing.Backbone;
using EventSourcing.Demo;

class Subscription : IShipmentTrackingConsumer
{
    public static readonly Subscription Instance = new Subscription();

    private readonly IShipmentTrackingProducer _producer = RedisProducerBuilder.Create()
                                .Uri(URIs.Default)
                                .BuildShipmentTrackingProducer();


    public async ValueTask OrderPlacedAsync(User user, Product product, DateTimeOffset time)
    {
        await _producer.PackingAsync(user.email, product.id, DateTimeOffset.Now);
    }

    public async ValueTask PackingAsync(string email, int productId, DateTimeOffset time)
    {
        await _producer.OnDeliveryAsync(email, productId, DateTimeOffset.Now);
    }

    public async ValueTask OnDeliveryAsync(string email, int productId, DateTimeOffset time)
    {
        await _producer.OnReceivedAsync(email, productId, DateTimeOffset.Now);
    }

    public ValueTask OnReceivedAsync(string email, int productId, DateTimeOffset time)
    {
            return ValueTask.CompletedTask;
    }
}

这样,我们可以处理国家的过渡。

最后一步是将其插入流中:

using EventSourcing.Demo;
using EventSourcing.Backbone;

Console.WriteLine("Consuming and Producing Events");

IConsumerLifetime subscription = RedisConsumerBuilder.Create()
                                                .Uri(URIs.Default)
                                                .Group("transit")
                                                .SubscribeShipmentTrackingConsumer(Subscription.Instance);

await subscription.Completion;

注意 group(transitâ)这对于分开与报告当前状态的国家过渡的消费者组很重要。

消费者组在多个消费者之间分配处理消息的工作量,允许并行处理,同时确保每条消息仅处理一次。这样可以确保处理大量数据或高通量消息流时有效且可靠的消息处理,可扩展性和容错性。

将两个消费者放在同一消费者组(或在默认一个消费者组下)将导致传输处理或记者处理的消息并导致一团糟。

完整代码可在以下方面可用:
https://github.com/bnayae/HelloEventSourcing/tree/HelloWorld
请注意,它在 helloworld branch

结论

在此介绍性文章中,我们探索了事件采购的概念,并介绍了 Eventsourcing.backbone.backbone 的独特方面。该框架通过利用消息流和键值数据库的结合来脱颖而出,以实现更好的性能,支持GDPR标准并为事件采购实现提供灵活性。

eventsourcing.backbone 与流行的消息流平台和键值数据库无缝集成,使开发人员能够利用其优势来扩展和高效的事件采购。

与CQR结合使用时,事件采购可以增强数据库架构设计,使其更敏捷和灵活,并启用针对特定需求进行优化的专用数据库的生成。

在以后的帖子中,我们会钻探到更高级的模式,关注点,利弊以及最佳实践。