问题
我目前已分配为我的Side Project应用程序实现通知功能的任务。每当用户对其他用户的帖子发表评论或反应时,就会生成事件。随后,服务器发送一条消息,以通知所有者有关此事件的帖子。 tldr :您可以找到最终代码here
在探索了互联网上的许多解决方案后,我发现我们可以使用长期投票,Websockets或服务器量事件来实现目标。但是,由于服务器仅与客户端进行交互并需要实时通知,因此无需使用长池和WebSocket。
入门
根据Wikipedia:
服务器sent事件(SSE)是一种服务器推动技术,使客户端通过HTTP连接从服务器接收自动更新,并描述服务器在建立初始客户端连接后如何向客户端启动数据传输。 /p>
说话很便宜,让我们通过编码此通知服务器来弄脏我们的手。
// main.go
package main
import (
"fmt"
"log"
"net/http"
"time"
)
func sseHandler(w http.ResponseWriter, r *http.Request) {
fmt.Println("Client connected")
w.Header().Set("Access-Control-Allow-Origin", "*") // must have since it enable cors
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
fmt.Println("Could not init http.Flusher")
}
for {
select {
case <-r.Context().Done():
fmt.Println("Connection closed")
return
default:
fmt.Println("case message... sending message")
fmt.Fprintf(w, "data: Ping\n\n")
flusher.Flush()
time.Sleep(5 * time.Second)
}
}
}
func main() {
router := http.NewServeMux()
router.HandleFunc("/sse", sseHandler)
log.Fatal(http.ListenAndServe(":3500", router))
}
您可以使用curl
命令进行快速测试
curl http://localhost:3500/event
您应该看到与此类似的东西:
data: Ping
data: Ping
...
在上面的代码中,我们定义了一个将请求处理到/sse
端点的HTTP处理程序sseHandler
。在处理程序内部,我们为SSE设置了适当的标题,并每5秒钟将事件发送给客户。
标题器起着至关重要的作用,因为客户依靠它来建立连接。为了更好地了解标题的重要性,您可以通过禁用其中一个并观察其影响客户连接的方式来进行实验(扰流板警报:Content-Type: text/event-stream
似乎会影响连接机构)。
对于客户端,我们将利用您选择的框架甚至纯/香草JavaScript生成一个前端项目。
npx degit solidjs/templates/ts sse-client
cd sse-client
# yarn, npm i, pnpm i to install dependencies
import { Component, createEffect, createSignal, onCleanup } from "solid-js";
const App: Component = () => {
const [eventSource, setEventSource] = createSignal<EventSource | undefined>();
createEffect(() => {
const ev = new EventSource("http://localhost:3500/sse");
ev.onmessage = (e) => {
console.log({ e });
};
setEventSource(ev);
});
onCleanup(() => {
eventSource()?.close();
});
return <main>Server-sent events</main>;
};
export default App;
访问URL http://localhost:3000,这样做时,您应该看到与提供的GIF类似的东西:
就是这样,我们已经成功创建了一个SSE端点,使用JavaScript
建立了与服务器的连接。但是,您可能会问:“好的,但是我该如何实施通知”。你还记得我们的要求吗?
每当用户对其他用户的帖子发表评论或反应时,就会生成事件。随后,服务器发送一条消息,以通知有关此事件的帖子所有者。
我们已经完全实现了the server sends message to notify
部分。因此,我们只剩下一个单独的任务,那就是在用户启动事件时派遣消息。我们将通过利用GO通道来执行此功能。
触发事件时冲洗消息
我们需要在功能之间利用有效的通信方法,这就是Go Channel Shine。我们的计划将使用全局商店为每个SSE连接创建并存储一个频道。此外,我们将介绍一个新的端点/time
,以触发事件。当用户调用此端点时,我们将把消息广播到 all 商店中的频道。
var CHANNEL_STORE []*chan string = make([]*chan string, 0)
func removeChannel(ch *chan string) {
pos := -1
storeLen := len(CHANNEL_STORE)
for i, msgChan := range CHANNEL_STORE {
if ch == msgChan {
pos = i
}
}
if pos == -1 {
return
}
CHANNEL_STORE[pos] = CHANNEL_STORE[storeLen-1]
CHANNEL_STORE = CHANNEL_STORE[:storeLen-1]
fmt.Println("Connection remains: ", len(CHANNEL_STORE))
}
func broadcast(msg string) {
for _, ch := range CHANNEL_STORE {
*ch <- msg
}
}
func getTime(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
msg := time.Now().Format("15:04:05")
broadcast(msg)
}
func sseHandler(w http.ResponseWriter, r *http.Request) {
ch := make(chan string)
CHANNEL_STORE = append(CHANNEL_STORE, &ch)
fmt.Println("Client connected: ", len(CHANNEL_STORE))
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
defer func() {
close(ch)
removeChannel(&ch)
fmt.Println("Client closed connection")
}()
flusher, ok := w.(http.Flusher)
if !ok {
fmt.Println("Could not init http.Flusher")
}
for {
select {
case message := <-ch:
fmt.Println("case message... sending message")
fmt.Println(message)
fmt.Fprintf(w, "data: %s\n\n", message)
flusher.Flush()
case <-r.Context().Done():
fmt.Println("Client closed connection")
return
}
}
}
func main() {
router := http.NewServeMux()
router.HandleFunc("/sse", sseHandler)
router.HandleFunc("/time", getTime)
log.Fatal(http.ListenAndServe(":3500", router))
}
最后但并非最不重要的一点是,我们需要更新我们的客户端代码。
import { Component, createEffect, createSignal, onCleanup } from "solid-js";
const App: Component = () => {
const [eventSource, setEventSource] = createSignal<EventSource | undefined>();
const [time, setTime] = createSignal<string>("");
createEffect(() => {
const ev = new EventSource("http://localhost:3500/sse");
ev.onmessage = (e) => {
console.log({ e });
setTime(e.data);
};
setEventSource(ev);
});
async function handleGetTime() {
const res = await fetch("http://localhost:3500/time");
if (res.status !== 200) {
console.log("Could not connect to the server");
} else {
console.log("OK");
}
}
onCleanup(() => {
eventSource()?.close();
});
return (
<main>
Time: {time()}
<button onClick={handleGetTime}>Get time</button>
</main>
);
};
export default App;
您的结果应该类似于此gif:
您可以看到,我已经在演示中启动了两个浏览器选项卡,并在触发/time/
端点时。两个选项卡立即收到更新的时间。这是通过将消息广播到所有渠道来实现的。我们可以通过使用GO map
来进一步增强实现,将用户身份作为密钥和频道的片段将作为相应的值,从而实现目标消息广播。
// main.go
type SSEConn struct {
mu sync.Mutex
clients map[string][]chan string
}
func NewSSEConn() *SSEConn {
return &SSEConn{clients: make(map[string][]chan string)}
}
func (p *SSEConn) addClient(id string) *chan string {
p.mu.Lock()
defer func() {
fmt.Println("Clients in add: ", p.clients)
for k, v := range p.clients {
fmt.Printf("Key: %s, value: %d\n", k, len(v))
fmt.Println("Channels from id=", id, v)
}
p.mu.Unlock()
}()
c, ok := p.clients[id]
if !ok {
client := []chan string{make(chan string)}
p.clients[id] = client
return &client[0]
}
newCh := make(chan string)
p.clients[id] = append(c, newCh)
return &newCh
}
func (p *SSEConn) removeClient(id string, conn chan string) {
p.mu.Lock()
defer func() {
fmt.Println("Clients in remove: ", p.clients)
for k, v := range p.clients {
fmt.Printf("Key: %s, value: %d", k, len(v))
}
p.mu.Unlock()
}()
c, ok := p.clients[id]
if !ok {
return
}
pos := -1
for i, ch := range c {
if ch == conn {
pos = i
}
}
if pos == -1 {
return
}
close(c[pos])
c = append(c[:pos], c[pos+1:]...)
if pos == 0 {
delete(p.clients, id)
}
}
func (p *SSEConn) broadcast(id string, data, event string) {
p.mu.Lock()
defer p.mu.Unlock()
c, ok := p.clients[id]
if !ok {
return
}
for _, ch := range c {
ch <- fmt.Sprintf("event: %s\ndata: %s\n\n", event, data)
}
}
在提供的代码段中,我定义了SSEConn
struct并实现了addClient
,removeClient
和broadcast
等方法。 SSEConn
由两个字段组成:clients
和mu
(Mutex)。 mu
领域在预防比赛条件中起着至关重要的作用,您可以阅读有关该here的更多信息。 clients
字段存储客户端的ID及其相应频道。另外,请记住更新处理程序实现以接受用户ID很重要。
var sseConn = NewSSEConn()
func getTime(w http.ResponseWriter, r *http.Request) {
id := strings.TrimPrefix(r.URL.Path, "/time/")
w.Header().Set("Access-Control-Allow-Origin", "*")
msg := time.Now().Format("15:04:05")
sseConn.broadcast(id, msg, "timeEvent")
}
func sseHandler(w http.ResponseWriter, r *http.Request) {
id := strings.TrimPrefix(r.URL.Path, "/sse/")
ch := sseConn.addClient(id)
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
defer sseConn.removeClient(id, *ch)
flusher, ok := w.(http.Flusher)
if !ok {
fmt.Println("Could not init http.Flusher")
}
for {
select {
case message := <-*ch:
fmt.Println("case message... sending message")
fmt.Println(message)
fmt.Fprintf(w, message)
flusher.Flush()
case <-r.Context().Done():
fmt.Println("Client closed connection")
return
}
}
}
func main() {
router := http.NewServeMux()
router.HandleFunc("/sse/", sseHandler)
router.HandleFunc("/time/", getTime)
log.Fatal(http.ListenAndServe(":3500", router))
}
最终客户码:
import {
Component,
createEffect,
createSignal,
JSX,
onCleanup,
} from "solid-js";
const App: Component = () => {
const [eventSource, setEventSource] = createSignal<EventSource | undefined>();
const [time, setTime] = createSignal("");
const [id, setId] = createSignal("");
async function handleGetTime() {
const res = await fetch(`http://localhost:3500/time/${id()}`);
if (res.status !== 200) {
console.log("Could not connect to the server");
} else {
console.log("OK");
}
}
function handleChange(e: InputEvent) {
setId((e.currentTarget as HTMLInputElement).value);
}
function handleConnect() {
const ev = new EventSource(`http://localhost:3500/sse/${id()}`);
ev.addEventListener("timeEvent", (e) => {
console.log({ event: e.type });
console.log({ data: e.data });
setTime(e.data);
});
setEventSource(ev);
}
onCleanup(() => {
eventSource()?.close();
});
return (
<main>
Time: {time()}
<button onClick={handleGetTime}>Get time</button>
<input type="text" onInput={handleChange} value={id()} />
<button onClick={handleConnect}>Connect</button>
</main>
);
};
export default App;
最终结果:
在本文结尾处,您已经了解了SSE,GO频道以及使用GO的通知功能的实现。我相信您发现这篇文章愉快而有用。 PHEW,考虑到文章的长度,我将在这里结束。