node.js应用程序的OpenSearch迁移
#node #elasticsearch #opensearch #platformengineering

介绍

OpenSearch没有提供有关索引模板,索引映射,分析仪和搜索模板的群集状态的功能。在典型的数据库中,我们希望找到一些通常由ORM提供的迁移工具。 OpenSearch客户端库不提供任何类型的内容,ORMS要么不支持OpenSearch,要么没有支持它的迁移工具。

该帖子的代码可以在https://github.com/Npfries/opensearch-umzug上找到。它包含一个带有基本开发OpenSearch和OpenSearch仪表板容器的Docker-compose.yml文件。

大纲

  • 计划
    • JavaScript项目的Umzug迁移框架
  • 实施
    • 抽象CustomStorage class
    • OpenSearch REST API vs typeScript客户端
    • 提供执行的迁移
    • 记录迁移
    • 解雇迁移
    • 创建迁移
  • 摘要

规划

从头开始实施良好的迁移工具不够小,以至于许多团队无法考虑。幸运的是,我们不必从头开始完全启动。看看Microsoft工程师创建的JavaScript的流行开源ORM续集,我们可以看到他们还发布了开源的Umzug迁移框架,该框架为Nequelize提供的迁移工具提供动力。我们可以利用这种现有工具并扩展其以适应我们的需求。这将节省大量的工作,并带来相似的表现的额外好处。

执行

umzug支持多个数据库,每个数据库都实现了Abstract UmzugStorage类。该类是由框架公开的,可以作为配置上的customStorage属性提供给Umzug构造函数。我们可以创建自己的OpensearchStorage来实现UmzugStorageUmzugStorage的实现必须实现三种方法:

  • executed()-这应该返回执行的迁移列表。
  • logMigration()-这应该记录迁移。
  • unlogMigration()-这应该解除迁移。
// OpensearchStorage.js
import { UmzugStorage } from "umzug";

class OpensearchStorage implements UmzugStorage {
    async executed() {}
    async logMigration(params) {}
    async unlogMigration(params) {}
}

export { OpensearchStorage };

在使用打字稿代码库中使用OpenSearch时要考虑的另一件事是,您是否愿意使用OpenSearch JavaScript客户端库,或者使用暴露的REST API。使用OpenSearch客户端是典型的,因为它为使用OpenSearch REST API提供了令人愉悦且主要是键入的抽象。

  • 一方面,REST API相对稳定,并且请求格式将与OpenSearch提供的QDSL语法几乎相同。在升级OpenSearch之前,我们需要检查是否要对REST API进行破坏,因为我们不会在IDE中收到弃用警告。

  • 另一方面,OpenSearch客户端库易于使用,升级客户端时我们将在IDE中收到折旧警告,但是我们可能需要考虑包装更新的频率以使库保持与我们的OpenSearch Cluster约会。

为了方便起见,我们将选择OpenSearch客户库库。请记住,我们的选择必须相同,以实施UmzugStorage和实际迁移本身,我们最终将获得。维护此实施的团队可能与维护实际迁移的团队或团队不同,他们可能有不同的需求或偏好。

在我们实现OpensearchStorage类之前,我们需要准备OpenSearch客户端。我们将从@opensearch-project/opensearch导入它。

// OpensearchClient.js
import { Client } from "@opensearch-project/opensearch";

class OpensearchClient extends Client {
    constructor() {
        super({
            node: process.env.OPENSEARCH_HOST,
        });
    }
}

export { OpensearchClient };

然后我们可以在OpensearchStorage中消耗客户端。

// OpensearchStorage.js
import { UmzugStorage } from "umzug";
import { OpensearchClient } from "./OpensearchClient.js";

class OpensearchStorage implements UmzugStorage {
    client: OpensearchClient;

    constructor() {
        this.client = new OpensearchClient();
    }

    async executed() {}
    async logMigration(params) {}
    async unlogMigration(params) {}
}

export { OpensearchStorage };

现在,我们准备在OpensearchStorage上实现方法。从executed开始,此方法不采用任何参数,应返回所有先前执行的迁移作为代表每个应用迁移名称的字符串。

async executed() {
  const migrationsIndexExists = (
    await this.client.indices.exists({ index: 'migrations' })
  ).body

  if (!migrationsIndexExists) {
    await this.client.indices.create({ index: 'migrations' })
    return []
  }

  const respose = await this.client.search({
    index: 'migrations',
    body: {
      query: {
        match_all: {}
      },
      size: 100
    }
  })

  const result = response?.body?.hits?.hits?.map(m => m['_source']['name']) ?? []

  return result
}

让我们分解这里发生的事情。

  • 首先,我们正在检查迁移索引是否存在。如果没有,我们可以继续创建它,并且我们可以假设尚未应用迁移,因此我们用空数组进行了尽早返回。此方法将在其他方法面前调用,因此这可能是进行迁移索引创建的好地方。

  • 接下来,我们使用match_all查询执行搜索,以返回所有结果,并将大小设置为任意的数字,这比可预见的迁移数量要高。总是可以调整这一点以适合需求,但应考虑,因为在这里意外截断的结果将导致迁移不止一次。

  • 最后,我们将结果映射到仅包含迁移名称的数组,这是我们所需的返回值。

转到logMigration,此方法应执行其名称所建议的内容,并将迁移记录到我们在executed中创建的迁移索引。

async logMigration(params) {
  await this.client.index({
    index: 'migrations',
    body: {
      name: params.name,
      timestamp: new Date().toISOString()
    },
    refresh: true
  })
}

此方法要简单得多,因为它所需要做的就是将迁移记录到迁移索引。当特定迁移完成时,我们包括一个用于跟踪的时间戳字段。我们添加refresh: true,以便在返回之前等待可查询的记录,否则可以进入迁移后不久拨打executed的情况,将导致重复的迁移执行。这最有可能在集成测试而不是正常的迁移执行过程中发生,但我们将其包括在内。最后,我们可以实现unlogMigration

async unlogMigration(params) {
  await this.client.deleteByQuery({
    index: 'migrations',
    body: {
      query: {
        bool: {
          filter: [
            {
              term: {
                name: params.name
              }
            }
          ]
        }
      }
    }
  })
}

这几乎与logMigration一样简单,但是我们必须执行过滤器查询才能删除正确的迁移。我们可以将它们放在一起,并完整地实现了UmzugStorage类。

// OpensearchStorage.js
import { OpensearchClient } from "./OpensearchClient.js";

class OpensearchStorage {
    client;

    constructor() {
        this.client = new OpensearchClient();
    }

    async executed() {
        const migrationsIndexExists = (await this.client.indices.exists({ index: "migrations" })).body;

        if (!migrationsIndexExists) {
            await this.client.indices.create({ index: "migrations" });
            return [];
        }

        const response = await this.client.search({
            index: "migrations",
            body: {
                query: {
                    match_all: {},
                },
                size: 100,
            },
        });

        const result = response?.body?.hits?.hits?.map((m) => m["_source"]["name"]) ?? [];

        return result;
    }

    async logMigration(params) {
        await this.client.index({
            index: "migrations",
            body: {
                name: params.name,
                timestamp: new Date().toISOString(),
            },
            refresh: true,
        });
    }

    async unlogMigration(params) {
        await this.client.deleteByQuery({
            index: "migrations",
            body: {
                query: {
                    bool: {
                        filter: [
                            {
                                term: {
                                    name: params.name,
                                },
                            },
                        ],
                    },
                },
            },
        });
    }
}

export { OpensearchStorage };

为了在执行迁移时使用OpensearchStorage,我们应该将其传递给Umzug的实例。通常将其用于CI环境,或在本地环境中由用户运行。因此,我们将设置Umzug以作为脚本运行,而不是直接的应用程序代码。

// migrate.js
#!/user/bin/env node
import { OpensearchStorage } from './OpensearchStorage.js'
import { OpensearchClient } from './OpensearchClient.js'
import { Umzug } from `umzug`

const client = new OpensearchClient()

const umzug = new Umzug({
  migrations: {
    glob: 'migrations/**/*.cjs'
  },
  logger: console,
  context: client,
  storage: new OpensearchStorage()
})

umzug.runAsCLI()

让我们命名此文件migrate.js。 Umzug提供了一个有用的runAsCLI()助手,可以为我们解析争论,从而节省了一些努力。默认情况下,Umzug能够执行JavaScript迁移而无需任何其他工作。可以通过生成迁移并传递从任何任意来源生成的一系列迁移来修改。让我们继续创建我们的第一个迁移。 Umzug期望每个迁移都有两个出口:向上步骤和下降步骤。如上所述,我们将使用OpenSearch客户端进行迁移工具以及迁移本身。

// [date]_[name].cjs
module.exports = {
    async up({ context: client }) {
        await client.indices.create({
            index: "test_index",
        });
    },
    async down({ context: client }) {
        await client.indices.delete({
            index: "test_index",
        });
    },
};

运行node ./migrate.js up应导致执行迁移目录中的任何迁移。

概括

总而言之,我们为OpenSearch配置了自定义存储实现,以提供给我们选择的迁移工具Umzug。为了简单,我们选择在REST API上使用OpenSearch客户端库。然后,我们将UMZUG配置为在使用自定义存储扩展后将其作为CLI运行。这是进一步扩展工具以支持多个环境和主机的理想位置,例如在镜像环境的情况下。最后,我们创建了第一个开放式搜索迁移。将来,我可能会探索实施适配器以支持QDSL迁移,以便在OpenSearch Dashboards Dev工具和我们的迁移工具之间提供互操作性。