可扩展的Websocket服务器由Chatgpt实施
#node #websocket #chatgpt #scalable

possessedphotography

的封面照片

在上一篇文章Why Websockets are Hard to Scale中,我们已经讨论了扩展Websocket和一些现有解决方案的问题。使用负载平衡技术提到的帖子称为一致的哈希,这是解决此问题的非常优雅的解决方案。我向您保证了一篇有关如何通过一致的散列实现可扩展Websocket服务器的文章。

好吧,这一天来了,喝咖啡并跟随。特别感谢Chatgpt,您在这篇文章中几乎所有的代码都是由Chatgpt撰写的。我只进行了一些调整,因此代码实际上有效。


该计划是使用Chatgpt生成的代码,并在其上进行尽可能少的修改。如果您阅读了上一篇文章,我确定您会概述我们将要做什么。但是要再次提醒您,这里是我们实施的步骤:

  • 创建WebSocket服务器
  • 创建带有服务器地址的障碍,必要时进行更新
  • 检索服务器列表,并保留所有服务器的更新
  • 重定向客户端纠正服务器
  • 额外:创建一个可以重定向自己的客户端

新最喜欢的IDE解锁:求职者

我不愿意测试chatgpt,但事实证明,它非常有帮助,足够聪明,可以给我代码块,但仍然足够愚蠢,可以从事我的工作^^。无论我们是否喜欢它,它已经在这里。与所有其他技术一样,如果您更早地接受它,则可以利用它来发挥自己的优势。至少这就是我从经验中意识到的。

chatgpt有时可能会具有挑战性,但是通过练习并使用更具体的提示,可以解锁其全部潜力。我建议您看看我亲爱的同事FatihKadirAkä±n的awesome-chatgpt-prompts,以了解如何与Chatgpt有效交流。


如何更好地将chatgpt用于代码生成

由于chatgpt不会为我们创建一个完全完成的项目,因此每个实施步骤都涵盖一个概念。因此,我们可以为每个步骤获得一个工作代码块并将它们粘合在一起!

哈格实施

我已经从哈希环境开始,并要求chatgpt在打字稿中创建一个hashring enlashaiton,并支持虚拟节点。 Chatgpt的第一个输出似乎还可以,但它使用了>比较字符串。此外,哈希算法并未创建随机甚至结果。首先,我要求使用>比较来修复错误。然后,我希望它使用具有更好分布的哈希功能。

import { createHash } from 'crypto';

class HashRing {
  private nodes: string[];
  private ring: { [key: string]: string };
  private replicas: number;

  constructor(nodes: string[], replicas: number) {
    this.nodes = nodes;
    this.replicas = replicas;
    this.ring = {};

    // Create a hash for each node and add it to the ring
    for (const node of nodes) {
      for (let i = 0; i < replicas; i++) {
        const hash = createHash('sha1')
          .update(node + ':' + i)
          .digest('hex');
        this.ring[hash] = node;
      }
    }

    // Sort the keys in the ring to ensure that they are in clockwise order
    this.sortedKeys = Object.keys(this.ring).sort((a, b) =>
      a.localeCompare(b, undefined, { numeric: true })
    );
  }

  // Get the node responsible for a given key
  getNode(key: string): string {
    const hash = createHash('sha1')
      .update(key)
      .digest('hex');
    const keys = this.sortedKeys;

    // Find the first node with a position greater than the hash
    let index = keys.findIndex((k) => k.localeCompare(hash, undefined, { numeric: true }) > 0);

    // If no such node is found, the key belongs on the first node
    if (index === -1) {
      index = 0;
    }

    return this.ring[keys[index]];
  }
}

完成此完成,我将代码复制到hashring.ts中并导出了类。每当更新服务器列表时,将重新创建此类。然后,我们可以迭代所有连接的插座,如果不应该在此服务器上,请发送重定向消息。

与Hashring实施的完整对话:https://sharegpt.com/c/kxhBEp6

WebSocket服务器实现

接下来,我要求ChatGpt创建WebSocket服务器。它立即使用ws库创建了一个Hello World示例。

import * as WebSocket from 'ws';

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws: WebSocket) => {
  ws.on('message', (message: string) => {
    console.log(`Received message => ${message}`);
  });

  ws.send('Hello! Message From Server');
});

chatgpt倾向于不使用默认导入,但很容易修复。使用基本服务器设置,我请求ChatGpt添加用于基于hashring算法的客户的功能。

import * as crypto from 'crypto';

const serverList = ['server1', 'server2', 'server3'];

function getServer(clientIp: string) {
  const hash = crypto.createHash('md5').update(clientIp).digest('hex');
  const index = parseInt(hash, 16) % serverList.length;
  return serverList[index];
}

const clientIp = '192.168.0.1';
const server = getServer(clientIp);
console.log(`Redirecting client to ${server}`);

但是我的请求有点不清楚,因此输出具有一般想法,但缺少服务器和客户端交互。因此,我希望Chatgpt合并两个输出。在一些提示后,它创建了一个工作示例。

import * as WebSocket from 'ws';
import * as crypto from 'crypto';

const serverList = ['server1', 'server2', 'server3'];

function getServer(clientIp: string) {
  const hash = crypto.createHash('md5').update(clientIp).digest('hex');
  const index = parseInt(hash, 16) % serverList.length;
  return serverList[index];
}

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws: WebSocket) => {
  const clientIp = ws.upgradeReq.connection.remoteAddress;
  const server = getServer(clientIp);
  console.log(`Redirecting client ${clientIp} to ${server}`);

  ws.send(`You have been redirected to ${server}`);

  ws.on('message', (message: string) => {
    console.log(`Received message from ${clientIp} => ${message}`);
  });
});

示例看起来还不错,但是向客户发送纯文本消息不是很灵活。因此,我要求Chatgpt使用更可管理的消息主体。另外,它正在发送重定向消息,无论客户端是否在正确的服务器上,我都希望Chatgpt解决此问题。

import * as WebSocket from 'ws';
import * as crypto from 'crypto';

const serverList = ['server1', 'server2', 'server3'];

function getServer(clientIp: string) {
  const hash = crypto.createHash('md5').update(clientIp).digest('hex');
  const index = parseInt(hash, 16) % serverList.length;
  return serverList[index];
}

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws: WebSocket) => {
  const clientIp = ws.upgradeReq.connection.remoteAddress;
  const correctServer = getServer(clientIp);
  console.log(`Client ${clientIp} should be connected to ${correctServer}`);

  if (ws.upgradeReq.headers.host !== correctServer) {
    console.log(`Redirecting client ${clientIp} to ${correctServer}`);

    const message = {
      type: 'redirect',
      server: correctServer,
    };
    ws.send(JSON.stringify(message));

    ws.on('message', () => {});
  } else {
    console.log(`Client ${clientIp} is already connected to ${correctServer}`);

    ws.on('message', (message: string) => {
      console.log(`Received message from ${clientIp} => ${message}`);
    });
  }
});

瞧!这看起来很棒,但还有更多工作要做。例如,此示例中的服务器列表是静态的,我们需要用HashringgetNode方法手动替换getServer函数。一个了不起的旁注,我问changpt是否可以创建可以与该服务器进行交互的客户端示例。确实如此,这是一个非常聪明的工具,会让我很长时间。

与WebSocket服务器和WebSocket客户端实现的完整对话:https://sharegpt.com/c/pIc7amA

import * as WebSocket from 'ws';

let ws = new WebSocket('ws://localhost:8080');

ws.on('open', () => {
  console.log('Connected to WebSocket server');
});

ws.on('message', (data: string) => {
  const message = JSON.parse(data);
  if (message.type === 'redirect') {
    console.log(`Received redirect message. Disconnecting from current server and connecting to ${message.server}`);
    ws.close();
    ws = new WebSocket(`ws://${message.server}:8080`);
  } else {
    console.log(`Received message => ${data}`);
  }
});

ws.on('close', () => {
  console.log('Disconnected from WebSocket server');
});

要注意的另一个重要的事情是:chatgpt的代码示例并非完美无瑕,我认为它们本来不是。有时,它们很容易被IDE或人类引起人们的注意,但是有时Chatgpt会做一些非常不寻常的事情,并且无法以任何方式起作用。在这种情况下,您总是可以要求它修复自己的代码,令我惊讶的是,它也做得很好!

实施服务注册表

要创建一个hashring,我们需要所有可用服务器的地址。这听起来很简单,但是在服务器上实现此类功能确实很难。您必须考虑不超出本文范围的故障场景和更新通知等,但是作为一个简单的解决方案,我准备使用第三方服务注册表解决方案。问题是,我以前从未使用过服务注册表。

首先,我要求chatgpt在打字稿中创建服务注册表实现。它使用REDIS创建了一个解决方案,但是代码对我来说看起来太复杂了,我不想使用它。相反

**我:**您可以比较尤里卡,Zookeeper和领事

** chatgpt:**当然!

  • eureka,Zookeeper和领事都是可以在分布式环境中用于服务发现的工具。

  • eureka是一项基于REST的服务,主要用于Java生态系统,用于定位服务,以实现中间层服务器的负载平衡和故障转移。

  • Apache Zookeeper是一种分布式协调服务,可实现分布式应用程序以实现高可用性。 Zookeeper通过为他们提供存储和访问共享数据的中心位置来帮助协调分布式应用程序。

  • 领事是一种分布式,高度可用和数据中心的工具,可用于服务发现,配置和编排。它是用GO编程语言编写的,并提供了用于与之交互的HTTP API。

  • 总而言之,Eureka主要用于Java生态系统,Zookeeper是一种分布式协调服务,领事是一种可用于服务发现,配置和编排的工具。

然后,我询问了使用每个工具的简单示例。最后,Consul似乎是一个很好的解决方案,作为额外的奖励,它是在Go中写的,所以我尝试了一下。

kude7来自chatgpt的实现不起作用。即使我要求chatgpt修复代码,但这是不对的。我不想调试和修复它,但是我想从chatgpt使用尽可能多的代码。问题很可能是由于NPM软件包的破坏变化(ChatGpt没有有关所有内容的最新信息)。

那么,我做了什么?我回到了第一方,要求重新实施,但是这次我要求提供服务发现而不是服务注册表。看来,我在要求一些稍有不同的东西-related SO post

import Redis from 'ioredis';

const redis = new Redis({ host: '127.0.0.1' });
const updateInterval = 5; // in seconds
const heartbeatInterval = 10; // in seconds

const serviceList: Set<string> = new Set();
const heartbeatIntervals: { [instance: string]: NodeJS.Timeout } = {};

async function registerServiceInstance(instance: string) {
  await redis.sadd('service-name', instance);
  await redis.set(instance, 'alive', 'EX', heartbeatInterval * 2);
  heartbeatIntervals[instance] = setInterval(() => {
    redis.set(instance, 'alive', 'EX', heartbeatInterval * 2);
  }, heartbeatInterval * 1000);
}

async function deregisterServiceInstance(instance: string) {
  await redis.srem('service-name', instance);
  await redis.del(instance);
  clearInterval(heartbeatIntervals[instance]);
  delete heartbeatIntervals[instance];
}

async function removeStaleServiceInstances() {
  const instances = await redis.smembers('service-name');
  for (const instance of instances) {
    const ttl = await redis.ttl(instance);
    if (ttl === -2 || ttl === -1) {
      await redis.srem('service-name', instance);
    }
  }
}

async function updateServiceList() {
  const instances = await redis.smembers('service-name');
  serviceList.clear();
  for (const instance of instances) {
    serviceList.add(instance);
  }
}

setInterval(removeStaleServiceInstances, updateInterval * 1000);
setInterval(updateServiceList, updateInterval * 1000);

除了原始服务发现实现外,我还要求自动更新和删除过时的服务。而且它在创建它们方面也做得很好。

与服务发现实现的完整对话:https://sharegpt.com/c/bGUAItM

将所有内容放在一起:可扩展的Websocket服务器

到目前为止,我们已经实施:

  • hashring,用于确定用户属于哪个服务器,
  • 服务发现要保留可用服务器列表,
  • WebSocket服务器供客户端连接,
  • WebSocket API,可在必要时重新路由客户端,
  • WebSocket客户端连接我们的服务器

在Chatgpt的帮助下。我们当然可以自己实施这些,但是获得帮助并加快流程没有错。据您所知,您的要求,您可以按预期验证代码的工作,ChatGPT只会加快我们的开发时间。另外,与之合作也很有趣。

现在我们手里拿着所有的碎片,我们可以将它们粘在一起! GitHub存储库与最终项目和Chatgpt对话:https://github.com/nooptoday/chatgpt-scalable-websockets

结果

您可以克隆存储库并亲自测试!这是运行实例的终端输出:

node1              | [node1] - [client_size] -> [1884]
node2              | [node2] - [client_size] -> [2237]
node3              | [node3] - [client_size] -> [1879]

6000个连接最初将发送到Node1,并且客户端被重定向到其他节点。在理想的世界中,我们希望看到类似的东西:

node1              | [node1] - [client_size] -> [2000]
node2              | [node2] - [client_size] -> [2000]
node3              | [node3] - [client_size] -> [2000]

您可以使用数量的虚拟节点,即要使用的哈希函数或从客户端到哈希的哪个参数,并查看结果如何变化。

如果您对实现详细信息有任何疑问,则可以在注释中询问或在GitHub存储库中创建问题。如果您在问,为什么需要实施这种解决方案,您可以阅读导致该实现的先前帖子:Why Websockets are Hard to Scale

就是这篇文章,希望您喜欢并学到了新知识,让我知道您在评论中的想法!