GO服务器范围事件 - 向特定客户端发送消息的指南?
#编程 #教程 #go #solidjs

问题

我目前已分配为我的Side Project应用程序实现通知功能的任务。每当用户对其他用户的帖子发表评论或反应时,就会生成事件。随后,服务器发送一条消息,以通知所有者有关此事件的帖子。 tldr :您可以找到最终代码here

在探索了互联网上的许多解决方案后,我发现我们可以使用长期投票,Websockets或服务器量事件来实现目标。但是,由于服务器仅与客户端进行交互并需要实时通知,因此无需使用长池和WebSocket。

入门

根据Wikipedia:

服务器sent事件(SSE)是一种服务器推动技术,使客户端通过HTTP连接从服务器接收自动更新,并描述服务器在建立初始客户端连接后如何向客户端启动数据传输。 /p>

说话很便宜,让我们通过编码此通知服务器来弄脏我们的手。

// main.go
package main

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

func sseHandler(w http.ResponseWriter, r *http.Request) {
    fmt.Println("Client connected")
    w.Header().Set("Access-Control-Allow-Origin", "*") // must have since it enable cors
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    flusher, ok := w.(http.Flusher)
    if !ok {
        fmt.Println("Could not init http.Flusher")
    }

    for {
        select {
        case <-r.Context().Done():
            fmt.Println("Connection closed")
            return
        default:
            fmt.Println("case message... sending message")
            fmt.Fprintf(w, "data: Ping\n\n")
            flusher.Flush()
            time.Sleep(5 * time.Second)
        }
    }
}

func main() {
    router := http.NewServeMux()
    router.HandleFunc("/sse", sseHandler)
    log.Fatal(http.ListenAndServe(":3500", router))
}

您可以使用curl命令进行快速测试

curl http://localhost:3500/event

您应该看到与此类似的东西:

data: Ping

data: Ping
...

在上面的代码中,我们定义了一个将请求处理到/sse端点的HTTP处理程序sseHandler。在处理程序内部,我们为SSE设置了适当的标题,并每5秒钟将事件发送给客户。
标题器起着至关重要的作用,因为客户依靠它来建立连接。为了更好地了解标题的重要性,您可以通过禁用其中一个并观察其影响客户连接的方式来进行实验(扰流板警报:Content-Type: text/event-stream似乎会影响连接机构)。

对于客户端,我们将利用您选择的框架甚至纯/香草JavaScript生成一个前端项目。

npx degit solidjs/templates/ts sse-client
cd sse-client
# yarn, npm i, pnpm i to install dependencies
import { Component, createEffect, createSignal, onCleanup } from "solid-js";

const App: Component = () => {
  const [eventSource, setEventSource] = createSignal<EventSource | undefined>();
  createEffect(() => {
    const ev = new EventSource("http://localhost:3500/sse");
    ev.onmessage = (e) => {
      console.log({ e });
    };
    setEventSource(ev);
  });
  onCleanup(() => {
    eventSource()?.close();
  });
  return <main>Server-sent events</main>;
};

export default App;

访问URL http://localhost:3000,这样做时,您应该看到与提供的GIF类似的东西:

Console gif

就是这样,我们已经成功创建了一个SSE端点,使用JavaScript建立了与服务器的连接。但是,您可能会问:“好的,但是我该如何实施通知”。你还记得我们的要求吗?

每当用户对其他用户的帖子发表评论或反应时,就会生成事件。随后,服务器发送一条消息,以通知有关此事件的帖子所有者。

我们已经完全实现了the server sends message to notify部分。因此,我们只剩下一个单独的任务,那就是在用户启动事件时派遣消息。我们将通过利用GO通道来执行此功能。

触发事件时冲洗消息

我们需要在功能之间利用有效的通信方法,这就是Go Channel Shine。我们的计划将使用全局商店为每个SSE连接创建并存储一个频道。此外,我们将介绍一个新的端点/time,以触发事件。当用户调用此端点时,我们将把消息广播到 all 商店中的频道。

var CHANNEL_STORE []*chan string = make([]*chan string, 0)

func removeChannel(ch *chan string) {
    pos := -1
    storeLen := len(CHANNEL_STORE)
    for i, msgChan := range CHANNEL_STORE {
        if ch == msgChan {
            pos = i
        }
    }

    if pos == -1 {
        return
    }
    CHANNEL_STORE[pos] = CHANNEL_STORE[storeLen-1]
    CHANNEL_STORE = CHANNEL_STORE[:storeLen-1]
    fmt.Println("Connection remains: ", len(CHANNEL_STORE))
}

func broadcast(msg string) {
    for _, ch := range CHANNEL_STORE {
        *ch <- msg
    }
}

func getTime(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Access-Control-Allow-Origin", "*")

    msg := time.Now().Format("15:04:05")
        broadcast(msg)
}

func sseHandler(w http.ResponseWriter, r *http.Request) {
    ch := make(chan string)
    CHANNEL_STORE = append(CHANNEL_STORE, &ch)

    fmt.Println("Client connected: ", len(CHANNEL_STORE))
    w.Header().Set("Access-Control-Allow-Origin", "*")
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    defer func() {
        close(ch)
        removeChannel(&ch)
        fmt.Println("Client closed connection")
    }()

    flusher, ok := w.(http.Flusher)
    if !ok {
        fmt.Println("Could not init http.Flusher")
    }

    for {
        select {
        case message := <-ch:
            fmt.Println("case message... sending message")
            fmt.Println(message)
            fmt.Fprintf(w, "data: %s\n\n", message)
            flusher.Flush()
        case <-r.Context().Done():
            fmt.Println("Client closed connection")
            return
        }
    }
}

func main() {
    router := http.NewServeMux()

    router.HandleFunc("/sse", sseHandler)
    router.HandleFunc("/time", getTime)

    log.Fatal(http.ListenAndServe(":3500", router))
}

最后但并非最不重要的一点是,我们需要更新我们的客户端代码。

import { Component, createEffect, createSignal, onCleanup } from "solid-js";

const App: Component = () => {
  const [eventSource, setEventSource] = createSignal<EventSource | undefined>();
  const [time, setTime] = createSignal<string>("");

  createEffect(() => {
    const ev = new EventSource("http://localhost:3500/sse");
    ev.onmessage = (e) => {
      console.log({ e });

      setTime(e.data);
    };

    setEventSource(ev);
  });

  async function handleGetTime() {
    const res = await fetch("http://localhost:3500/time");
    if (res.status !== 200) {
      console.log("Could not connect to the server");
    } else {
      console.log("OK");
    }
  }

  onCleanup(() => {
    eventSource()?.close();
  });

  return (
    <main>
      Time: {time()}
      <button onClick={handleGetTime}>Get time</button>
    </main>
  );
};

export default App;

您的结果应该类似于此gif:

End result

您可以看到,我已经在演示中启动了两个浏览器选项卡,并在触发/time/端点时。两个选项卡立即收到更新的时间。这是通过将消息广播到所有渠道来实现的。我们可以通过使用GO map来进一步增强实现,将用户身份作为密钥和频道的片段将作为相应的值,从而实现目标消息广播。

// main.go
type SSEConn struct {
    mu      sync.Mutex
    clients map[string][]chan string
}

func NewSSEConn() *SSEConn {
    return &SSEConn{clients: make(map[string][]chan string)}
}

func (p *SSEConn) addClient(id string) *chan string {
    p.mu.Lock()
    defer func() {
        fmt.Println("Clients in add: ", p.clients)
        for k, v := range p.clients {
            fmt.Printf("Key: %s, value: %d\n", k, len(v))
            fmt.Println("Channels from id=", id, v)
        }
        p.mu.Unlock()
    }()

    c, ok := p.clients[id]
    if !ok {
        client := []chan string{make(chan string)}
        p.clients[id] = client
        return &client[0]
    }

    newCh := make(chan string)
    p.clients[id] = append(c, newCh)
    return &newCh
}

func (p *SSEConn) removeClient(id string, conn chan string) {
    p.mu.Lock()
    defer func() {
        fmt.Println("Clients in remove: ", p.clients)
        for k, v := range p.clients {
            fmt.Printf("Key: %s, value: %d", k, len(v))
        }
        p.mu.Unlock()
    }()

    c, ok := p.clients[id]
    if !ok {
        return
    }

    pos := -1

    for i, ch := range c {
        if ch == conn {
            pos = i
        }
    }

    if pos == -1 {
        return
    }

    close(c[pos])
    c = append(c[:pos], c[pos+1:]...)
    if pos == 0 {
        delete(p.clients, id)
    }
}

func (p *SSEConn) broadcast(id string, data, event string) {
    p.mu.Lock()
    defer p.mu.Unlock()

    c, ok := p.clients[id]
    if !ok {
        return
    }

    for _, ch := range c {
        ch <- fmt.Sprintf("event: %s\ndata: %s\n\n", event, data)
    }
}

在提供的代码段中,我定义了SSEConn struct并实现了addClientremoveClientbroadcast等方法。 SSEConn由两个字段组成:clientsmu(Mutex)。 mu领域在预防比赛条件中起着至关重要的作用,您可以阅读有关该here的更多信息。 clients字段存储客户端的ID及其相应频道。另外,请记住更新处理程序实现以接受用户ID很重要。

var sseConn = NewSSEConn()

func getTime(w http.ResponseWriter, r *http.Request) {
    id := strings.TrimPrefix(r.URL.Path, "/time/")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    msg := time.Now().Format("15:04:05")
    sseConn.broadcast(id, msg, "timeEvent")
}

func sseHandler(w http.ResponseWriter, r *http.Request) {
    id := strings.TrimPrefix(r.URL.Path, "/sse/")
    ch := sseConn.addClient(id)

    w.Header().Set("Access-Control-Allow-Origin", "*")
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    defer sseConn.removeClient(id, *ch)

    flusher, ok := w.(http.Flusher)
    if !ok {
        fmt.Println("Could not init http.Flusher")
    }

    for {
        select {
        case message := <-*ch:
            fmt.Println("case message... sending message")
            fmt.Println(message)
            fmt.Fprintf(w, message)
            flusher.Flush()
        case <-r.Context().Done():
            fmt.Println("Client closed connection")
            return
        }
    }
}

func main() {
    router := http.NewServeMux()

    router.HandleFunc("/sse/", sseHandler)
    router.HandleFunc("/time/", getTime)

    log.Fatal(http.ListenAndServe(":3500", router))
}

最终客户码:

import {
  Component,
  createEffect,
  createSignal,
  JSX,
  onCleanup,
} from "solid-js";

const App: Component = () => {
  const [eventSource, setEventSource] = createSignal<EventSource | undefined>();
  const [time, setTime] = createSignal("");
  const [id, setId] = createSignal("");

  async function handleGetTime() {
    const res = await fetch(`http://localhost:3500/time/${id()}`);
    if (res.status !== 200) {
      console.log("Could not connect to the server");
    } else {
      console.log("OK");
    }
  }

  function handleChange(e: InputEvent) {
    setId((e.currentTarget as HTMLInputElement).value);
  }

  function handleConnect() {
    const ev = new EventSource(`http://localhost:3500/sse/${id()}`);
    ev.addEventListener("timeEvent", (e) => {
      console.log({ event: e.type });
      console.log({ data: e.data });

      setTime(e.data);
    });

    setEventSource(ev);
  }

  onCleanup(() => {
    eventSource()?.close();
  });

  return (
    <main>
      Time: {time()}
      <button onClick={handleGetTime}>Get time</button>
      <input type="text" onInput={handleChange} value={id()} />
      <button onClick={handleConnect}>Connect</button>
    </main>
  );
};

export default App;

最终结果:

Final result

在本文结尾处,您已经了解了SSE,GO频道以及使用GO的通知功能的实现。我相信您发现这篇文章愉快而有用。 PHEW,考虑到文章的长度,我将在这里结束。