在变更流成为MongoDB的功能之前,他们想跟踪数据库中的实时更改以监视OPLOG条目并根据时间戳跟踪特定集合中的更改。这个过程通常很复杂,恢复和恢复阅读所需的机制并不是特别安全。
更改流允许应用程序实时直接接口来处理数据库集合的更改,并具有自定义事件驱动的体系结构的强大功能。
在配置了复制品集环境时,在MongoDB中可用更改流。他们依靠对成员的变化,因为他们就大多数成员的特定变化达成共识。这确保了特定集合中数据的安全性,尤其是在可能发生故障的情况下。
要在本地开发环境中使用变更流,我们使用Docker Compose配置了复制品集。有关此设置的详细说明,您可以参考@sibelius撰写的guide。我们已经投入了大量努力,以确保所有开发人员都可以轻松建立本地环境,并与MongoDB提供的全部功能一起工作。
设置
在Woovi上,我们在名为Publisher的特定服务中使用变更流。该服务是实例化数据库中每个集合的订户的应用程序。
import { companySubscriber } from './CompanySubscriber';
import { customerSubscriber } from './CustomerSubscriber';
import { userSubscriber } from './UserSubscriber';
export const setupSubscribers = () => {
companySubscriber();
userSubscriber();
customerSubscriber();
};
事件
订户的作用是监视集合并生成数据事件。然后将此事件传递给处理程序,该处理程序根据我们的特定需求处理数据。
export const userSubscriber = async () => {
const stream = User.watch([], { fullDocument: 'updateLookup' });
stream.on('change', (data) => handleUserSubscriberEvent(data));
};
根据特定集合上发生的事件,流量输出更改。 MongoDB有许多变化事件,如documentation所示。
流管线接受多个选项,fullDocument: 'updateLookup'
使事件更新发送完整文档,而不仅仅是默认情况下的更新字段,每个事件都有一个特定的流有效负载,您可以在应用程序上使用基础,并且可以根据需要进行配置来量身定制。
第一个参数是一个管道选项的数组,您可以通过以修改流量输出。
const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];
const stream = User.watch(pipeline, { fullDocument: 'updateLookup' });
stream.on('change', (data) => handleUserSubscriberEvent(data));
});
用例
最后,在我们的handeruserSubscriberevent函数中,我们利用数据对象在应用程序环境中驱动任何事件驱动的服务。在我们的具体情况下,我们使用它来在我们的弹性搜索服务中创建和更新索引,这是我们内部搜索工具背后的核心技术。
export const handleUserSubscriberEvent = (data) => {
const dataPicked = {
_id: data.fullDocument._id,
name: data.fullDocument.name,
email: data.fullDocument.email,
cellphone: data.fullDocument.cellphone,
taxID: data.fullDocument.taxID,
};
const obj = {
data: dataPicked,
};
handleDocumentIndexing(ELASTICSEARCH_INDEXES.USER, obj);
您可以将Change流用作强大的引擎来进行事件驱动的应用程序,例如:
- 分析处理
- 通知
- 与MQTT集成的物联网
MongoDB变更流的性能使Woovi可以尽可能快,安全地扩展更多事件驱动的产品。
Woovi是一家创业公司,使购物者能够按照自己的喜好付款。为了实现这一目标,Woovi为商人提供即时付款解决方案接受订单。
如果您想与我们合作,我们是hiring!