9. Zinx连接管理和属性设置
#go #zinx

<1.Building Basic Services with Zinx Framework>
<2. Zinx-V0.2 Simple Connection Encapsulation and Binding with Business>
<3.Design and Implementation of the Zinx Framework's Routing Module>
<4.Zinx Global Configuration>
<5.Zinx Message Encapsulation Module Design and Implementation>
<6.Design and Implementation of Zinx Multi-Router Mode>
<7. Building Zinx's Read-Write Separation Model>
<8.Zinx Message Queue and Task Worker Pool Design and Implementation>
<9. Zinx Connection Management and Property Setting>


在本章中,我们将向ZINX框架添加连接限制。如果客户端连接的数量超过一定阈值,Zinx将拒绝新的连接请求,以确保对现有连接的及时响应。此外,我们将向Zinx介绍连接属性,使开发人员可以将特定于业务的参数与连接相关联,以便于在业务处理过程中访问。

9.1连接管理

我们需要为Zinx创建一个连接管理模块,该模块由抽象层和实现层组成。我们将首先在位于zinx/ziface目录中的iconnmanager.go文件中实现抽象层:

// zinx/ziface/iconnmanager.go

package ziface

/*
   Connection management abstract layer
*/
type IConnManager interface {
    Add(conn IConnection)                            // Add a connection
    Remove(conn IConnection)                         // Remove a connection
    Get(connID uint32) (IConnection, error)           // Get a connection using the connection ID
    Len() int                                        // Get the current number of connections
    ClearConn()                                      // Remove and stop all connections
}

IConnManager接口定义以下方法:

  • Add:添加连接到连接管理器。
  • Remove:从连接管理器中删除连接。这不会关闭连接;它只是将其从管理层中删除。
  • Get:根据连接ID检索连接对象。
  • Len:获得连接管理器管理的连接总数。
  • ClearConn:从经理中删除所有连接并关闭他们。<​​/li>

接下来,我们将在zinx/znet目录中的connmanager.go文件中为IconnManager创建实现层:

// zinx/znet/connmanager.go

package znet

import (
    "errors"
    "fmt"
    "sync"
    "zinx/ziface"
)

/*
   Connection manager module
*/
type ConnManager struct {
    connections map[uint32]ziface.IConnection // Map to hold connection information
    connLock    sync.RWMutex                  // Read-write lock for concurrent access to the map
}

ConnManager struct包含一个存储所有连接信息的connections映射。关键是连接ID,值是连接本身。 connLock是用于保护对地图的并发访问的读写锁。

ConnManager的构造函数使用make函数初始化地图:

// zinx/znet/connmanager.go

/*
   Create a connection manager
*/
func NewConnManager() *ConnManager {
    return &ConnManager{
        connections: make(map[uint32]ziface.IConnection),
    }
}

添加到经理的连接的Add方法如下实现:

// zinx/znet/connmanager.go

// Add a connection
func (connMgr *ConnManager) Add(conn ziface.IConnection) {
    // Protect shared resource (map) with a write lock
    connMgr.connLock.Lock()
    defer connMgr.connLock.Unlock()

    // Add the connection to ConnManager
    connMgr.connections[conn.GetConnID()] = conn

    fmt.Println("Connection added to ConnManager successfully: conn num =", connMgr.Len())
}

由于GO的标准库映射不是线程安全,因此我们需要使用锁来保护并发写操作。在这里,我们使用写锁(connLock)来确保在修改地图时相互排除。

删除从管理器的连接的Remove方法如下实现:

// zinx/znet/connmanager.go

// Remove a connection
func (connMgr *ConnManager) Remove(conn ziface.IConnection) {
    // Protect shared resource (map) with a write lock
    connMgr.connLock.Lock()
    defer connMgr.connLock.Unlock()

    // Remove the connection
    delete(connMgr.connections, conn.GetConnID())

    fmt.Println("Connection removed: ConnID =", conn.GetConnID(), "successfully: conn num =", connMgr.Len())
}

Remove方法只需在不停止连接的业务处理的情况下从地图上删除连接。

GetLen方法的实现如下:

// zinx/znet/connmanager.go

// Get a connection using the connection ID
func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {
    // Protect shared resource (map) with a read lock
    connMgr.connLock.RLock()
    defer connMgr.connLock.RUnlock()

    if conn, ok := connMgr.connections[connID]; ok {
        return conn, nil
    } else {
        return nil, errors.New("connection not found")
    }
}

// Get the current number of connections
func (connMgr *ConnManager) Len() int {
    return len(connMgr.connections)
}

Get方法使用读取锁(connLock.RLock())允许同时读取对地图的访问,从而确保数据一致性。如果找到与给定ID的连接,则将其返回;否则,返回错误。

ClearConn方法的实现如下:

// zinx/znet/connmanager.go

// Remove and stop all connections
func (connMgr *ConnManager) ClearConn() {
    // Protect shared resource (map) with a write lock
    connMgr.connLock.Lock()
    defer connMgr.connLock.Unlock()

    // Stop and remove all connections
    for connID, conn := range connMgr.connections {
        // Stop the connection
        conn.Stop()
        // Remove the connection
        delete(connMgr.connections, connID)
    }

    fmt.Println("All connections cleared successfully: conn num =", connMgr.Len())
}

ClearConn方法通过调用conn.Stop()来停止每个连接的业务处理,然后删除地图上的所有连接。

9.1.2将连接管理模块集成到Zinx

1.将Connmanager添加到服务器

我们需要将ConnManager添加到Server struct中,并在服务器的构造函数中初始化它。 zinx/znet/server.go文件中的Server结构将具有一个名为ConnMgr的新成员属性,该属性是ziface.IConnManager类型:

// zinx/znet/server.go

// Server is a server service class implementing the IServer interface
type Server struct {
    // Server name
    Name string
    // IP version (e.g., tcp4 or other)
    IPVersion string
    // IP address to bind the server to
    IP string
    // Port to bind the server to
    Port int
    // Message handler for binding MsgID and corresponding processing methods
    MsgHandler ziface.IMsgHandle
    // Connection manager for the server
    ConnMgr ziface.IConnManager
}

在服务器的构造函数NewServer()中,我们需要初始化ConnMgr

// zinx/znet/server.go

// NewServer creates a server instance
func NewServer() ziface.IServer {
    utils.GlobalObject.Reload()

    s := &Server{
        Name:      utils.GlobalObject.Name,
        IPVersion: "tcp4",
        IP:        utils.GlobalObject.Host,
        Port:      utils.GlobalObject.TcpPort,
        MsgHandler: NewMsgHandle(),
        ConnMgr:   NewConnManager(), // Create a ConnManager
    }
    return s
}

NewServer()函数创建一个新的服务器实例并初始化了ConnMgr属性。

要从服务器提供对ConnMgr的访问,我们需要在zinx/ziface/iserver.go文件中的IServer Interface中添加方法GetConnMgr()

// zinx/ziface/iserver.go

type IServer interface {
    // Start the server
    Start()
    // Stop the server
    Stop()
    // Serve the business services
    Serve()
    // Register a router business method for the current service, used by client connection processing
    AddRouter(msgID uint32, router IRouter)
    // Get the connection manager
    GetConnMgr() IConnManager
}

GetConnMgr()方法应返回服务器的ConnMgr属性:

// zinx/znet/server.go

// GetConnMgr returns the connection manager
func (s *Server) GetConnMgr() ziface.IConnManager {
    return s.ConnMgr
}

通过实现GetConnMgr()方法,我们提供了一种从服务器访问连接管理器的方法。

由于连接(Connection)有时需要访问服务器中的连接管理器(ConnMgr)(Server),因此我们需要在ServerConnection对象之间建立相互参考关系。在Connection结构中,我们将添加一个名为TcpServer的成员,该成员表示当前连接所属的服务器。将TcpServer成员添加到Connection struct中的zinx/znet/connection.go文件中:

// zinx/znet/connection.go

type Connection struct {
    // The server to which the current connection belongs
    TcpServer    ziface.IServer  // Add this line to indicate the server to which the connection belongs
    // The TCP socket of the current connection
    Conn         *net.TCPConn
    // The ID of the current connection (also known as SessionID, globally unique)
    ConnID       uint32
    // The closing state of the current connection
    isClosed     bool
    // The message handler that manages MsgID and corresponding processing methods
    MsgHandler   ziface.IMsgHandle
    // The channel that informs that the connection has exited/stopped
    ExitBuffChan chan bool
    // The unbuffered channel used for message communication between the reading and writing goroutines
    msgChan      chan []byte
    // The buffered channel used for message communication between the reading and writing goroutines
    msgBuffChan  chan []byte
}

通过将TcpServer成员添加到Connection struct,我们建立了对连接所属服务器的引用。 TcpServer属性是类型ziface.IServer

2.将连接添加到连接管理器

初始化连接时,我们需要将连接添加到服务器的连接管理器中。在zinx/znet/connection.go文件中,修改NewConnection()函数以包括服务器对象:

// zinx/znet/connection.go

// NewConnection creates a connection
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
    c := &Connection{
        TcpServer:    server,      // Set the server object
        Conn:         conn,
        ConnID:       connID,
        isClosed:     false,
        MsgHandler:   msgHandler,
        ExitBuffChan: make(chan bool, 1),
        msgChan:      make(chan []byte),
        msgBuffChan:  make(chan []byte, utils.GlobalObject.MaxMsgChanLen),
    }

    // Add the newly created connection to the connection manager
    c.TcpServer.GetConnMgr().Add(c)
    return c
}

NewConnection()函数中,我们将服务器对象作为参数传递,并将其设置在连接对象的TcpServer属性中。然后,我们使用c.TcpServer.GetConnMgr().Add(c)添加连接到连接管理器。

3.检查服务器中的连接计数

在服务器的Start()方法中,在与客户端建立成功的连接后,我们可以检查连接数量并终止连接创建,如果它超过了最大连接数量。在zinx/znet/server.go文件中修改Start()方法如下:

// zinx/znet/server.go

// Start the network service
func (s *Server) Start() {
    // ... (omitted code)

    // Start a goroutine to handle the server listener
    go func() {
        // ... (omitted code)

        // Start the server network connection business
        for {
            // 1. Block and wait for client connection requests
            // ... (omitted code)

            // 2. Set the maximum connection limit for the server
            //    If the limit is exceeded, close the new connection
            if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn {
                conn.Close()
                continue
            }

            // ... (omitted code)
        }
    }()
}

在服务器的Start()方法中,我们使用s.ConnMgr.Len()检查连接数,并将其与最大连接限制进行比较。如果达到限制,我们关闭新连接(conn.Close()),然后继续进行下一个迭代。

开发人员可以使用MaxConn属性在配置文件zinx.jsonGlobalObject全局配置中定义最大连接计数。

4.删除连接

关闭连接时,应将其从Connmanager中删除。在连接结构的stop()方法中,我们添加了Connmanager的删除动作。修改Zinx/Znet/connection的stop()方法,如下所示:

// zinx/znet/connection.go

func (c *Connection) Stop() {
    fmt.Println("Conn Stop()... ConnID =", c.ConnID)

    if c.isClosed == true {
        return
    }
    c.isClosed = true

    c.Conn.Close()
    c.ExitBuffChan <- true

    // Remove the connection from the ConnManager
    c.TcpServer.GetConnMgr().Remove(c)

    close(c.ExitBuffChan)
    close(c.msgBuffChan)
}

Stop()方法中,关闭连接后,我们使用c.TcpServer.GetConnMgr().Remove(c)从Connmanager删除连接。

此外,在stop()方法中停止服务器时,我们也需要清除所有连接:

// zinx/znet/server.go

func (s *Server) Stop() {
    fmt.Println("[STOP] Zinx server, name", s.Name)

    // Stop or clean up other necessary connection information or other information
    s.ConnMgr.ClearConn()
}

Stop()方法中,我们致电s.ConnMgr.ClearConn()停止并删除ConnManager的所有连接。

使用上述代码,我们已成功将连接管理集成到Zinx。

9.1.3连接缓冲消息发送方法

以前,为Connection结构提供了一种称为SendMsg()的方法,该方法将数据发送到一个名为msgChan的未封闭通道。但是,如果有大量的客户端连接,并且收件人无法及时处理消息,则可能导致暂时阻止。为了提供非障碍的发送体验,可以添加缓冲消息发送方法。

iconnection接口定义(zinx/ziface/iconnection.go)

// Connection interface definition
type IConnection interface {
    // Start the connection, allowing the current connection to start working
    Start()
    // Stop the connection, ending the current connection state
    Stop()
    // Get the raw TCPConn of the current connection
    GetTCPConnection() *net.TCPConn
    // Get the current connection ID
    GetConnID() uint32
    // Get the remote client address information
    RemoteAddr() net.Addr
    // Send Message data directly to the remote TCP client (unbuffered)
    SendMsg(msgID uint32, data []byte) error
    // Send Message data directly to the remote TCP client (buffered)
    SendBuffMsg(msgID uint32, data []byte) error  // Add buffered message sending interface
}

除了SendMsg()方法外,我们还将在IConnection Interface中提供SendBuffMsg()方法。 SendBuffMsg()方法类似于SendMSG(),但使用一个缓冲通道进行两个Goroutines之间的通信。 zinx/znet/connection.go文件中连接结构的定义将被修改如下:

连接结构定义(zinx/znet/connection.go)

type Connection struct {
    // The server to which the current connection belongs
    TcpServer    ziface.IServer
    // The TCP socket of the current connection
    Conn         *net.TCPConn
    // The ID of the current connection (also known as SessionID, globally unique)
    ConnID       uint32
    // The closing state of the current connection
    isClosed     bool
    // The message handler that manages MsgID and corresponding processing methods
    MsgHandler   ziface.IMsgHandle
    // The channel that informs that the connection has exited/stopped
    ExitBuffChan chan bool
    // The unbuffered channel used for message communication between the reading and writing goroutines
    msgChan      chan []byte
    // The buffered channel used for message communication between the reading and writing goroutines
    msgBuffChan  chan []byte
}

为了实现缓冲消息发送功能,chan []byte类型的msgBuffChan被添加到Connection struct中。 msgBuffChan将用于阅读和写作goroutines之间的通信。确保在连接的构造函数方法中初始化msgBuffChan成员:

newConnection构造函数(zinx/znet/connection.go)

func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
    // Initialize Conn properties
    c := &Connection{
        TcpServer:    server,
        Conn:         conn,
        ConnID:       connID,
        isClosed:     false,
        MsgHandler:   msgHandler,
        ExitBuffChan: make(chan bool, 1),
        msgChan:      make(chan []byte),
        msgBuffChan:  make(chan []byte, utils.GlobalObject.MaxMsgChanLen), // Don't forget to initialize
    }

    // Add the newly created Conn to the connection manager
    c.TcpServer.GetConnMgr().Add(c)
    return c
}

SendBuffMsg()方法实现与SendMsg()相似。它通过msgBuffChan包装并将数据发送给客户端。这是实现:

sendbuffmsg()方法实现(zinx/znet/connection.go)

func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error {
    if c.isClosed {
        return errors.New("Connection closed when sending buffered message")
    }
    // Pack the data and send it
    dp := NewDataPack()
    msg, err := dp.Pack(NewMsgPackage(msgID, data))
    if err != nil {
        fmt.Println("Pack error msg ID =", msgID)
        return errors.New("Pack error message")
    }

    // Write to the client
    c.msgBuffChan <- msg

    return nil
}

Connection结构中的StartWriter()方法需要处理msgBuffChan以进行数据传输。这是实现:

startwriter()方法实现(zinx/znet/connection.go)

func (c *Connection) StartWriter() {
    fmt.Println("[Writer Goroutine is running]")
    defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")

    for {
        select {
        case data := <-c.msgChan:
            // Data to be written to the client
            if _, err := c.Conn.Write(data); err != nil {
                fmt.Println("Send Data error:", err, "Conn Writer exit")
                return
            }

        case data, ok := <-c.msgBuffChan:
            // Handling data for buffered channel
            if ok {
                // Data to be written to the client
                if _, err := c.Conn.Write(data); err != nil {
                    fmt.Println("Send Buffered Data error:", err, "Conn Writer exit")
                    return
                }
            } else {
                fmt.Println("msgBuffChan is Closed")
                break
            }

        case <-c.ExitBuffChan:
            return
        }
    }
}

StartWriter()方法聆听msgChan和MSGBuffchan通道。如果msgChan中有数据,则将其直接写给客户端。如果msgBuffChan中有数据,则将其写入相应的处理后将其写入客户。

9.1.5注册连接开始/停止自定义挂钩方法链接初始化/关机

在连接的生命周期中,有两个时刻需要注册回调功能来执行自定义业务逻辑。这些时刻发生在创建连接和断开连接之前。为了满足此要求,Zinx需要添加回调函数(也称为Hook函数),这些功能是在创建连接和断开连接之前触发的。

zinx/ziface/iserver.go文件中的IServer接口提供了用于注册开发人员可以使用的连接挂钩的方法。接口定义如下:

type IServer interface {
    // Start the server
    Start()
    // Stop the server
    Stop()
    // Start the business service
    Serve()
    // Register a routing business method for the current server to be used for client connection processing
    AddRouter(msgID uint32, router IRouter)
    // Get the connection manager
    GetConnMgr() IConnManager
    // Set the hook function to be called when a connection is created for this server
    SetOnConnStart(func(IConnection))
    // Set the hook function to be called when a connection is about to be disconnected for this server
    SetOnConnStop(func(IConnection))
    // Invoke the OnConnStart hook function for the connection
    CallOnConnStart(conn IConnection)
    // Invoke the OnConnStop hook function for the connection
    CallOnConnStop(conn IConnection)
}

添加了四种新的钩方法:

1.SetOnConnStart:设置为当前服务器创建连接时要调用的挂钩函数。
2.SetOnConnStop:设置即将与当前服务器断开连接时要调用的挂钩函数。
3.CallOnConnStart:创建连接后调用钩函数。
4.CallOnConnStop:在即将断开连接之前调用挂钩函数。

zinx/znet/server.go文件中的Server struct已更新,以包括两个新的钩函数字段:

type Server struct {
    // Server name
    Name string
    // TCP version (e.g., tcp4 or other)
    IPVersion string
    // IP address to which the server is bound
    IP string
    // Port to which the server is bound
    Port int
    // Message handler for the server, used to bind MsgID with corresponding handling methods
    MsgHandler ziface.IMsgHandle
    // Connection manager for the server
    ConnMgr ziface.IConnManager

    // New hook function prototypes //

    // Hook function to be called when a connection is created for this server
    OnConnStart func(conn ziface.IConnection)
    // Hook function to be called when a connection is about to be disconnected for this server
    OnConnStop func(conn ziface.IConnection)
}

Server结构现在包括OnConnStartOnConnStop字段,以保留开发人员传递的挂钩功能的地址。

实现四种新挂钩方法如下:

// Set the hook function to be called when a connection is created for the server
func (s *Server) SetOnConnStart(hookFunc func(ziface.IConnection)) {
    s.OnConnStart = hookFunc
}

// Set the hook function to be called when a connection is about to be disconnected for the server
func (s *Server) SetOnConnStop(hookFunc func(ziface.IConnection)) {
    s.OnConnStop = hookFunc
}

// Invoke the OnConnStart hook function for the connection
func (s *Server) CallOnConnStart(conn ziface.IConnection) {
    if s.OnConnStart != nil {
        fmt.Println("---> CallOnConnStart....")
        s.OnConnStart(conn)
    }
}

// Invoke the OnConnStop hook function for the connection
func (s *Server) CallOnConnStop(conn ziface.IConnection) {
    if s.OnConnStop != nil {
        fmt.Println("---> CallOnConnStop....")
        s.OnConnStop(conn)
    }
}

现在,让我们确定应调用这两个钩方法的位置。第一个位置是在创建连接之后,这是Connection struct的Start()方法的最后一步:

// Start the connection, allowing it to begin working
func (c *Connection) Start() {
    // 1. Start the Goroutine for reading data from the client
    go c.StartReader()
    // 2. Start the Goroutine for writing data back to the client
    go c.StartWriter()

    // Call the registered hook method for connection creation according to the user's requirements
    c.TcpServer.CallOnConnStart(c)
}

第二个位置就在停止连接之前,这是调用Connection struct的Stop()方法。应该在插座的Close()动作之前调用,因为一旦插座关闭,就会终止与遥控端的通信。如果挂钩方法涉及将数据写回客户,则将无法正确通信。因此,应在Close()方法之前调用钩方法。这是代码:

// Stop the connection, ending the current connection state
func (c *Connection) Stop() {
    fmt.Println("Conn Stop()...ConnID = ", c.ConnID)
    // If the current connection is already closed
    if c.isClosed == true {
        return
    }
    c.isClosed = true

    // ==================
    // If the user registered a callback function for this connection's closure, it should be called explicitly at this moment
    c.TcpServer.CallOnConnStop(c)
    // ==================

    // Close the socket connection
    c.Conn.Close()
    // Close the writer
    c.ExitBuffChan <- true

    // Remove the connection from the connection manager
    c.TcpServer.GetConnMgr().Remove(c)

    // Close all channels of this connection
    close(c.ExitBuffChan)
    close(c.msgBuffChan)
}

9.1.5使用Zinx-V0.9完成应用程序

到目前为止,所有连接管理功能都已集成到zinx中。下一步是测试连接管理模块是否功能正常。让我们测试一台服务器,以说明处理连接管理挂钩函数回调的能力。代码如下:

// Server.go

package main

import (
    "fmt"
    "zinx/ziface"
    "zinx/znet"
)

// Ping test custom router
type PingRouter struct {
    znet.BaseRouter
}

// Ping Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
    fmt.Println("Call PingRouter Handle")
    // Read the data from the client first, then write back ping...ping...ping
    fmt.Println("recv from client: msgId=", request.GetMsgID(), ", data=", string(request.GetData()))

    err := request.GetConnection().SendBuffMsg(0, []byte("ping...ping...ping"))
    if err != nil {
        fmt.Println(err)
    }
}

type HelloZinxRouter struct {
    znet.BaseRouter
}

// HelloZinxRouter Handle
func (this *HelloZinxRouter) Handle(request ziface.IRequest) {
    fmt.Println("Call HelloZinxRouter Handle")
    // Read the data from the client first, then write back Hello Zinx Router V0.8
    fmt.Println("recv from client: msgId=", request.GetMsgID(), ", data=", string(request.GetData()))

    err := request.GetConnection().SendBuffMsg(1, []byte("Hello Zinx Router V0.8"))
    if err != nil {
        fmt.Println(err)
    }
}

// Executed when a connection is created
func DoConnectionBegin(conn ziface.IConnection) {
    fmt.Println("DoConnectionBegin is Called...")
    err := conn.SendMsg(2, []byte("DoConnection BEGIN..."))
    if err != nil {
        fmt.Println(err)
    }
}

// Executed when a connection is lost
func DoConnectionLost(conn ziface.IConnection) {
    fmt.Println("DoConnectionLost is Called...")
}

func main() {
    // Create a server handler
    s := znet.NewServer()

    // Register connection hook callback functions
    s.SetOnConnStart(DoConnectionBegin)
    s.SetOnConnStop(DoConnectionLost)

    // Configure routers
    s.AddRouter(0, &PingRouter{})
    s.AddRouter(1, &HelloZinxRouter{})

    // Start the server
    s.Serve()
}

服务器端业务代码登记两个挂钩函数:创建连接后要执行的DoConnectionBegin()并在连接丢失之前将执行DoConnectionLost()

  • DoConnectionBegin():创建连接后,它将带有ID 2的消息发送给客户端,并在服务器端打印一个调试消息,说“ doconnectionbebegin被称为...”。

  • DoConnectionLost():在连接丢失之前,它在服务器端打印了一个调试消息,说“ doconnectionlost称为...”。

客户端Client.go的代码保持不变。要测试服务器和客户端,请打开不同的终端,并使用以下命令启动服务器和客户端:

1.启动服务器:

$ go run Server.go

2.启动客户端:

$ go run Client.go

服务器端输出将如下:

$ go run Server.go
Add api msgId = 0
Add api msgId =  1
[START] Server name: zinx v-0.8 demoApp, listener at IP: 127.0.0.1, Port 7777 is starting
[Zinx] Version: V0.4, MaxConn: 3, MaxPacketSize: 4096
start Zinx server zinx v-0.8 demoApp succ, now listening...
Worker ID =  9  is started.
Worker ID =  5  is started.
Worker ID =  6  is started.
Worker ID =  7  is started.
Worker ID =  8  is started.
Worker ID =  1  is started.
Worker ID =  0  is started.
Worker ID =  2  is started.
Worker ID =  3  is started.
Worker ID =  4  is started.
connection add to ConnManager successfully: conn num =  1
---> CallOnConnStart....
DoConnectionBegin is Called...
[Writer Goroutine is running]
[Reader Goroutine is running]
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client: msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client: msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client: msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client: msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client: msgId= 0 , data= Zinx V0.8 Client0 Test Message
read msg head error read tcp4 127.0.0.1:7777->127.0.0.1:49510: read: connection reset by peer
Conn Stop()...ConnID =  0
---> CallOnConnStop....
DoConnectionLost is Called...
connection Remove ConnID= 0  successfully: conn num =  0
127.0.0.1:49510 [conn Reader exit!]
127.0.0.1:49510 [conn Writer exit!]

客户端输出将如下:

$ go run Client0.go
Client Test... start
==> Recv Msg: ID= 2 , len= 21 , data= DoConnection BEGIN...
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
^Csignal: interrupt

从上述结果中,我们可以看到客户端已成功创建,并且已经执行了回调钩。连接已添加到服务器中的ConnManager,当前连接计数Conn Num是1。当我们手动按“ Ctrl+C”关闭客户端时,服务器端上的ConnManager成功地删除了连接,并且连接count conn num变为0。此外,打印了连接停止回调的服务器端调试信息。

9.2设置连接属性在zinx中

处理连接时,开发人员通常希望将某些用户数据或参数绑定到连接。这使他们可以在手柄处理过程中从连接中检索传递的参数,并相应地执行业务逻辑。为了提供此功能,Zinx需要建立在当前连接上设置属性的接口或方法。本节将实现设置连接属性的功能。

9.2.1添加连接配置接口

首先,在Iconnection摘要层中,添加了三个相关的配置连接属性的接口。代码如下:

//zinx/ziface/iconnection.go

// Definition of the connection interface
type IConnection interface {
    // Start the connection and initiate its operations
    Start()
    // Stop the connection and terminate its current state
    Stop()

    // Get the underlying TCPConn from the current connection
    GetTCPConnection() *net.TCPConn
    // Get the connection's unique ID
    GetConnID() uint32
    // Get the remote client's address information
    RemoteAddr() net.Addr

    // Send Message data directly to the remote TCP client (unbuffered)
    SendMsg(msgId uint32, data []byte) error
    // Send Message data directly to the remote TCP client (buffered)
    SendBuffMsg(msgId uint32, data []byte) error

    // Set connection attributes
    SetProperty(key string, value interface{})
    // Get connection attributes
    GetProperty(key string) (interface{}, error)
    // Remove connection attributes
    RemoveProperty(key string)
}

在提供的代码段中,Iconnection具有三种添加的方法:SetProperty()GetProperty()RemoveProperty()。每种方法中的密钥参数是类型字符串,值参数为多功能接口{}类型。随后的步骤涉及定义连接内的特定属性类型。

9.2.2连接属性方法的实现

在实现层中,连接结构使用命名属性的成员属性增强。该属性将保留通过连接传递的所有用户提供的参数,并将其定义为map[string]interface{}类型。定义如下:

//zinx/znet/connction.go

type Connection struct {
    // Current Conn belongs to which Server
    TcpServer ziface.IServer
    // The TCP socket of the current connection
    Conn *net.TCPConn
    // ID of the current connection, also known as SessionID; globally unique
    ConnID uint32
    // Current connection's closure state
    isClosed bool
    // Message manager module for managing MsgId and corresponding message handling methods
    MsgHandler ziface.IMsgHandle
    // Channel to signal that the connection has exited/stopped
    ExitBuffChan chan bool
    // Unbuffered channel for message communication between read and write goroutines
    msgChan chan []byte
    // Buffered channel for message communication between read and write goroutines
    msgBuffChan chan []byte
    // Connection properties
    property     map[string]interface{}
    // Lock for protecting concurrent property modifications
    propertyLock sync.RWMutex
}

property地图不是并发安全,并且在地图上读写操作需要使用propertyLock保护。此外,Connection的构造函数必须初始化propertypropertyLock。为此提供了代码段:

//zinx/znet/connction.go

// Method to create a connection
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
    // Initialize Conn properties
    c := &Connection{
        TcpServer:    server,
        Conn:         conn,
        ConnID:       connID,
        isClosed:     false,
        MsgHandler:   msgHandler,
        ExitBuffChan: make(chan bool, 1),
        msgChan:      make(chan []byte),
        msgBuffChan:  make(chan []byte, utils.GlobalObject.MaxMsgChanLen),
        property:     make(map[string]interface{}), // Initialize connection property map
    }

    // Add the newly created Conn to the connection manager
    c.TcpServer.GetConnMgr().Add(c)
    return c
}

实现了处理连接属性的三种方法很简单。它们分别涉及从地图中添加,阅读和删除条目。这些方法的实现如下:

//zinx/znet/connction.go

// Set connection property
func (c *Connection) SetProperty(key string, value interface{}) {
    c.propertyLock.Lock()
    defer c.propertyLock.Unlock()

    c.property[key] = value
}

// Get connection property
func (c *Connection) GetProperty(key string) (interface{}, error) {
    c.propertyLock.RLock()
    defer c.propertyLock.RUnlock()

    if value, ok := c.property[key]; ok {
        return value, nil
    } else {
        return nil, errors.New("no property found")
    }
}

// Remove connection property
func (c *Connection) RemoveProperty(key string) {
    c.propertyLock.Lock()
    defer c.propertyLock.Unlock()

    delete(c.property, key)
}

这些方法管理与属性图的交互,允许添加,检索和删除与连接关联的属性。

9.1.3 Zinx-V0.10中的连接属性测试

封装连接属性的功能后,我们可以使用服务器端上的相关接口设置一些属性并测试属性的设置和提取是否功能。以下是服务器端代码:

// Server.go

package main

import (
    "fmt"
    "zinx/ziface"
    "zinx/znet"
)

// ping test custom router
type PingRouter struct {
    znet.BaseRouter
}

// Ping Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
    fmt.Println("Call PingRouter Handle")
    // Read client data first, then reply with ping...ping...ping
    fmt.Println("recv from client: msgId=", request.GetMsgID(), ", data=", string(request.GetData()))

    err := request.GetConnection().SendBuffMsg(0, []byte("ping...ping...ping"))
    if err != nil {
        fmt.Println(err)
    }
}

type HelloZinxRouter struct {
    znet.BaseRouter
}

// HelloZinxRouter Handle
func (this *HelloZinxRouter) Handle(request ziface.IRequest) {
    fmt.Println("Call HelloZinxRouter Handle")
    // Read client data first, then reply with ping...ping...ping
    fmt.Println("recv from client: msgId=", request.GetMsgID(), ", data=", string(request.GetData()))

    err := request.GetConnection().SendBuffMsg(1, []byte("Hello Zinx Router V0.10"))
    if err != nil {
        fmt.Println(err)
    }
}

// Executed when a connection is created
func DoConnectionBegin(conn ziface.IConnection) {
    fmt.Println("DoConnectionBegin is Called ... ")

    // Set two connection properties after the connection is created
    fmt.Println("Set conn Name, Home done!")
    conn.SetProperty("Name", "Aceld")
    conn.SetProperty("Home", "https://github.com/aceld/zinx")

    err := conn.SendMsg(2, []byte("DoConnection BEGIN..."))
    if err != nil {
        fmt.Println(err)
    }
}

// Executed when a connection is lost
func DoConnectionLost(conn ziface.IConnection) {
    // Before the connection is destroyed, query the "Name" and "Home" properties of the conn
    if name, err := conn.GetProperty("Name"); err == nil {
        fmt.Println("Conn Property Name =", name)
    }

    if home, err := conn.GetProperty("Home"); err == nil {
        fmt.Println("Conn Property Home =", home)
    }

    fmt.Println("DoConnectionLost is Called ... ")
}

func main() {
    // Create a server handle
    s := znet.NewServer()

    // Register connection hook callback functions
    s.SetOnConnStart(DoConnectionBegin)
    s.SetOnConnStop(DoConnectionLost)

    // Configure routers
    s.AddRouter(0, &PingRouter{})
    s.AddRouter(1, &HelloZinxRouter{})

    // Start the server
    s.Serve()
}

关键重点是DoConnectionBegin()DoConnectionLost()功能的实现。在这些功能中,使用挂钩回调设置并提取连接属性。建立连接后,使用conn.SetProperty()方法将属性“名称”和“ HOME”分配给连接。稍后,可以使用conn.GetProperty()方法检索这些属性。

打开终端启动服务器程序,并观察以下结果:

$ go run Server.go 
Add api msgId =  0
Add api msgId =  1
[START] Server name: zinx v-0.10 demoApp,listener at IP: 127.0.0.1, Port 7777 is starting
[Zinx] Version: V0.4, MaxConn: 3, MaxPacketSize: 4096
start Zinx server   zinx v-0.10 demoApp  succ, now listening...
Worker ID =  9  is started.
Worker ID =  5  is started.
Worker ID =  6  is started.
Worker ID =  7  is started.
Worker ID =  8  is started.
Worker ID =  1  is started.
Worker ID =  0  is started.
Worker ID =  2  is started.
Worker ID =  3  is started.
Worker ID =  4  is started.
connection add to ConnManager successfully: conn num =  1
---> CallOnConnStart....
DoConnecionBegin is Called ... 
Set conn Name, Home done!
[Writer Goroutine is running]
[Reader Goroutine is running]
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
read msg head error  read tcp4 127.0.0.1:7777->127.0.0.1:55208: read: connection reset by peer
Conn Stop()...ConnID =  0
---> CallOnConnStop....
Conn Property Name =  Aceld
Conn Property Home =  https://github.com/aceld/zinx
DoConneciotnLost is Called ... 
connection Remove ConnID= 0  successfully: conn num =  0
127.0.0.1:55208 [conn Reader exit!]
127.0.0.1:55208 [conn Writer exit!]

在新的终端中,启动客户端程序,并观察以下结果:

$ go run Client0.go 
Client Test ... start
==> Recv Msg: ID= 2 , len= 21 , data= DoConnection BEGIN...
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
^Csignal: interrupt

服务器输出特别值得注意:

---> CallOnConnStop....
Conn Property Name =  Aceld
Conn Property Home =  https://github.com/aceld/zinx
DoConneciotnLost is Called ... 

9.3摘要

在本章中,引入了两个功能,以增强Zinx中连接的功能。第一个是连接管理模块,该模块聚集了Zinx服务器中的所有连接,提供了汇总的视图和计数连接总数。当前,Zinx仅对连接ID采用基本的增量计算。鼓励读者通过用更常见的分布式ID替换Connid来优化这一方面,从而确保跨ID的独特性。

连接管理模块通过限制连接数来帮助控制服务器上的同时负载。第二个附加的连接属性丰富了ZINX内处理业务逻辑的便利性。开发人员可以利用SetProperty()GetProperty()方法将不同的属性与不同的连接相关联。这使各种标记或属性链接到不同的连接,从而促进灵活的业务逻辑处理。


<1.Building Basic Services with Zinx Framework>
<2. Zinx-V0.2 Simple Connection Encapsulation and Binding with Business>
<3.Design and Implementation of the Zinx Framework's Routing Module>
<4.Zinx Global Configuration>
<5.Zinx Message Encapsulation Module Design and Implementation>
<6.Design and Implementation of Zinx Multi-Router Mode>
<7. Building Zinx's Read-Write Separation Model>
<8.Zinx Message Queue and Task Worker Pool Design and Implementation>
<9. Zinx Connection Management and Property Setting>

未完待续...

作者:
不和谐:https://discord.gg/xQ8Xxfyfcz
zinx:https://github.com/aceld/zinx
github:https://github.com/aceld
Aceld的家:https://yuque.com/aceld