node.js中的分布式锁
#typescript #node #distributedsystems #redis

通常,您需要构建一个需要协调访问某个资源的功能。一个常见的用例是可以在背景中加工作业的队列。

这些工作可以是:

  1. 由一群工人接管,并并行处理。他们开始和结束的顺序无关紧要。
  2. 拾取并连续处理。他们处理的顺序是事项。

第一种情况通常足够简单。通常,您不必担心比赛条件。可以按照他们进来的任何顺序成功处理(或失败和重述)工作。

但是,当您必须保证尊重其处理顺序时会发生什么?如果您有一个服务器和一个工作线程,则很简单。工作将连续处理,并将尊重其订购。

当您部署在不同服务器上的工人舰队时会发生什么?您将需要“选举”您的一名工人,才能成为处理这些工作的人。更好的是,您必须确保该工人死亡或从服务器池中删除,另一个健康的工作人员必须找到工作并继续以某种切换方式进行处理。这是分布式锁定算法方便的地方。

这是我们的演示运行,我们在其中模拟了两个服务器竞争以获取锁定的服务器,以及服务器降低时锁的移交方式:

two terminal windows showing two instances of our app running and acquiring the lock

如果您是一个精明的node.js工程师,并且只想查看代码,则here is the Github repo ready for you.随意离开github star,以便人们可以更轻松地找到此分布式锁模板。

锁在一个节点上

A lock是一种共同的机制,可帮助程序员限制访问(相互排除)到某些资源,从而使您可以控制哪个线程可以读取或写入共享数据。

在基于node.js的单线程,基于事件 - 循环的运行时,您通常不需要锁,但是在Rust等多线程环境中,您通常需要使用原子操作来使用原子操作像std::sync::Mutex这样的标准库,该库保证仅一个线程一次访问给定的代码路径。

像这样的原始词非常有用,但是我们的用例超出了一台服务器。请记住,我们在这里越过服务器的边界,我们必须确保其他工人不会处理我们的工作,以防一个工人已经在处理它们。

我们需要扩展锁的概念并使其分布,其他服务器也可以看到谁握住锁并相应地行动。

使我们的锁与Redis分发

鉴于我们必须在服务器上共享锁状态,因此我们需要某种分布式数据共享机制,这些机制可以在我们的工人舰队中同步这些数据。我们可以以某种方式连接我们的工人并在其中发送消息,但是将其推出非常复杂。另一个替代方法是使用共享数据存储,例如数据库,该数据库可以使锁定状态立即在我们的服务器上使用。重新适合此目的。

redis是一项奇妙的技术。如果您不熟悉它,Redis是一个内存数据库。它非常快,可以充当您的服务器群中的共享缓存。

redis为原子操作提供API,这将支持我们保证只有一个工人可以在任何给定时间获得锁。我们将很快看到这一点,但是此博客文章在很大程度上遵循了Redis本身在其patterns manual here中建议的模式,称为 redlock 对于我们有一个Redis实例的情况。

为了促进对分布式锁的工作方式的理解,请在下图中查看。我们有四个工人的库,这些工人是node.js服务器,负责处理我们的假设串行队列。这四个服务器将同时尝试通过在REDIS中设置键值对来获取锁定。第一个设法执行此操作的服务器“赢”并将锁定锁。在手头的锁时,该服务器应该能够开始处理作业。

Lock 1

然后,如果需要关闭此服务器(例如在部署期间),则应释放锁定,并为其他服务器之一提供机会获取它并继续处理后台作业。

Lock 2

钥匙到期的故障安全

到目前为止,当服务器需要关闭时,我们的示例一直集中在释放锁定。但是,在硬件故障期间会发生什么?如果您的应用程序恐慌并无处不在崩溃怎么办?这种情况可能会导致您进入deadlock,在该deadlock中,您的服务器池中没有其他服务器可以获取锁定以保持处理作业。

redis提供了一个非常有用的功能,每当您设置键值对时,您还可以定义此值可以在其内存上持续多长时间。雷迪斯将在给定时间过去后自动清理。

我们将很快看到如何在有效期设置钥匙,但这是高级的外观:

Lock 3

不时更新锁

鉴于我们的锁以我们在Redis中存储的条目表示,在一定时间后自动到期,我们必须确保持有锁的工人可以不时续签锁定,以确保它仍然是一个握着锁。

为执行此操作,Redis还提供了一个API,可以专门更改给定键的到期时间。 EXPIRE命令将帮助我们实现这一目标。

但是足够的理论,让我们介绍代码,然后在行动中看到它。

通过演示开始

我们首先要在行动中演示此实现,以便您了解它的工作原理,然后我们会逐步查看代码。

为了启动这个项目,我已经将RemixExpress adapter一起使用,所以我可以
拥有一个超级快速的项目已经设置了打字稿。该代码可用here,所以只需克隆它,npm install即可开始。

但是,在运行我们的node.js应用程序之前,我们需要redis作为依赖项可用,以便我们的应用可以连接到它。安装Redis的最简单方法是使用Docker。我们的项目已经配备了一个方便的docker-compose文件,可以为您重新引导您。

因此,请确保Docker正在运行并执行以下操作:

docker-compose up -d

这将在后台运行Redis,这将非常适合我们的应用程序。现在,复制.env.example文件并将其重命名为.env。这应该足以启动我们的应用程序:

npm run dev

如果您观看服务器日志,则应看到一些日志显示:

  • 您的节点试图获取锁
  • 锁已成功获得
  • 它正在做一些(模拟的)工作,例如处理背景工作
Trying to acquire lock { workerId: '01H63B5EAKQZTCDAT8AAVQ9WAG' }
Lock acquired. Starting work { workerId: '01H63B5EAKQZTCDAT8AAVQ9WAG' }
Doing some heavy work... { workerId: '01H63B5EAKQZTCDAT8AAVQ9WAG' }

现在,打开一个新的终端窗口,并在其他端口上从我们的node.js应用程序上启动一个新的过程:

PORT=3001 npm run dev

这模拟了您的应用程序运行的另一台服务器并尝试获取锁定。您只会看到您的新过程将不断尝试每5秒获取一次锁。您会看到如下:

Trying to acquire lock { workerId: '01H63B9M322QAP09D6EJ19SGWC' }

现在,在第一个终端窗口上关闭您的应用程序,一个握住锁的窗口。现在看看第二个终端窗口。它应该拿起锁并开始处理背景作业。

Trying to acquire lock { workerId: '01H63B9M322QAP09D6EJ19SGWC' }
Lock acquired. Starting work { workerId: '01H63B9M322QAP09D6EJ19SGWC' }
Doing some heavy work... { workerId: '01H63B9M322QAP09D6EJ19SGWC' }

这就是我们想要完成的。我们要确保我们的后台工作永远不会被抛在后面。当一个工人倒下时,另一名工人会捡起它并继续处理。

请注意,我们始终登录Worker ID,因此很容易识别哪个Worker
节点具有锁,哪些只是在等待并尝试获取它。
鉴于我们
,节点ID是该锁实现的基本部分 使用它来识别哪个节点正在积极持有锁。节点ID是
在服务器启动时间内自动分配。

使用状态机管理我们的锁定状态

您可能在问自己:

每个节点如何知道如何以及何时获取锁或等待尝试
再次获取?

这是州机器的进来。它们帮助我们以非常优雅的方式对状态过渡进行建模。

我个人喜欢使用xstate,这是一个有限状态机器库,可帮助我们建模状态以及它们之间的过渡,包括故障模式。

Xstate的维护者Stately的人们甚至提供了一个用于建模和可视化状态机器的Web UI。以下是:

Lock xstate machine

您也可以使用this interactive demo here玩。在此UI上,您可以模拟所有状态过渡,并在视觉上查看状态机的工作原理。

在服务器启动期间启动我们的州机器

该状态机的主要目标是协调以下内容:

  • 一旦我们的服务器启动,我们必须向状态机发出信号,该尝试获取锁定了:
    • 如果成功获取了锁,我们的工人可以开始处理背景作业。同时,我们的工人还需要续订锁,以确保它可以继续处理背景工作。
    • 以防万一它无法获取它,要么是因为另一个已经获得了它的工人,要么是因为Redis不可用,我们的工人需要坐在闲置状态几秒钟,然后稍后尝试获得锁定再次。

如果您不熟悉状态机,请不要担心。 XSTATE introductory article在介绍您的概念方面做得非常出色,并指导您创建第一个状态图表,但让我们浏览代码以查看整个过程的工作原理,我将在此处引用我们的演示项目,以便在此处使用链接,以便于读者将其跟踪回代码。

一旦我们开始收听HTTP请求,here我们就可以通过致电startLockWorker启动我们的工人:

httpServer.listen(process.env.PORT, () => {
  console.log("Express server started at port " + process.env.PORT);
  startLockWorker();
});

现在,让我们前往工人实施startLockWorker here

export function startLockWorker() {
  service.start();
  service.send("TRY_ACQUIRE_LOCK");
}

我们有一个service的实例。虽然您可以阅读有关它们的更多信息here,但服务包裹我们的州机器,并允许我们观察国家如何移动
及时向前,允许我们连接回回来并监视状态如何随着时间的推移而变化。

我们首先是start我们的服务,因此它会倾听事件,然后立即将其发送给TRY_ACQUIRE_LOCK。该活动将启动我们的州机器,该机器将在内部触发其内部服务以尝试获取锁。我们将有一个
很快就会查看我们的实际状态机代码。

如果您查看我们创建此service实例的位置,您将看到the following call

const service = interpret(
  createLockMachine({
    workerId,
    acquireLock: acquireWorkerLock,
    releaseLock: releaseWorkerLock,
    renewLock: renewWorkerLock,
    startWork: consumeResource,
    stopWork: stopConsumingResource,
  })
);

这实际上是我们在调用createLockMachine时创建状态计算机实例的地方。此呼叫采用一个带有几个参数的对象:

  • workerid :为我们提供该状态机属于哪个工人的上下文。如果您熟悉React状态管理,则Xstate中的context与React状态相似。在Xstate“状态”之间过渡时,通常会修改其上下文。
  • AcceariRock :试图获取锁的异步回调。当成功获取锁定或在失败时丢弃错误时,它可以解决。
  • 发行:试图释放锁定的异步回调。当锁成功释放或通过失败时丢弃错误拒绝承诺时,它可以解决。
  • Renewlock :一种异步回调,在工人持有时试图续订锁。当锁定成功或拒绝诺言时,它可以解决。
  • startwork :一种异步回调,开始进行实际的背景作业处理。
  • stopwork :一种异步回调,可停止任何背景作业处理。这对于工人不设法续订锁的情况很重要,因此需要停止和等待。

这些回调是我们的服务接口,使我们可以重复使用状态机器。我们可以通过创建它的不同实例并传递执行实际业务逻辑的服务回调来重复使用同一台机器来管理在我们应用程序中的不同分布式锁。

现在让我们看看our state machine

export function createLockMachine({
  workerId,
  acquireLock,
  renewLock,
  releaseLock,
  startWork,
  stopWork,
}: CreateMachineArgs) {
  return createMachine(
    {
      predictableActionArguments: true,
      id: "lock-worker",
      context: {
        workerId,
      },
      schema: {
        services: {} as Services,
        events: {} as Event,
      },
      initial: "idle",
      on: {
        STOP: "cleanup",
      },
      states: {
        idle: {
          on: {
            TRY_ACQUIRE_LOCK: "acquiring_lock",
          },
        },

        cleanup: {
          invoke: {
            src: "stopWork",
            onDone: {
              target: "releasing_lock",
            },
            onError: {
              target: "releasing_lock",
            },
          },
        },

        acquiring_lock: {
          invoke: {
            src: "acquireLock",
            onDone: {
              target: "working",
            },
            onError: {
              target: "waiting_to_acquire_lock",
            },
          },
        },

        working: {
          invoke: {
            src: "startWork",
          },
          after: {
            "5000": {
              target: "renew_lock",
            },
          },
        },

        renew_lock: {
          invoke: {
            src: "renewLock",
            onDone: {
              target: "working",
            },
            onError: {
              target: "pause_work",
            },
          },
        },

        pause_work: {
          invoke: {
            src: "stopWork",
            onDone: {
              target: "waiting_to_acquire_lock",
            },
            onError: {
              target: "waiting_to_acquire_lock",
            },
          },
        },

        releasing_lock: {
          invoke: {
            src: "releaseLock",
            onDone: {
              target: "idle",
            },
            onError: {
              target: "idle",
            },
          },
        },

        waiting_to_acquire_lock: {
          after: {
            "5000": {
              target: "acquiring_lock",
            },
          },
        },
      },
    },
    {
      services: {
        acquireLock: (context, _event) => {
          console.log("Trying to acquire lock", { workerId: context.workerId });
          return acquireLock();
        },
        renewLock: (context, _event) => {
          console.log("Renewing lock", { workerId: context.workerId });
          return renewLock();
        },
        releaseLock: (context, _event) => {
          console.log("Releasing lock", { workerId: context.workerId });
          return releaseLock();
        },
        startWork: (context, _event) => {
          console.log("Lock acquired. Starting work", {
            workerId: context.workerId,
          });
          return startWork();
        },
        stopWork: (context, _event) => {
          console.log("Stop work", { workerId: context.workerId });
          return stopWork();
        },
      },
    }
  );
}

好的,不要不知所措。这就是Xstate让我们声明状态机的方式。它主要是具有特定结构的JavaScript对象。

您要注意的主要事情是我们的机器从一个状态过渡到另一种状态。让我们看一下我们的机器进入acquiring_lock状态时会发生什么:

acquiring_lock: {
  invoke: {
    src: "acquireLock",
    onDone: {
      target: "working",
    },
    onError: {
      target: "waiting_to_acquire_lock",
    },
  },
},

进入此状态时,它是invokeS(SRC)服务,称为acquireLock。在我们的情况下,服务调用是异步的函数,当解决问题后,它将使用onDone过渡,并将移至target状态,在这种情况下为working

如果此回调拒绝退还的承诺,它将输入onError块,并将过渡到waiting_to_acquire_lock state。

让我们看一下working State here

working: {
  invoke: {
    src: "startWork",
  },
  after: {
    "5000": {
      target: "renew_lock",
    },
  },
},

首先,请注意,它在invoke子句中没有任何目标状态。鉴于我们的工人正在执行背景工作处理时,这是通过设计,我们希望它保持这种状态。但是它确实有一个称为startWorksrc服务。这是我们的服务回调,允许我们的工人有效地开始处理背景作业。

还注意到我们有一个after块。这就是Xstate让我们声明将在给定时间段后自动触发的状态转换的方式。在这种情况下,每当我们的状态机器处于working状态时,5
秒,它将过渡到renew_lock状态。

让我们前往renew_lockhere

renew_lock: {
  invoke: {
    src: "renewLock",
    onDone: {
      target: "working",
    },
    onError: {
      target: "pause_work",
    },
  },
},

这个状态很有趣。它将调用一项名为renewLock的服务,如果它成功,它将过渡到working State,这将使我们的工人有机会继续与我们的背景工作一起工作。对于失败的情况,我们的目标状态是pause_work,又有SRC服务,可以通知我们的工人停止任何背景处理。

在这次旅行之后,您可能想知道这些字符串标识符如何变成函数调用?让我们看一下状态机器中的services部分:

{
  services: {
    acquireLock: (context, _event) => {
      console.log("Trying to acquire lock", { workerId: context.workerId });
      return acquireLock();
    },
    renewLock: (context, _event) => {
      console.log("Renewing lock", { workerId: context.workerId });
      return renewLock();
    },
    releaseLock: (context, _event) => {
      console.log("Releasing lock", { workerId: context.workerId });
      return releaseLock();
    },
    startWork: (context, _event) => {
      console.log("Lock acquired. Starting work", {
        workerId: context.workerId,
      });
      return startWork();
    },
    stopWork: (context, _event) => {
      console.log("Stop work", { workerId: context.workerId });
      return stopWork();
    },
  },
}

这是createMachine调用的第二个参数。请注意,这里的关键名称都与我们之前见过的src匹配。这是我们连接给我们锁定机函数签名和状态
的回调的地方 机器。这是一种抽象,使我们能够使状态机在应用程序中灵活和重复使用。

REDIS在行动中到期钥匙

我们已经浏览了状态机,看到了锁定回调的调用,但是我们如何确保Redis正在按照我们的期望运行?让我们看看我们如何使用ioredis更新键的到期时间。

让我们前往我们的acquireLock函数here:

export async function acquireLock(
  lockKey: string,
  lockValue: string,
  expireAfterInSeconds: number
): Promise<boolean> {
  const result = await redis.call(
    "set",
    lockKey,
    lockValue,
    "NX",
    "EX",
    expireAfterInSeconds
  );

  return result === "OK";
}

在这里,我们使用本机redis命令来设置我们的密钥。秘密调味料在NXEX参数中:

  • nx :设置如果不存在。返回成功案例的“确定”,否则否则
  • ex :设置有效期。键将在经过的时间(以秒为单位给出)
  • 后删除

这是本实施的基本部分。 Redis允许我们拥有自动到期并从其内存中删除的键,这可以确保我们不会面对lock contention。这涵盖了服务器崩溃并且没有机会释放锁定的最终情况,这将为我们所有其他工人永远等待收购锁定。

通过延长到期时间来更新锁

每当我们的工人握住锁时,它都会不时更新锁。这也被Redis API覆盖。让我们看看我们的重新锁函数here

export async function renewLock(
  lockKey: string,
  lockValue: string,
  expireAfterInSeconds: number
): Promise<boolean> {
  const result = await redis.get(lockKey);

  // Lock is available, we can attempt to acquire it again.
  if (result === null) {
    return acquireLock(lockKey, lockValue, expireAfterInSeconds);
  } else if (result === lockValue) {
    // Lock is still held by this worker
    // we can safely renew it by extending its expiry time
    await redis.expire(lockKey, expireAfterInSeconds);
    return true;
  } else {
    throw new Error(
      "Lock held by another node. Can neither renew or acquire it"
    );
  }
}

在这里,我们使用ioredis API到期密钥。如果我们将到期时间设置为零,则该API允许立即从Redis中取出密钥,或者将现有密钥的到期时间延长给定秒。

这正是我们在这里所做的。每当我们的工人握住锁时,我们都会延长到期时间。请注意,workerId在这里如何发挥重要作用。我们将其用作lockValue。这样,我们确保只有持有锁的工人才能安全续订。

从这往哪儿走

我承认我主要浏览了这里的大多数概念,但是重要的是,您现在拥有所有构建块来在Node.js中构建分布式锁,包括一个可以使用的演示项目(available here)作为参考。

不过,该演示的进一步是增加我们的REDIS实例的可用性。目前,我们只处理Redis的一个实例,但是如果Redis失败了,会发生什么?为了改善此实现,您应该查看
具有带有多个主节点的redis群集。那就是您实际上可以全部实施Redlock algorithm的地方。