发电机
#javascript #编程 #deno

previous article中,我在JavaScript中谈到了迭代器和发电机,但没有提供任何实际示例。在本文中,我旨在填补这一空白,并通过简单的Web应用程序使用发电机演示特定模式。

本文也可能对那些希望了解Redux-Saga的工作方式有所帮助。

此外,我还渴望探索 deno (节点的替代方案),并且将其用作代码的运行时环境,因此示例将在这次中使用。

tldr -Code

有点关于DeNo

由于我已经提到 deno ,让我简要概述它。 DENO是一个用于执行JavaScript和TypeScript代码的运行时环境,可作为Node.js的替代方案。这是其与节点的主要区别:

  • 本机打字稿支持:deno本身支持打字稿,这意味着您不必手动将打字稿手动转移到JavaScript中; Deno的子系统为您照顾。
  • 基于权限的系统:DENO的权限基于命令行标志,提供了对脚本运行时能够拥有的权限的更多控制。
  • 完整的URL而不是软件包名称:在DeNo中,您指定完整的URL代替软件包名称,允许您在同一程序中使用不同版本的软件包。
  • 现代标准库:Deno配备了自己的一套标准实用程序,这些实用程序与Node.js相比提供了更现代的API。例如,所有异步操作都返回承诺,而不是依靠回调。

应用程序

那么,我们的Web应用程序将是什么样的:

这将是一个能够做几件事的聊天机器人:

  • 报告当前时间
  • 添加数字
  • 它将能够同时与多个用户进行交互

Web界面的设计如下:

web interface ui

用于在DENO中使用WebSocket创建Web服务器,不需要专门的库。标准功能足以满足我们的任务。但首先...

关于发电机的更多信息

在上一篇文章中,我介绍了发电机和迭代器,但我仅介绍了它们能力的一小部分。在本文中,我也不涵盖所有方面,但是我将重点介绍两个功能,这些功能对于解决我们的初始任务很有用。

异步迭代器

在每个调用中,迭代器都会返回一些值。如果每个步骤返回的值是一个承诺,则将这样的迭代器称为异步迭代器。对于此类迭代器,我们可以使用一种特殊的循环来遍历它们:等待。为此,我们要使用此循环支持遍历的对象必须具有一种称为符号的特殊方法。关键字返回异步迭代器)。

示例:

async function* timer() {
  let i = 0;
  while (true) {
    yield new Promise(
      (resolve) => setTimeout(() => resolve(++i), 1000)
    );
  }
}

for await (const tick of timer()) {
  console.log(tick);
}
// 1 2 3 ... 

屈服*

除了用于返回迭代器的当前值的运算符外,还存在 atreffer*运算符。它将迭代剂作为参数并顺序返回其所有值。进入发电机,它返回迭代器的输出( done 是True的第一个值,或者在生成器的情况下,该值传递给 return ) 。

function* concat<T>(...iterables: Iterable<T>[]) {
  for (const iter of iterables) {
    yield* iter;
  }
}

for (const i of concat([1, 2], [3, 4])) {
  console.log(i);
}
// 1 2 3 4

代码

在特定端口上创建HTTP服务器在DENO中很简单:

import { serve } from "https://deno.land/std@0.74.0/http/server.ts";

const server = serve({ port: 8080 });
console.log("Listening on 8080");

for await (const req of server) {
  req.respond({ status: 200, body: 'Hello, world!' });
}

Github

您可以看到,由服务功能调用产生的服务器是异步迭代器。 Next 方法的每个调用都会返回接收到传入连接时解决的承诺。

接下来,我们需要服务器中的两件事:服务静态文件和处理WebSocket连接。让我们创建一个简单的助手,将链条请求处理程序融合在一起,类似于Express中的中间Wares。

type MiddlewarePayload = {
  url: URL;
  req: ServerRequest;
};

type MiddlewareFn = (options: MiddlewarePayload) => Promise<true | undefined>;

const combineProcessors = (...fns: MiddlewareFn[]) => async (options: MiddlewarePayload) => {
  for (const fn of fns) {
    const result = await fn(options);
    if (result) {
      return result;
    }
  }
}

Github

每个处理程序传递到这样的组合者,应归还一个承诺,如果处理该请求,该诺言将其解决;否则,它可以解决不确定的

最终,请求处理代码看起来像这样:

const processors = combineProcessors(index, staticFiles, wsMiddleware);

const server = serve({ port: 8080 });

console.log("Listening on 8080");

// The request's host is not important for our program,
// but it's required for the URL constructor
const BASE = "http://localhost";
for await (const req of server) {
  const url = new URL(req.url, BASE);
  const result = await processors({ url, req });
  if (!result) {
    req.respond({ status: 404 });
  }
}

Github

我不会在此处解释 index staticfiles 功能,因为它们处理静态文件的服务,如果您有兴趣,可以在GitHub上找到。但是,让我们更详细地研究Websocket Connection处理程序。

频道

渠道的概念在JavaScript语言诞生之前就存在。该模型描述了在异步环境中的过程间通信和消息传递。可以在许多现代编程语言中找到渠道的本地实现,例如 go rust kotlin clojure 和其他人。

如果您熟悉流的概念,则过渡到频道将相对简单。像通道一样的流提供了对顺序数据的异步访问。主要区别在于他们的访问模型:一流使用订阅模型(消息到达时,致电处理程序),而通道使用阻止模型(给我下一条消息,并且在到达之前,请勿继续进行)。这是他们用法的一个例子:

/** Streams **/
const stream = new Stream();
stream.subscribe(callback);
// Somewhere else in the code
stream.emit(data);

/** Channels **/
const ch = new Channel();
// Somewhere else in the code
ch.put(data);
// Somewhere else in the code
const data = await ch.take();

这是我们的示例中如何实现频道的方式:

class Channel {
  private takers: Array<(payload: string) => void> = [];
  private buffer: string[] = [];

  private callTakers() {
    while (this.takers.length > 0 && this.buffer.length > 0) {
      const taker = this.takers.shift()!;
      const payload = this.buffer.shift()!;
      taker(payload);
    }
  }

  take() {
    const p = new Promise<string>((resolve) => {
      this.takers.push(resolve);
    });
    this.callTakers();
    return p;
  }

  put(message: string) {
    this.buffer.push(message);
    this.callTakers();
  }

  async listen(sock: WebSocket) {
    for await (const event of sock) {
      if (typeof event === "string") {
        this.put(event);
      }
    }
  }
}

Github

说明这里发生的事情:

  • 有一个数组buffer,存储了传入消息。
  • 有一个阵列takers,可以存储解决诺言的函数。
  • 在每个put(将消息输入频道)上(等待并从频道中获取消息)方法调用,进行检查以查看缓冲区中至少有一条消息,至少一个消息接受者。如果是这样,将解决该消息,从缓冲区中删除,然后将其从接管器数组中删除。
  • 还有一个助手listen方法,它订阅了给定插座的所有消息并将其放入频道。

为什么我们首先需要频道?订阅模型怎么了?这将在稍后变得明显。目前,我只说这使我们能够编写异步代码,就好像它是同步的(这正是为 async/neagait 创建的)。

那发电机呢?

的确,如果您希望在文章早期看到发电机的实际应用,那么您可能会感到惊讶。但是,是时候探索核心概念了。

提醒我们我们要实现的目标以及我们已经拥有的目标:我们通过Websocket从用户接收到频道的消息,并且我们通过WebSocket将消息从机器人发送给用户。

让我们定义“效果”的形状:

type Effect = {
  type: string;
  [key: string]: any;
};

如果您熟悉通量或redux体系结构,则会识别出这种结构 - 与动作非常相似!在我们的情况下,效果将达到非常相似的目的。在redux中,以下公式适用:

const newState = reducer(state, action);

对我们来说,它将这样工作:

while (true) {
  const { value: effect, done } = iter.next(current);
  // Code to handle the effect goes here
  if (done) break;
}

这是一个想法:让我们代表与用户作为生成器的整个对话。该发电机将产生影响并接受处理这些效果的结果。我们将拥有一个特殊的代码来运行发电机并处理效果。在这里是:

export async function handleWs(sock: WebSocket) {
  const incoming = new Channel();
  incoming.listen(sock);

  let current: string = "";
  const iter = dialog();
  while (true) {
    const { value: effect, done } = iter.next(current);
    if (!effect) {
      break;
    }
    switch (effect.type) {
      case "say": {
        sock.send(effect.text);
        break;
      }
      case "listen": {
        current = await incoming.take();
        break;
      }
    }
    if (done) {
      break;
    }
  }
}

Github

建立新的WebSocket连接时,调用此功能。

我们使用两种类型的效果(尽管可能有尽可能多的效果):

  • say 效果 - 指出我们需要向用户发送答复。
  • listen 效果 - 指出我们需要等待用户的消息。 尽管循环是无限的,但由于循环中存在await,这不会导致该过程的任何阻塞。这将中断循环的执行,直到在频道中收到消息为止。通过使用频道使这成为可能。

现在让我们看一下实际对话的外观:

const say = (text: string) => ({ type: "say", text } as const);
const listen = () => ({ type: "listen" } as const);

function* dialog() {
  yield say('Welcome to "Do what I say BOT"');
  while (true) {
    const message: string = yield listen();
    if (message.toLowerCase().includes("time")) {
      yield say(`It is ${format(new Date(), "HH:mm:ss")}`);
    } else if (message.toLowerCase().includes("sum")) {
      yield* sumSubDialog();
    } else {
      yield say(`I don't know what to say!`);
    }
  }
}

function* sumSubDialog() {
  yield say("Okay, what numbers should we sum?");
  let result = 0;
  let message = yield listen();
  while (true) {
    const num = Number(message);
    if (isNaN(num)) {
      break;
    } else {
      result += num;
    }
    yield say("Got it!");
    message = yield listen();
  }
  yield say(`The result is: ${result}`);
}

Github

对话表示为在每个步骤中产生不同效果的发电机。 say 效果立即执行,向套接字发送消息,并且发电机的代码继续无需等待外部环境的数据。 侦听效果暂停发电机的执行,直到收到消息为止,然后立即将其传递回发电机。

这种方法的优点如下:

  • 简洁:发电机提供了一个紧凑而可读的业务逻辑表示,只要代码不会变得太深嵌套(可以通过分解生成器来减轻)。
  • 分解的便利性:您可以使用 atreffer*运算符将主发电机的逻辑拆分为多个子基因生成器,从而简化了代码结构。
  • 效果的简单性:效果是简单的结构,易于构造,可以通过网络进行序列化和传播(尽管这种用例尚不清楚)。
  • 对话的隔离:对发电机的每个调用都会返回具有其自身关闭的新迭代器,启用了无资源泄漏的多个并行对话(假设发电机内部不使用全局变量)。

这种方法的一个主要缺点是,未自动推断出 tarme 运算符的返回值的类型。这是合乎逻辑的,因为生成器的代码是签名,并且可以将其传递给下一个创建的迭代器的类型已经从中推断出来。因此,必须在我们的方法中手动提供屈服值的类型。

我想突出这种方法的重要方面:发电机内部的代码(对话说明)非常抽象。它不处理通过频道的消息,传输错误处理或执行环境特定的任何内容。它代表纯业务逻辑 - 对业务流程的精确和无声描述。

在理想的情况下,生成器应为纯函数。对于发电机,这意味着对于相同的输入参数,返回的迭代器应相同(相同的输入数据序列会生成相同的输出值序列)。发电机的纯度确保了业务逻辑的封装,这意味着它描述了特定的用例,并且不依赖执行环境。这也使得为您的业务逻辑编写测试很容易:您只需要检查发电机返回的效果顺序与预期序列匹配。

想阅读更多吗?

  1. js-csp。 JS实施中的CSP。
  2. redux-saga。 JS中最流行的效果库
  3. Coroutines