介绍:
在数据库迁移的领域中,即使是过渡序列发生器(例如,序列发生器)也会提出独特的挑战。在本文中,我们将探讨在.NET环境中将序列发生器从DynamoDB迁移到MySQL的复杂性。加入我们,深入研究这一迁移的细节,发现我们采用的解决方案以确保无缝过渡。
语境:
在最近的数据库迁移项目中,我们遇到了将序列发生器从dynamoDB迁移到mySQL的需要。尽管这似乎是一项简单的任务,但它带来了意想不到的复杂性。值得注意的是,MySQL不本地支持序列,因此需要实现自定义解决方案。
挑战:
当我们开始这种迁移时,我们意识到MySQL缺乏对序列的内置支持,需要一种周到的方法。我们需要设计一种自定义解决方案,以使我们能够维护序列发生器的完整性,同时利用MySQL的功能。
利用Dapper进行MySQL集成:
为了应对这一挑战,我们转向了Dapper,这是一种轻巧,有效的对象相关映射(ORM)工具。通过使用Dapper,我们可以将序列发生器与MySQL无缝集成,从而弥合现有代码库和新数据库的唯一要求之间的差距。
在考虑序列生成器迁移的选项时,我们最初探讨了将RAW SQL执行与实体框架使用的可能性。但是,我们遇到了一个障碍,因为我们的应用程序仍在EF Core 6上运行,不幸的是,当时该功能不支持此功能。这迫使我们找到了一种替代解决方案,该解决方案将使我们能够在技术堆栈的限制内实现目标。
解决方案
dynamodb中的序列发生器:
public class SequenceGenerator : ISequenceGenerator
{
private static readonly SemaphoreSlim Semaphore = new(1, 1);
private readonly DynamoDbContext _dbContext;
public SequenceGenerator(DynamoDbContext dbContext)
{
_dbContext = dbContext;
}
public Task<int> GetNextAsync(SequenceType sequenceType)
{
return Policy<int>.Handle<ConditionalCheckFailedException>().WaitAndRetryAsync(
3,
retryAttempt => TimeSpan.FromMilliseconds(50) * retryAttempt
).ExecuteAsync(() => GetNextNumberFromSequenceAsync(sequenceType));
}
private async Task<int> GetNextNumberFromSequenceAsync(SequenceType sequenceType)
{
var sequenceKey = Sequence.GetKey(sequenceType);
int result;
try
{
await Semaphore.WaitAsync();
var sequence = await _dbContext.GetItemAsync<Sequence>(sequenceKey, sequenceKey);
if (sequence is null)
{
sequence = Sequence.Init(sequenceType);
result = sequence.Get();
await _dbContext.PutItemAsync(sequence);
}
else
{
var condition = Condition<Sequence>.On(x => x.CurrentValue).EqualTo(sequence.CurrentValue);
result = sequence.Get();
await _dbContext.UpdateItem<Sequence>()
.WithPrimaryKey(sequenceKey, sequenceKey)
.WithCondition(condition)
.On(x => x.CurrentValue).Assign(result)
.On(x => x.LastUsed).Assign(sequence.LastUsed)
.ExecuteAsync();
}
}
finally
{
Semaphore.Release();
}
return result;
}
}
您可能会看到,代码非常复杂。尽管如此,它正在完成工作!
MySQL中的序列发生器:
public class SequenceGenerator : ISequenceGenerator
{
private readonly IDbConnection _dbConnection;
public SequenceGenerator(DatabaseConfiguration databaseConfiguration)
{
_dbConnection = new MySqlConnection(databaseConfiguration.UserVariablesConnectionString);
}
public async Task<int> GetNextAsync(SequenceType sequenceType)
{
var current = await _dbConnection.ExecuteScalarAsync<int>(
$@"
UPDATE Sequences SET Value = (@next := Value + 1) WHERE SequenceType = {(int)sequenceType};
SELECT @next;"
);
return current;
}
}
值得注意的是,我们需要创建新的数据库表并支持新连接字符串,这使我们可以使用用户定义的变量:
public class DatabaseConfiguration
{
public string ConnectionString => "";
public string UserVariablesConnectionString => $"{ConnectionString};Allow User Variables=True";
}
public class Sequence
{
public Sequence(int value, SequenceType sequenceType)
{
Value = value;
SequenceType = sequenceType;
}
public int Value { get; private set; }
public SequenceType SequenceType { get; }
}
public class SequenceEntityConfiguration : IEntityTypeConfiguration<Sequence>
{
public void Configure(EntityTypeBuilder<Sequence> builder)
{
builder.ToTable("Sequences");
builder.HasKey(x => x.SequenceType);
builder.Property(x => x.Value);
builder.HasData(
new Sequence(100, SequenceType.SequenceA), // start from 100
new Sequence(200, SequenceType.SequenceB) // start from 200
);
}
}
测试序列发生器:
为了验证序列发生器的成功迁移和功能,我们进行了两个关键测试:
顺序呼叫:
public class SequenceGeneratorTests : BaseDatabaseTests
{
private readonly ISequenceGenerator _sut;
public SequenceGeneratorTests()
{
_sut = new SequenceGenerator(DatabaseConfiguration);
}
[Fact]
public async Task GetNextAsync_GivenTwoSequenceCalls_GivesIncreasedValues()
{
//When
var firstNumber = await _sut.GetNextAsync(SequenceType.Submerchant);
var secondNumber = await _sut.GetNextAsync(SequenceType.Submerchant);
//Then
secondNumber.Should().BeGreaterThan(firstNumber);
}
}
并行呼叫:
public class SequenceGeneratorTests : BaseDatabaseTests
{
private readonly ISequenceGenerator _sut;
public SequenceGeneratorTests()
{
_sut = new SequenceGenerator(DatabaseConfiguration);
}
[Fact]
public async Task GetNextAsync_GivenFiveParallelCallsFromNewInstances_GivesIncreasedValues()
{
// Given
var tasks = Enumerable.Repeat(5, 5).Select(
_ => new SequenceGenerator(DatabaseConfiguration).GetNextAsync(SequenceType.Submerchant)
);
//When
var result = await Task.WhenAll(tasks);
//Then
result.Should().OnlyHaveUniqueItems();
}
}
结论:
将序列发生器从DynamoDB迁移到MySQL似乎是较大数据库迁移项目的一个小方面。但是,MySQL缺乏内置序列支持带来的挑战需要仔细考虑并实施自定义解决方案。通过利用Dapper和我们的专业知识,我们成功地迁移了我们的序列发生器,同时保持可靠性并确保准确的ID生成。