<1.Building Basic Services with Zinx Framework>
<2. Zinx-V0.2 Simple Connection Encapsulation and Binding with Business>
<3.Design and Implementation of the Zinx Framework's Routing Module>
<4.Zinx Global Configuration>
<5.Zinx Message Encapsulation Module Design and Implementation>
<6.Design and Implementation of Zinx Multi-Router Mode>
<7. Building Zinx's Read-Write Separation Model>
<8.Zinx Message Queue and Task Worker Pool Design and Implementation>
<9. Zinx Connection Management and Property Setting>
在本章中,我们将向ZINX框架添加连接限制。如果客户端连接的数量超过一定阈值,Zinx将拒绝新的连接请求,以确保对现有连接的及时响应。此外,我们将向Zinx介绍连接属性,使开发人员可以将特定于业务的参数与连接相关联,以便于在业务处理过程中访问。
9.1连接管理
我们需要为Zinx创建一个连接管理模块,该模块由抽象层和实现层组成。我们将首先在位于zinx/ziface
目录中的iconnmanager.go
文件中实现抽象层:
// zinx/ziface/iconnmanager.go
package ziface
/*
Connection management abstract layer
*/
type IConnManager interface {
Add(conn IConnection) // Add a connection
Remove(conn IConnection) // Remove a connection
Get(connID uint32) (IConnection, error) // Get a connection using the connection ID
Len() int // Get the current number of connections
ClearConn() // Remove and stop all connections
}
IConnManager
接口定义以下方法:
-
Add
:添加连接到连接管理器。 -
Remove
:从连接管理器中删除连接。这不会关闭连接;它只是将其从管理层中删除。 -
Get
:根据连接ID检索连接对象。 -
Len
:获得连接管理器管理的连接总数。 -
ClearConn
:从经理中删除所有连接并关闭他们。</li>
接下来,我们将在zinx/znet
目录中的connmanager.go
文件中为IconnManager创建实现层:
// zinx/znet/connmanager.go
package znet
import (
"errors"
"fmt"
"sync"
"zinx/ziface"
)
/*
Connection manager module
*/
type ConnManager struct {
connections map[uint32]ziface.IConnection // Map to hold connection information
connLock sync.RWMutex // Read-write lock for concurrent access to the map
}
ConnManager
struct包含一个存储所有连接信息的connections
映射。关键是连接ID,值是连接本身。 connLock
是用于保护对地图的并发访问的读写锁。
ConnManager
的构造函数使用make
函数初始化地图:
// zinx/znet/connmanager.go
/*
Create a connection manager
*/
func NewConnManager() *ConnManager {
return &ConnManager{
connections: make(map[uint32]ziface.IConnection),
}
}
添加到经理的连接的Add
方法如下实现:
// zinx/znet/connmanager.go
// Add a connection
func (connMgr *ConnManager) Add(conn ziface.IConnection) {
// Protect shared resource (map) with a write lock
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
// Add the connection to ConnManager
connMgr.connections[conn.GetConnID()] = conn
fmt.Println("Connection added to ConnManager successfully: conn num =", connMgr.Len())
}
由于GO的标准库映射不是线程安全,因此我们需要使用锁来保护并发写操作。在这里,我们使用写锁(connLock
)来确保在修改地图时相互排除。
删除从管理器的连接的Remove
方法如下实现:
// zinx/znet/connmanager.go
// Remove a connection
func (connMgr *ConnManager) Remove(conn ziface.IConnection) {
// Protect shared resource (map) with a write lock
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
// Remove the connection
delete(connMgr.connections, conn.GetConnID())
fmt.Println("Connection removed: ConnID =", conn.GetConnID(), "successfully: conn num =", connMgr.Len())
}
Remove
方法只需在不停止连接的业务处理的情况下从地图上删除连接。
Get
和Len
方法的实现如下:
// zinx/znet/connmanager.go
// Get a connection using the connection ID
func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {
// Protect shared resource (map) with a read lock
connMgr.connLock.RLock()
defer connMgr.connLock.RUnlock()
if conn, ok := connMgr.connections[connID]; ok {
return conn, nil
} else {
return nil, errors.New("connection not found")
}
}
// Get the current number of connections
func (connMgr *ConnManager) Len() int {
return len(connMgr.connections)
}
Get
方法使用读取锁(connLock.RLock()
)允许同时读取对地图的访问,从而确保数据一致性。如果找到与给定ID的连接,则将其返回;否则,返回错误。
ClearConn
方法的实现如下:
// zinx/znet/connmanager.go
// Remove and stop all connections
func (connMgr *ConnManager) ClearConn() {
// Protect shared resource (map) with a write lock
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
// Stop and remove all connections
for connID, conn := range connMgr.connections {
// Stop the connection
conn.Stop()
// Remove the connection
delete(connMgr.connections, connID)
}
fmt.Println("All connections cleared successfully: conn num =", connMgr.Len())
}
ClearConn
方法通过调用conn.Stop()
来停止每个连接的业务处理,然后删除地图上的所有连接。
9.1.2将连接管理模块集成到Zinx
1.将Connmanager添加到服务器
我们需要将ConnManager
添加到Server
struct中,并在服务器的构造函数中初始化它。 zinx/znet/server.go
文件中的Server
结构将具有一个名为ConnMgr
的新成员属性,该属性是ziface.IConnManager
类型:
// zinx/znet/server.go
// Server is a server service class implementing the IServer interface
type Server struct {
// Server name
Name string
// IP version (e.g., tcp4 or other)
IPVersion string
// IP address to bind the server to
IP string
// Port to bind the server to
Port int
// Message handler for binding MsgID and corresponding processing methods
MsgHandler ziface.IMsgHandle
// Connection manager for the server
ConnMgr ziface.IConnManager
}
在服务器的构造函数NewServer()
中,我们需要初始化ConnMgr
:
// zinx/znet/server.go
// NewServer creates a server instance
func NewServer() ziface.IServer {
utils.GlobalObject.Reload()
s := &Server{
Name: utils.GlobalObject.Name,
IPVersion: "tcp4",
IP: utils.GlobalObject.Host,
Port: utils.GlobalObject.TcpPort,
MsgHandler: NewMsgHandle(),
ConnMgr: NewConnManager(), // Create a ConnManager
}
return s
}
NewServer()
函数创建一个新的服务器实例并初始化了ConnMgr
属性。
要从服务器提供对ConnMgr
的访问,我们需要在zinx/ziface/iserver.go
文件中的IServer
Interface中添加方法GetConnMgr()
:
// zinx/ziface/iserver.go
type IServer interface {
// Start the server
Start()
// Stop the server
Stop()
// Serve the business services
Serve()
// Register a router business method for the current service, used by client connection processing
AddRouter(msgID uint32, router IRouter)
// Get the connection manager
GetConnMgr() IConnManager
}
GetConnMgr()
方法应返回服务器的ConnMgr
属性:
// zinx/znet/server.go
// GetConnMgr returns the connection manager
func (s *Server) GetConnMgr() ziface.IConnManager {
return s.ConnMgr
}
通过实现GetConnMgr()
方法,我们提供了一种从服务器访问连接管理器的方法。
由于连接(Connection
)有时需要访问服务器中的连接管理器(ConnMgr
)(Server
),因此我们需要在Server
和Connection
对象之间建立相互参考关系。在Connection
结构中,我们将添加一个名为TcpServer
的成员,该成员表示当前连接所属的服务器。将TcpServer
成员添加到Connection
struct中的zinx/znet/connection.go
文件中:
// zinx/znet/connection.go
type Connection struct {
// The server to which the current connection belongs
TcpServer ziface.IServer // Add this line to indicate the server to which the connection belongs
// The TCP socket of the current connection
Conn *net.TCPConn
// The ID of the current connection (also known as SessionID, globally unique)
ConnID uint32
// The closing state of the current connection
isClosed bool
// The message handler that manages MsgID and corresponding processing methods
MsgHandler ziface.IMsgHandle
// The channel that informs that the connection has exited/stopped
ExitBuffChan chan bool
// The unbuffered channel used for message communication between the reading and writing goroutines
msgChan chan []byte
// The buffered channel used for message communication between the reading and writing goroutines
msgBuffChan chan []byte
}
通过将TcpServer
成员添加到Connection
struct,我们建立了对连接所属服务器的引用。 TcpServer
属性是类型ziface.IServer
。
2.将连接添加到连接管理器
初始化连接时,我们需要将连接添加到服务器的连接管理器中。在zinx/znet/connection.go
文件中,修改NewConnection()
函数以包括服务器对象:
// zinx/znet/connection.go
// NewConnection creates a connection
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
c := &Connection{
TcpServer: server, // Set the server object
Conn: conn,
ConnID: connID,
isClosed: false,
MsgHandler: msgHandler,
ExitBuffChan: make(chan bool, 1),
msgChan: make(chan []byte),
msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen),
}
// Add the newly created connection to the connection manager
c.TcpServer.GetConnMgr().Add(c)
return c
}
在NewConnection()
函数中,我们将服务器对象作为参数传递,并将其设置在连接对象的TcpServer
属性中。然后,我们使用c.TcpServer.GetConnMgr().Add(c)
添加连接到连接管理器。
3.检查服务器中的连接计数
在服务器的Start()
方法中,在与客户端建立成功的连接后,我们可以检查连接数量并终止连接创建,如果它超过了最大连接数量。在zinx/znet/server.go
文件中修改Start()
方法如下:
// zinx/znet/server.go
// Start the network service
func (s *Server) Start() {
// ... (omitted code)
// Start a goroutine to handle the server listener
go func() {
// ... (omitted code)
// Start the server network connection business
for {
// 1. Block and wait for client connection requests
// ... (omitted code)
// 2. Set the maximum connection limit for the server
// If the limit is exceeded, close the new connection
if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn {
conn.Close()
continue
}
// ... (omitted code)
}
}()
}
在服务器的Start()
方法中,我们使用s.ConnMgr.Len()
检查连接数,并将其与最大连接限制进行比较。如果达到限制,我们关闭新连接(conn.Close()
),然后继续进行下一个迭代。
开发人员可以使用MaxConn
属性在配置文件zinx.json
或GlobalObject
全局配置中定义最大连接计数。
4.删除连接
关闭连接时,应将其从Connmanager中删除。在连接结构的stop()方法中,我们添加了Connmanager的删除动作。修改Zinx/Znet/connection的stop()方法,如下所示:
// zinx/znet/connection.go
func (c *Connection) Stop() {
fmt.Println("Conn Stop()... ConnID =", c.ConnID)
if c.isClosed == true {
return
}
c.isClosed = true
c.Conn.Close()
c.ExitBuffChan <- true
// Remove the connection from the ConnManager
c.TcpServer.GetConnMgr().Remove(c)
close(c.ExitBuffChan)
close(c.msgBuffChan)
}
在Stop()
方法中,关闭连接后,我们使用c.TcpServer.GetConnMgr().Remove(c)
从Connmanager删除连接。
此外,在stop()方法中停止服务器时,我们也需要清除所有连接:
// zinx/znet/server.go
func (s *Server) Stop() {
fmt.Println("[STOP] Zinx server, name", s.Name)
// Stop or clean up other necessary connection information or other information
s.ConnMgr.ClearConn()
}
在Stop()
方法中,我们致电s.ConnMgr.ClearConn()
停止并删除ConnManager
的所有连接。
使用上述代码,我们已成功将连接管理集成到Zinx。
9.1.3连接缓冲消息发送方法
以前,为Connection
结构提供了一种称为SendMsg()
的方法,该方法将数据发送到一个名为msgChan
的未封闭通道。但是,如果有大量的客户端连接,并且收件人无法及时处理消息,则可能导致暂时阻止。为了提供非障碍的发送体验,可以添加缓冲消息发送方法。
iconnection接口定义(zinx/ziface/iconnection.go)
// Connection interface definition
type IConnection interface {
// Start the connection, allowing the current connection to start working
Start()
// Stop the connection, ending the current connection state
Stop()
// Get the raw TCPConn of the current connection
GetTCPConnection() *net.TCPConn
// Get the current connection ID
GetConnID() uint32
// Get the remote client address information
RemoteAddr() net.Addr
// Send Message data directly to the remote TCP client (unbuffered)
SendMsg(msgID uint32, data []byte) error
// Send Message data directly to the remote TCP client (buffered)
SendBuffMsg(msgID uint32, data []byte) error // Add buffered message sending interface
}
除了SendMsg()
方法外,我们还将在IConnection
Interface中提供SendBuffMsg()
方法。 SendBuffMsg()
方法类似于SendMSG(),但使用一个缓冲通道进行两个Goroutines之间的通信。 zinx/znet/connection.go
文件中连接结构的定义将被修改如下:
连接结构定义(zinx/znet/connection.go)
type Connection struct {
// The server to which the current connection belongs
TcpServer ziface.IServer
// The TCP socket of the current connection
Conn *net.TCPConn
// The ID of the current connection (also known as SessionID, globally unique)
ConnID uint32
// The closing state of the current connection
isClosed bool
// The message handler that manages MsgID and corresponding processing methods
MsgHandler ziface.IMsgHandle
// The channel that informs that the connection has exited/stopped
ExitBuffChan chan bool
// The unbuffered channel used for message communication between the reading and writing goroutines
msgChan chan []byte
// The buffered channel used for message communication between the reading and writing goroutines
msgBuffChan chan []byte
}
为了实现缓冲消息发送功能,chan []byte
类型的msgBuffChan
被添加到Connection
struct中。 msgBuffChan
将用于阅读和写作goroutines之间的通信。确保在连接的构造函数方法中初始化msgBuffChan
成员:
newConnection构造函数(zinx/znet/connection.go)
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
// Initialize Conn properties
c := &Connection{
TcpServer: server,
Conn: conn,
ConnID: connID,
isClosed: false,
MsgHandler: msgHandler,
ExitBuffChan: make(chan bool, 1),
msgChan: make(chan []byte),
msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen), // Don't forget to initialize
}
// Add the newly created Conn to the connection manager
c.TcpServer.GetConnMgr().Add(c)
return c
}
SendBuffMsg()
方法实现与SendMsg()
相似。它通过msgBuffChan
包装并将数据发送给客户端。这是实现:
sendbuffmsg()方法实现(zinx/znet/connection.go)
func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error {
if c.isClosed {
return errors.New("Connection closed when sending buffered message")
}
// Pack the data and send it
dp := NewDataPack()
msg, err := dp.Pack(NewMsgPackage(msgID, data))
if err != nil {
fmt.Println("Pack error msg ID =", msgID)
return errors.New("Pack error message")
}
// Write to the client
c.msgBuffChan <- msg
return nil
}
Connection
结构中的StartWriter()
方法需要处理msgBuffChan
以进行数据传输。这是实现:
startwriter()方法实现(zinx/znet/connection.go)
func (c *Connection) StartWriter() {
fmt.Println("[Writer Goroutine is running]")
defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")
for {
select {
case data := <-c.msgChan:
// Data to be written to the client
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send Data error:", err, "Conn Writer exit")
return
}
case data, ok := <-c.msgBuffChan:
// Handling data for buffered channel
if ok {
// Data to be written to the client
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send Buffered Data error:", err, "Conn Writer exit")
return
}
} else {
fmt.Println("msgBuffChan is Closed")
break
}
case <-c.ExitBuffChan:
return
}
}
}
StartWriter()
方法聆听msgChan
和MSGBuffchan通道。如果msgChan
中有数据,则将其直接写给客户端。如果msgBuffChan
中有数据,则将其写入相应的处理后将其写入客户。
9.1.5注册连接开始/停止自定义挂钩方法链接初始化/关机
在连接的生命周期中,有两个时刻需要注册回调功能来执行自定义业务逻辑。这些时刻发生在创建连接和断开连接之前。为了满足此要求,Zinx需要添加回调函数(也称为Hook函数),这些功能是在创建连接和断开连接之前触发的。
zinx/ziface/iserver.go
文件中的IServer
接口提供了用于注册开发人员可以使用的连接挂钩的方法。接口定义如下:
type IServer interface {
// Start the server
Start()
// Stop the server
Stop()
// Start the business service
Serve()
// Register a routing business method for the current server to be used for client connection processing
AddRouter(msgID uint32, router IRouter)
// Get the connection manager
GetConnMgr() IConnManager
// Set the hook function to be called when a connection is created for this server
SetOnConnStart(func(IConnection))
// Set the hook function to be called when a connection is about to be disconnected for this server
SetOnConnStop(func(IConnection))
// Invoke the OnConnStart hook function for the connection
CallOnConnStart(conn IConnection)
// Invoke the OnConnStop hook function for the connection
CallOnConnStop(conn IConnection)
}
添加了四种新的钩方法:
1.SetOnConnStart
:设置为当前服务器创建连接时要调用的挂钩函数。
2.SetOnConnStop
:设置即将与当前服务器断开连接时要调用的挂钩函数。
3.CallOnConnStart
:创建连接后调用钩函数。
4.CallOnConnStop
:在即将断开连接之前调用挂钩函数。
zinx/znet/server.go
文件中的Server
struct已更新,以包括两个新的钩函数字段:
type Server struct {
// Server name
Name string
// TCP version (e.g., tcp4 or other)
IPVersion string
// IP address to which the server is bound
IP string
// Port to which the server is bound
Port int
// Message handler for the server, used to bind MsgID with corresponding handling methods
MsgHandler ziface.IMsgHandle
// Connection manager for the server
ConnMgr ziface.IConnManager
// New hook function prototypes //
// Hook function to be called when a connection is created for this server
OnConnStart func(conn ziface.IConnection)
// Hook function to be called when a connection is about to be disconnected for this server
OnConnStop func(conn ziface.IConnection)
}
Server
结构现在包括OnConnStart
和OnConnStop
字段,以保留开发人员传递的挂钩功能的地址。
实现四种新挂钩方法如下:
// Set the hook function to be called when a connection is created for the server
func (s *Server) SetOnConnStart(hookFunc func(ziface.IConnection)) {
s.OnConnStart = hookFunc
}
// Set the hook function to be called when a connection is about to be disconnected for the server
func (s *Server) SetOnConnStop(hookFunc func(ziface.IConnection)) {
s.OnConnStop = hookFunc
}
// Invoke the OnConnStart hook function for the connection
func (s *Server) CallOnConnStart(conn ziface.IConnection) {
if s.OnConnStart != nil {
fmt.Println("---> CallOnConnStart....")
s.OnConnStart(conn)
}
}
// Invoke the OnConnStop hook function for the connection
func (s *Server) CallOnConnStop(conn ziface.IConnection) {
if s.OnConnStop != nil {
fmt.Println("---> CallOnConnStop....")
s.OnConnStop(conn)
}
}
现在,让我们确定应调用这两个钩方法的位置。第一个位置是在创建连接之后,这是Connection
struct的Start()
方法的最后一步:
// Start the connection, allowing it to begin working
func (c *Connection) Start() {
// 1. Start the Goroutine for reading data from the client
go c.StartReader()
// 2. Start the Goroutine for writing data back to the client
go c.StartWriter()
// Call the registered hook method for connection creation according to the user's requirements
c.TcpServer.CallOnConnStart(c)
}
第二个位置就在停止连接之前,这是调用Connection
struct的Stop()
方法。应该在插座的Close()
动作之前调用,因为一旦插座关闭,就会终止与遥控端的通信。如果挂钩方法涉及将数据写回客户,则将无法正确通信。因此,应在Close()
方法之前调用钩方法。这是代码:
// Stop the connection, ending the current connection state
func (c *Connection) Stop() {
fmt.Println("Conn Stop()...ConnID = ", c.ConnID)
// If the current connection is already closed
if c.isClosed == true {
return
}
c.isClosed = true
// ==================
// If the user registered a callback function for this connection's closure, it should be called explicitly at this moment
c.TcpServer.CallOnConnStop(c)
// ==================
// Close the socket connection
c.Conn.Close()
// Close the writer
c.ExitBuffChan <- true
// Remove the connection from the connection manager
c.TcpServer.GetConnMgr().Remove(c)
// Close all channels of this connection
close(c.ExitBuffChan)
close(c.msgBuffChan)
}
9.1.5使用Zinx-V0.9完成应用程序
到目前为止,所有连接管理功能都已集成到zinx中。下一步是测试连接管理模块是否功能正常。让我们测试一台服务器,以说明处理连接管理挂钩函数回调的能力。代码如下:
// Server.go
package main
import (
"fmt"
"zinx/ziface"
"zinx/znet"
)
// Ping test custom router
type PingRouter struct {
znet.BaseRouter
}
// Ping Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
fmt.Println("Call PingRouter Handle")
// Read the data from the client first, then write back ping...ping...ping
fmt.Println("recv from client: msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
err := request.GetConnection().SendBuffMsg(0, []byte("ping...ping...ping"))
if err != nil {
fmt.Println(err)
}
}
type HelloZinxRouter struct {
znet.BaseRouter
}
// HelloZinxRouter Handle
func (this *HelloZinxRouter) Handle(request ziface.IRequest) {
fmt.Println("Call HelloZinxRouter Handle")
// Read the data from the client first, then write back Hello Zinx Router V0.8
fmt.Println("recv from client: msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
err := request.GetConnection().SendBuffMsg(1, []byte("Hello Zinx Router V0.8"))
if err != nil {
fmt.Println(err)
}
}
// Executed when a connection is created
func DoConnectionBegin(conn ziface.IConnection) {
fmt.Println("DoConnectionBegin is Called...")
err := conn.SendMsg(2, []byte("DoConnection BEGIN..."))
if err != nil {
fmt.Println(err)
}
}
// Executed when a connection is lost
func DoConnectionLost(conn ziface.IConnection) {
fmt.Println("DoConnectionLost is Called...")
}
func main() {
// Create a server handler
s := znet.NewServer()
// Register connection hook callback functions
s.SetOnConnStart(DoConnectionBegin)
s.SetOnConnStop(DoConnectionLost)
// Configure routers
s.AddRouter(0, &PingRouter{})
s.AddRouter(1, &HelloZinxRouter{})
// Start the server
s.Serve()
}
服务器端业务代码登记两个挂钩函数:创建连接后要执行的DoConnectionBegin()
并在连接丢失之前将执行DoConnectionLost()
。
-
DoConnectionBegin()
:创建连接后,它将带有ID 2的消息发送给客户端,并在服务器端打印一个调试消息,说“ doconnectionbebegin被称为...”。 -
DoConnectionLost()
:在连接丢失之前,它在服务器端打印了一个调试消息,说“ doconnectionlost称为...”。
客户端Client.go
的代码保持不变。要测试服务器和客户端,请打开不同的终端,并使用以下命令启动服务器和客户端:
1.启动服务器:
$ go run Server.go
2.启动客户端:
$ go run Client.go
服务器端输出将如下:
$ go run Server.go
Add api msgId = 0
Add api msgId = 1
[START] Server name: zinx v-0.8 demoApp, listener at IP: 127.0.0.1, Port 7777 is starting
[Zinx] Version: V0.4, MaxConn: 3, MaxPacketSize: 4096
start Zinx server zinx v-0.8 demoApp succ, now listening...
Worker ID = 9 is started.
Worker ID = 5 is started.
Worker ID = 6 is started.
Worker ID = 7 is started.
Worker ID = 8 is started.
Worker ID = 1 is started.
Worker ID = 0 is started.
Worker ID = 2 is started.
Worker ID = 3 is started.
Worker ID = 4 is started.
connection add to ConnManager successfully: conn num = 1
---> CallOnConnStart....
DoConnectionBegin is Called...
[Writer Goroutine is running]
[Reader Goroutine is running]
Add ConnID= 0 request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client: msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 0 request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client: msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 0 request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client: msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 0 request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client: msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 0 request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client: msgId= 0 , data= Zinx V0.8 Client0 Test Message
read msg head error read tcp4 127.0.0.1:7777->127.0.0.1:49510: read: connection reset by peer
Conn Stop()...ConnID = 0
---> CallOnConnStop....
DoConnectionLost is Called...
connection Remove ConnID= 0 successfully: conn num = 0
127.0.0.1:49510 [conn Reader exit!]
127.0.0.1:49510 [conn Writer exit!]
客户端输出将如下:
$ go run Client0.go
Client Test... start
==> Recv Msg: ID= 2 , len= 21 , data= DoConnection BEGIN...
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
^Csignal: interrupt
从上述结果中,我们可以看到客户端已成功创建,并且已经执行了回调钩。连接已添加到服务器中的ConnManager
,当前连接计数Conn Num是1。当我们手动按“ Ctrl+C”关闭客户端时,服务器端上的ConnManager成功地删除了连接,并且连接count conn num
变为0。此外,打印了连接停止回调的服务器端调试信息。
9.2设置连接属性在zinx中
处理连接时,开发人员通常希望将某些用户数据或参数绑定到连接。这使他们可以在手柄处理过程中从连接中检索传递的参数,并相应地执行业务逻辑。为了提供此功能,Zinx需要建立在当前连接上设置属性的接口或方法。本节将实现设置连接属性的功能。
9.2.1添加连接配置接口
首先,在Iconnection摘要层中,添加了三个相关的配置连接属性的接口。代码如下:
//zinx/ziface/iconnection.go
// Definition of the connection interface
type IConnection interface {
// Start the connection and initiate its operations
Start()
// Stop the connection and terminate its current state
Stop()
// Get the underlying TCPConn from the current connection
GetTCPConnection() *net.TCPConn
// Get the connection's unique ID
GetConnID() uint32
// Get the remote client's address information
RemoteAddr() net.Addr
// Send Message data directly to the remote TCP client (unbuffered)
SendMsg(msgId uint32, data []byte) error
// Send Message data directly to the remote TCP client (buffered)
SendBuffMsg(msgId uint32, data []byte) error
// Set connection attributes
SetProperty(key string, value interface{})
// Get connection attributes
GetProperty(key string) (interface{}, error)
// Remove connection attributes
RemoveProperty(key string)
}
在提供的代码段中,Iconnection具有三种添加的方法:SetProperty()
,GetProperty()
和RemoveProperty()
。每种方法中的密钥参数是类型字符串,值参数为多功能接口{}类型。随后的步骤涉及定义连接内的特定属性类型。
9.2.2连接属性方法的实现
在实现层中,连接结构使用命名属性的成员属性增强。该属性将保留通过连接传递的所有用户提供的参数,并将其定义为map[string]interface{}
类型。定义如下:
//zinx/znet/connction.go
type Connection struct {
// Current Conn belongs to which Server
TcpServer ziface.IServer
// The TCP socket of the current connection
Conn *net.TCPConn
// ID of the current connection, also known as SessionID; globally unique
ConnID uint32
// Current connection's closure state
isClosed bool
// Message manager module for managing MsgId and corresponding message handling methods
MsgHandler ziface.IMsgHandle
// Channel to signal that the connection has exited/stopped
ExitBuffChan chan bool
// Unbuffered channel for message communication between read and write goroutines
msgChan chan []byte
// Buffered channel for message communication between read and write goroutines
msgBuffChan chan []byte
// Connection properties
property map[string]interface{}
// Lock for protecting concurrent property modifications
propertyLock sync.RWMutex
}
property
地图不是并发安全,并且在地图上读写操作需要使用propertyLock
保护。此外,Connection
的构造函数必须初始化property
和propertyLock
。为此提供了代码段:
//zinx/znet/connction.go
// Method to create a connection
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
// Initialize Conn properties
c := &Connection{
TcpServer: server,
Conn: conn,
ConnID: connID,
isClosed: false,
MsgHandler: msgHandler,
ExitBuffChan: make(chan bool, 1),
msgChan: make(chan []byte),
msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen),
property: make(map[string]interface{}), // Initialize connection property map
}
// Add the newly created Conn to the connection manager
c.TcpServer.GetConnMgr().Add(c)
return c
}
实现了处理连接属性的三种方法很简单。它们分别涉及从地图中添加,阅读和删除条目。这些方法的实现如下:
//zinx/znet/connction.go
// Set connection property
func (c *Connection) SetProperty(key string, value interface{}) {
c.propertyLock.Lock()
defer c.propertyLock.Unlock()
c.property[key] = value
}
// Get connection property
func (c *Connection) GetProperty(key string) (interface{}, error) {
c.propertyLock.RLock()
defer c.propertyLock.RUnlock()
if value, ok := c.property[key]; ok {
return value, nil
} else {
return nil, errors.New("no property found")
}
}
// Remove connection property
func (c *Connection) RemoveProperty(key string) {
c.propertyLock.Lock()
defer c.propertyLock.Unlock()
delete(c.property, key)
}
这些方法管理与属性图的交互,允许添加,检索和删除与连接关联的属性。
9.1.3 Zinx-V0.10中的连接属性测试
封装连接属性的功能后,我们可以使用服务器端上的相关接口设置一些属性并测试属性的设置和提取是否功能。以下是服务器端代码:
// Server.go
package main
import (
"fmt"
"zinx/ziface"
"zinx/znet"
)
// ping test custom router
type PingRouter struct {
znet.BaseRouter
}
// Ping Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
fmt.Println("Call PingRouter Handle")
// Read client data first, then reply with ping...ping...ping
fmt.Println("recv from client: msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
err := request.GetConnection().SendBuffMsg(0, []byte("ping...ping...ping"))
if err != nil {
fmt.Println(err)
}
}
type HelloZinxRouter struct {
znet.BaseRouter
}
// HelloZinxRouter Handle
func (this *HelloZinxRouter) Handle(request ziface.IRequest) {
fmt.Println("Call HelloZinxRouter Handle")
// Read client data first, then reply with ping...ping...ping
fmt.Println("recv from client: msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
err := request.GetConnection().SendBuffMsg(1, []byte("Hello Zinx Router V0.10"))
if err != nil {
fmt.Println(err)
}
}
// Executed when a connection is created
func DoConnectionBegin(conn ziface.IConnection) {
fmt.Println("DoConnectionBegin is Called ... ")
// Set two connection properties after the connection is created
fmt.Println("Set conn Name, Home done!")
conn.SetProperty("Name", "Aceld")
conn.SetProperty("Home", "https://github.com/aceld/zinx")
err := conn.SendMsg(2, []byte("DoConnection BEGIN..."))
if err != nil {
fmt.Println(err)
}
}
// Executed when a connection is lost
func DoConnectionLost(conn ziface.IConnection) {
// Before the connection is destroyed, query the "Name" and "Home" properties of the conn
if name, err := conn.GetProperty("Name"); err == nil {
fmt.Println("Conn Property Name =", name)
}
if home, err := conn.GetProperty("Home"); err == nil {
fmt.Println("Conn Property Home =", home)
}
fmt.Println("DoConnectionLost is Called ... ")
}
func main() {
// Create a server handle
s := znet.NewServer()
// Register connection hook callback functions
s.SetOnConnStart(DoConnectionBegin)
s.SetOnConnStop(DoConnectionLost)
// Configure routers
s.AddRouter(0, &PingRouter{})
s.AddRouter(1, &HelloZinxRouter{})
// Start the server
s.Serve()
}
关键重点是DoConnectionBegin()
和DoConnectionLost()
功能的实现。在这些功能中,使用挂钩回调设置并提取连接属性。建立连接后,使用conn.SetProperty()
方法将属性“名称”和“ HOME”分配给连接。稍后,可以使用conn.GetProperty()
方法检索这些属性。
打开终端启动服务器程序,并观察以下结果:
$ go run Server.go
Add api msgId = 0
Add api msgId = 1
[START] Server name: zinx v-0.10 demoApp,listener at IP: 127.0.0.1, Port 7777 is starting
[Zinx] Version: V0.4, MaxConn: 3, MaxPacketSize: 4096
start Zinx server zinx v-0.10 demoApp succ, now listening...
Worker ID = 9 is started.
Worker ID = 5 is started.
Worker ID = 6 is started.
Worker ID = 7 is started.
Worker ID = 8 is started.
Worker ID = 1 is started.
Worker ID = 0 is started.
Worker ID = 2 is started.
Worker ID = 3 is started.
Worker ID = 4 is started.
connection add to ConnManager successfully: conn num = 1
---> CallOnConnStart....
DoConnecionBegin is Called ...
Set conn Name, Home done!
[Writer Goroutine is running]
[Reader Goroutine is running]
Add ConnID= 0 request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 0 request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 0 request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
read msg head error read tcp4 127.0.0.1:7777->127.0.0.1:55208: read: connection reset by peer
Conn Stop()...ConnID = 0
---> CallOnConnStop....
Conn Property Name = Aceld
Conn Property Home = https://github.com/aceld/zinx
DoConneciotnLost is Called ...
connection Remove ConnID= 0 successfully: conn num = 0
127.0.0.1:55208 [conn Reader exit!]
127.0.0.1:55208 [conn Writer exit!]
在新的终端中,启动客户端程序,并观察以下结果:
$ go run Client0.go
Client Test ... start
==> Recv Msg: ID= 2 , len= 21 , data= DoConnection BEGIN...
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
^Csignal: interrupt
服务器输出特别值得注意:
---> CallOnConnStop....
Conn Property Name = Aceld
Conn Property Home = https://github.com/aceld/zinx
DoConneciotnLost is Called ...
9.3摘要
在本章中,引入了两个功能,以增强Zinx中连接的功能。第一个是连接管理模块,该模块聚集了Zinx服务器中的所有连接,提供了汇总的视图和计数连接总数。当前,Zinx仅对连接ID采用基本的增量计算。鼓励读者通过用更常见的分布式ID替换Connid来优化这一方面,从而确保跨ID的独特性。
连接管理模块通过限制连接数来帮助控制服务器上的同时负载。第二个附加的连接属性丰富了ZINX内处理业务逻辑的便利性。开发人员可以利用SetProperty()
和GetProperty()
方法将不同的属性与不同的连接相关联。这使各种标记或属性链接到不同的连接,从而促进灵活的业务逻辑处理。
<1.Building Basic Services with Zinx Framework>
<2. Zinx-V0.2 Simple Connection Encapsulation and Binding with Business>
<3.Design and Implementation of the Zinx Framework's Routing Module>
<4.Zinx Global Configuration>
<5.Zinx Message Encapsulation Module Design and Implementation>
<6.Design and Implementation of Zinx Multi-Router Mode>
<7. Building Zinx's Read-Write Separation Model>
<8.Zinx Message Queue and Task Worker Pool Design and Implementation>
<9. Zinx Connection Management and Property Setting>
未完待续...
作者:
不和谐:https://discord.gg/xQ8Xxfyfcz
zinx:https://github.com/aceld/zinx
github:https://github.com/aceld
Aceld的家:https://yuque.com/aceld