在GO第2部分中构建DynamoDB事件商店:附加和查询
#aws #测试 #go #eventdriven

Part One中,我们介绍了事件商店,DynamoDB和测试驱动开发的基础知识。现在,我们将建立我们的活动商店及其最基本的功能。

Setup

首先,我们需要create an AWS Accountget your access key(为了更好的安全,请遵循AWS的建议和set up AWS credentials in your local environment)。

将访问键存储在您的.gitignore中包含的.env文件中。

接下来,我们将设置AWS配置并为我们的活动商店创建测试。

注意:我正在使用testify repo中的requireassert包。)

var EventStoreTable = "event-store"

func TestEventStore(t *testing.T) {
    //Create Dynamodb Client
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    cfg, err := config.LoadDefaultConfig(ctx,
        config.WithRegion(os.Getenv("AWS_REGION")),
        config.WithEndpointResolverWithOptions(
            aws.EndpointResolverWithOptionsFunc(
                func(service, region string, options ...interface{}) (aws.Endpoint, error) {
                    return aws.Endpoint{}, &aws.EndpointNotFoundError{}
                },
            ),
        ),
        config.WithCredentialsProvider(
            credentials.StaticCredentialsProvider{
                Value: aws.Credentials{
                    AccessKeyID:     os.Getenv("AWS_ACCESS_ID"),
                    SecretAccessKey: os.Getenv("AWS_SECRET_KEY"),
                },
            },
        ),
    )
    require.Nil(t, err)

    //Create Event Store
    client := dynamodb.NewFromConfig(cfg, EventStoreTable)
    es := store.New(client)
    require.NotNil(t, es)
}

一旦我们进行了失败的测试,我们就可以编写进行此通行证所需的代码,在这种情况下,Event Store类型以及功能构造函数:

type EventStore struct {
    DB    *dynamodb.Client
    Table string
}

func New(db *dynamodb.Client, table string) *EventStore {
    return &EventStore{DB: db, Table: table}
}

不同的环境可能具有不同的DynamOndB客户端和表名。为了处理此问题,我们将事件存储配置为将表名称和DynamoDB客户端作为参数。

事件

我们描述了第1部分中想要的事件的形状引起了这种变化。

type Event struct {
    Id                      string
    Version                 int
    CharacterName           string
    CharacterHitPointChange int
    Note                    string
}

我们活动商店的两个基本方法是AppendQuery。我们将新事件附加到活动商店,并从活动商店查询这些事件。

append

我们首先为活动商店的Append方法编写测试。基于DynamoDB的PutItem函数definition,我们的Append将需要与我们要附加的事件一起传递context.Context。该测试还将为我们提供三个事件,以查询以后。

//...
id := uuid.NewString()
events := []store.Event{
    {
        Id:                      id,
        Version:                 0,
        CharacterName:           "cpustejovsky",
        CharacterHitPointChange: 8,
        Note:                    "Init",
    },
    {
        Id:                      id,
        Version:                 1,
        CharacterName:           "cpustejovsky",
        CharacterHitPointChange: -2,
        Note:                    "Slashing damage from goblin",
    },
    {
        Id:                      id,
        Version:                 2,
        CharacterName:           "cpustejovsky",
        CharacterHitPointChange: -3,
        Note:                    "bludgeoning damage from bugbear",
    },
}
t.Run("Append Items to Event Store", func(t *testing.T) {
    for _, event := range events {
        err := es.Append(context.Background(), &event)
        if err != nil {
            t.Fatal(err)    
        }
    }
})

现在,我们编写了一种满足我们测试的附加方法:

func (es *EventStore) Append(ctx context.Context, event Event) error {
    input := &dynamodb.PutItemInput{
        TableName: &es.Table,
        Item: map[string]types.AttributeValue{
            "Id": &types.AttributeValueMemberS{Value: event.Id},
            //AttributeValueMemberN takes a string value, see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeValue.html
            "Version":                 &types.AttributeValueMemberN{Value: strconv.Itoa(event.Version)},
            "CharacterName":           &types.AttributeValueMemberS{Value: event.CharacterName},
            "CharacterHitPointChange": &types.AttributeValueMemberN{Value: strconv.Itoa(event.CharacterHitPointChange)},
            "Note":                    &types.AttributeValueMemberS{Value: event.Note},
        },
    }
    _, err := es.DB.PutItem(ctx, input)
    if err != nil {
        return err
    }
    return nil
}

我们的测试正在通过,但是此Append方法对我们的活动商店不起作用。我们需要拥有optimistic concurrency来确保不能覆盖事件。就目前而言,新版本0事件将覆盖现有事件。

让我们首先编写测试:

t.Run("Attempt to append existing version to event store and fail", func(t *testing.T) {
    e := store.Event{
        Id:      id,
        Version: 0,
    }
    err := es.Append(context.Background(), e)
    assert.NotNil(t, err)
})

此测试将使用我们当前的附录方法失败,因此我们需要在DynamoDB输入中添加条件:

cond := "attribute_not_exists(Version)"
input := &dynamodb.PutItemInput{
    TableName: &es.Table,
    Item: map[string]types.AttributeValue{
        "Id": &types.AttributeValueMemberS{Value: event.Id},
        "Version":            &types.AttributeValueMemberN{Value: strconv.Itoa(event.Version)},
        "CharacterName":      &types.AttributeValueMemberS{Value: event.Character.Name},
        "CharacterHitPoints": &types.AttributeValueMemberN{Value: strconv.Itoa(event.Character.HitPoints)},
        "Note":               &types.AttributeValueMemberS{Value: event.Note},
},
    ConditionExpression: &cond,
}

现在,我们对Append失败的测试正在通过,但是我们的附录测试正在失败。那是因为这三个事件已经将其保存到数据库中。为了清理该问题,我们可以在测试的底部编写t.Cleanup功能,以删除这些记录并在测试之间为我们重置状态。

//...
    t.Cleanup(func() {
        for i := 0; i < len(events); i++ {
            params := dynamodb.DeleteItemInput{
                TableName: &EventStoreTable,
                Key: map[string]types.AttributeValue{
                    "Id":      &types.AttributeValueMemberS{Value: id},
                    "Version": &types.AttributeValueMemberN{Value: strconv.Itoa(i)},
                },
            }
            _, err := client.DeleteItem(context.Background(), &params)
            if err != nil {
                t.Log("Error deleting items for cleanup:\t", err)
            }
        }
    })
}

现在我们的所有测试都通过了,但是我们仍然有问题。我们的用户将如何区分失败?我们的Append方法可能会失败,因为事件版本已经存在。如果DynamoDB存在内部问题。

也可能会失败。

为了提供清晰度,如果条件失败,我们可以创建一个哨兵错误并为此进行测试检查。首先,我们可以创建错误:

type EventAlreadyExistsError struct {
    ID      string ``
    Version int
}

func (e *EventAlreadyExistsError) Error() string {
    return fmt.Sprintf("event already exists for ID %s and Version %d", e.ID, e.Version)
}

然后我们将其添加到我们的测试中:

t.Run("Attempt to append existing version to event store and fail", func(t *testing.T) {
    e := store.Event{
        Id:      id,
        Version: 0,
    }
    err := es.Append(context.Background(), e)
    assert.NotNil(t, err)
    checkErr := &store.EventAlreadyExistsError{}
    assert.True(t, errors.As(err, &checkErr))
})

,我们的测试现在正在失败。为了让他们通过,我们需要在我们的Append方法中添加以下错误处理:

if err != nil {
    //Using the errors package, the code checks if this is an error specific to the condition being failed and, if so, returns a sentinel error that can be checked
    var errCheck *types.ConditionalCheckFailedException
    if errors.As(err, &errCheck) {
        return &EventAlreadyExistsError{
            ID:      event.Id,
            Version: event.Version,
        }
    }
    return err
}

现在我们的测试再次通过。我们创建了一半的基本功能并可以继续前进。

Query

由于我们在活动商店中设置了三个事件的状态,因此我们可以为活动商店编写测试以查询它们。我们知道它需要将事件ID用于查询,并且像Append一样,它需要一个context.Context来满足DynamoDB API:

t.Run("Query Items from Event Store", func(t *testing.T) {  
   events, err := es.Query(ctx, id)  
   assert.Nil(t, err)  
   assert.Equal(t, numberOfEvents, len(events))  
   for _, event := range events {
       assert.Contains(t, queriedEvents, event)
   }
})

我们的测试失败了,我们可以开始努力使它们通过:

// Query takes a context and DynamoDB query parameters and returns a slice of Events and an errorfunc (es *EventStore) Query(ctx context.Context, id string) ([]Event, error) {  
   var events []Event  
   kce := "Id = :uuid"  
   params := &dynamodb.QueryInput{  
      TableName:              &es.Table,  
      KeyConditionExpression: &kce,  
      ExpressionAttributeValues: map[string]types.AttributeValue{  
         ":uuid": &types.AttributeValueMemberS{Value: id},  
      },  
   }  
   // Query paginator provides pagination for queries until there are no more pages for DynamoDB to go through  
   // See: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.Pagination.htm   p := dynamodb.NewQueryPaginator(es.DB, params)  
   for p.HasMorePages() {  
      out, err := p.NextPage(ctx)  
      if err != nil {  
         return nil, err  
      }  
      // The output is unmarshalled into an Event slice which is appended to the events slice  
      err = attributevalue.UnmarshalListOfMaps(out.Items, &events)  
      if err != nil {  
         return nil, err  
      }  
   }  
   // If the slice is empty, then error is returned  
   if len(events) < 1 {  
      return nil, errors.New("no events found")  
   }  
   return events, nil  
}

此代码大部分是针对DynamoDB的。特别是,pagination对于任何查询或扫描都是必不可少的。他们确保DynamoDB如果有更多的数据返回的数据将返回超过1 MB的数据。

在高级别,我们的活动商店的Query方法是:

  • 查询基础数据库
  • 将其项目分解为Event
  • 确保至少返回一个Event

类似于Append,我们可以在此处添加一个哨兵错误,以帮助用户区分未找到事件和内部DynamoDB错误。

首先,写测试:

t.Run("Query returns specific error if no Event is found", func(t *testing.T) {
    _, err := es.Query(ctx, uuid.NewString())
    assert.NotNil(t, err)
    checkErr := &store.NoEventFoundError{}
    assert.True(t, errors.As(err, &checkErr))
})

要获得此测试,我们将创建错误并用它替换当前的errrors.New()值:

type NoEventFoundError struct{}

func (e *NoEventFoundError) Error() string {
    return "no event found"
}
//...
if len(events) < 1 {
    return nil, &NoEventFoundError{}
}

我们创建了事件商店的基本功能。要查看接下来会发生什么,让我们记录我们查询的输出:

store.Event{Id:"58f02691-78ed-4ca5-8e59-8f4deb44e063", Version:0, CharacterName:"cpustejovsky", CharacterHitPointChange:8, Note:"Init"}
store.Event{Id:"58f02691-78ed-4ca5-8e59-8f4deb44e063", Version:1, CharacterName:"cpustejovsky", CharacterHitPointChange:-2, Note:"Slashing damage from goblin"}
store.Event{Id:"58f02691-78ed-4ca5-8e59-8f4deb44e063", Version:2, CharacterName:"cpustejovsky", CharacterHitPointChange:-3, Note:"bludgeoning damage from bugbear"}

这不是有帮助的,即使有帮助。我们可以将8,-2和-3手动添加在一起,得出结论,“ Cpustejovsky”的角色处于3个生命值。即使我们有代码为我们做到这一点,在播放的D&D会议越多的情况下,查询所有事件也越来越长。

当我们解决重播和快照时,我们将在下一部分中解决这两个问题。

有任何问题或评论吗?关于活动商店或DynamoDB,我错过了什么或错了吗?让我在评论中知道或在Twitter或Gopher的懈怠中与我联系。