MongoDB在Nodejs + Mongoose中实现了视图
#typescript #node #mongodb #mongoose

有什么意见?

实现的视图是包含查询结果的数据库对象。当我们想快速检索重量查询的输出而不为每个请求运行时,它们很有用。

在MongoDB中,实现的视图就像其他任何集合一样。在我们的情况下,我们希望创建实质性的视图,以存储大型活动集合中的多个时间捕捞的分析视图。根据用例,我们可以在每日,每周,每月或任何其他时段基础上更新视图。我们还可以按需触发更新。

本文的工作示例代码可在GitHub上找到。

原始收藏

我们将使用订单的示例集合 - 在线商店中制作的大量订单。这是一个简化的模式。

// Order.ts

const Order = new mongoose.Schema({
  customer: {
    fullName: { type: String, required: true },
    email: { type: String, required: true },
  },
  items: [
    {
      item: {
        name: { type: String, required: true },
        price: { type: Number, required: true },
      },
      quantity: { type: Number, required: true },
    },
  ],
  date: { type: Date, required: true },
});

// This prevents Mongoose from recompiling the model.
export default mongoose.models?.Order || mongoose.model("Order", Order);

物质视图

我们将在不分组的情况下汇总所有订单的每日统计数据。这将减少巨大的活动收集到稳定增长的365个年度收集,这是廉价的查询和存储。

尽管我们从整个订单集合中汇总,但这种观点很昂贵。此处的优化之一可以是仅从创建的订单中汇总比此实体视图中的最后一个条目晚的订单。

很方便存储旁边的物质视图模型以避免单独更新它们的聚合管道。

// DayStats.ts

const DAY_STATS_COLLECTION_NAME = "day_stats";

// Schema of the aggregate documents stored in materialized view.
const DayStats = new mongoose.Schema(
  {
    _id: { type: String, required: true },
    date: { type: Date, required: true },
    revenue: { type: Number, required: true },
    orders: { type: Number, required: true },
    averageOrderRevenue: { type: Number, required: true },
  },
  { collection: DAY_STATS_COLLECTION_NAME, versionKey: false }
);

// Aggregation pipeline for the materialized view.
export async function calculateDayStats() {
  await Order.aggregate([
    // Flattens items array to calculate sums later.
    { $unwind: { path: "$items" } },
    // Actual calculations for totals and averages.
    {
      $group: {
        _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } },
        revenue: {
          $sum: { $multiply: ["$items.quantity", "$items.item.price"] },
        },
        orders: { $sum: 1 },
        averageOrderRevenue: {
          $avg: { $multiply: ["$items.quantity", "$items.item.price"] },
        },
      },
    },
    // Adding date object after grouping for querying 
    // the materialized view in future (to use date comparison).
    {
      $addFields: {
        date: {
          $dateFromString: { dateString: "$_id", format: "%Y-%m-%d" },
        },
      },
    },
    // Rounding calculated values.
    {
      $set: {
        revenue: { $round: ["$revenue", 5] },
        orders: { $round: ["$orders", 5] }, 
        averageOrderRevenue: { $round: ["$averageOrderRevenue", 5] },
      },
    },
    // Saving to materialized view (writing into collection).
    { $merge: { into: DAY_STATS_COLLECTION_NAME, whenMatched: "replace" } },
  ]);
}

// This prevents Mongoose from recompiling the model.
export default mongoose.models?.DayStats ||
  mongoose.model("DayStats", DayStats);

更新实现的视图

现在我们有了实现的视图,我们需要每天更新它。为此,我们将使用Mongo支持的持久性层的Node.js的轻巧工作库Agenda

我们将拥有一个工作脚本,该脚本将设置队列,安排更新并根据该时间表更新实现的视图。

工作脚本需要始终运行 - 它会定期检查计划的作业并在需要时触发它们。

// worker.ts

import * as dotenv from "dotenv";
dotenv.config();

import setupQueue from "./setup-queue";
import scheduleDbViewUpdates from "./db-view-updates";

(async function () {
  const queue = await setupQueue();
  const jobs = await scheduleDbViewUpdates(queue);

  console.info("Jobs worker started");
})();

setup-queue只是要初始化议程并将其指向mongodb。

// setup-queue.ts

import { Agenda } from "@hokify/agenda";

const DATABASE_URI = process.env.DATABASE_URI;
const DATABASE_COLLECTION = process.env.JOBS_DATABASE_COLLECTION;

if (!DATABASE_URI || !DATABASE_COLLECTION) {
  throw new Error(
    "DATABASE_URI and JOBS_DATABASE_COLLECTION are missing in env"
  );
}

export default async function setupQueue() {
  const agenda = new Agenda({
    db: {
      address: DATABASE_URI,
      collection: DATABASE_COLLECTION,
    },
    processEvery: "5 minutes",
  });

  agenda.on("start", (job) => {
    console.info(`Job ${job.attrs.name} started`);
  });

  agenda.on("complete", (job) => {
    console.info(`Job ${job.attrs.name} completed`);
  });

  agenda.on("fail", (err, job) => {
    console.error(`Job ${job.attrs.name} failed`, err);
  });

  await agenda.start();

  return agenda;
}

db-view-updates将安排我们的视图更新。

// db-view-updates.ts

import { Agenda } from "@hokify/agenda";

import dbConnection from "../shared/dbConnection";
import { calculateDayStats } from "../models/DayStats";

const JOB_NAMES = {
  CALCULATE_DAY_STATS: "CALCULATE_DAY_STATS",
  CALCULATE_DAY_GEO_BUCKET_STATS: "CALCULATE_DAY_GEO_BUCKET_STATS",
  CALCULATE_DAY_PLATFORM_STATS: "CALCULATE_DAY_PLATFORM_STATS",
};

export default async function scheduleDbViewUpdates(agenda: Agenda) {
  const jobs = [];

  agenda.define(JOB_NAMES.CALCULATE_DAY_STATS, async (job) => {
    const connection = await dbConnection();

    await calculateDayStats();

    connection.disconnect();
  });

  jobs.push(await agenda.every("1 day", JOB_NAMES.CALCULATE_DAY_STATS));

  return jobs;
}

结论

这种简单的设置允许为性能密集型MongODB聚合添加任意数量的实现视图。它也可以与加入多个集合的聚合一起使用。

扩展此设置时,重要的是要关注聚合性能并使用索引优化它们,仅重新计算新文档并在辅助复制品上运行聚合。

工作示例代码可在GitHub上找到。