5.Zinx消息封装模块设计和实现
#go #zinx

<1.Building Basic Services with Zinx Framework>
<2. Zinx-V0.2 Simple Connection Encapsulation and Binding with Business>
<3.Design and Implementation of the Zinx Framework's Routing Module>
<4.Zinx Global Configuration>
<5.Zinx Message Encapsulation Module Design and Implementation>


接下来,让我们简单地升级到zinx。当前,所有服务器请求的数据都存储在单个请求对象中。当前请求的结构如下:

type Request struct {
    conn ziface.IConnection // the connection established with the client
    data []byte             // the data requested by the client
}

正如我们所看到的,该请求当前使用[]byte接收所有数据,而无需任何长度或消息类型信息。这种方法缺乏全面的数据管理。为了更好地表示数据格式,我们需要定义自定义消息类型并封装其中的所有消息。

5.1创建消息封装类型

zinx/ziface目录中创建一个名为imessage.go的文件。该文件用作消息封装的抽象层。在此文件中,定义IMessage接口如下:

package ziface

// IMessaage is an abstract interface that encapsulates a message
type IMessage interface {
    GetDataLen() uint32 // Get the length of the message data segment
    GetMsgId() uint32   // Get the message ID
    GetData() []byte    // Get the message content

    SetMsgId(uint32)     // Set the message ID
    SetData([]byte)      // Set the message content
    SetDataLen(uint32)  // Set the length of the message data segment
}

IMessage接口定义了六种方法,包括消息长度,消息ID和消息内容的Getters和Setter。从接口中,我们可以看到一条消息应该具有三个元素:ID,长度和数据。

接下来,在zinx/znet目录中的message.go文件中创建Message struct的实现:

package znet

type Message struct {
    Id      uint32   // ID of the message
    DataLen uint32   // Length of the message
    Data    []byte   // Content of the message
}

接下来,为Message struct提供构造方法:

// Create a new Message package
func NewMsgPackage(id uint32, data []byte) *Message {
    return &Message{
        Id:      id,
        DataLen: uint32(len(data)),
        Data:    data,
    }
}

实现相关的getter和setter方法如下:

// Get the length of the message data segment
func (msg *Message) GetDataLen() uint32 {
    return msg.DataLen
}

// Get the message ID
func (msg *Message) GetMsgId() uint32 {
    return msg.Id
}

// Get the message content
func (msg *Message) GetData() []byte {
    return msg.Data
}

// Set the length of the message data segment
func (msg *Message) SetDataLen(len uint32) {
    msg.DataLen = len
}

// Set the message ID
func (msg *Message) SetMsgId(msgId uint32) {
    msg.Id = msgId
}

// Set the message content
func (msg *Message) SetData(data []byte) {
    msg.Data = data
}

Message结构是将来封装优化的占位符。它还提供了一种初始化方法NewMsgPackage()来创建一个消息包。

5.2消息包装和打开包装

Zinx采用经典的TLV(型长度值)包装格式来解决TCP数据包贴纸问题。特定的消息结构在图5.1中以图形方式表示。

图5.1
Image description

5.2.1创建抽象类,以用于消息包装和解开包装

在本节中,我们将创建用于消息包装的实现,并解开包装以解决TCP数据包粘性问题。首先,在zinx/ziface目录中创建文件idatapack.go。该文件是用于消息包装和解开包装的抽象层接口。代码如下:

//zinx/ziface/idatapack.go

package ziface

/*
    Packaging and unpacking data
    Directly operates on the data stream of TCP connections, adding header information for transmitting data to handle TCP packet sticking problem.
*/
type IDataPack interface {
    GetHeadLen() uint32               // Method to get the length of the packet header
    Pack(msg IMessage) ([]byte, error) // Method to pack the message
    Unpack([]byte) (IMessage, error)   // Method to unpack the message
}

IDataPack是一个抽象接口,定义了三种方法:

  • GetHeadLen()返回消息标头的长度。对于应用程序层数据包,通信的两面都需要已知的标头长度,以读取与标题当前消息相关的其他数据。因此,提供了用于检索标题长度的接口,以用于包装和解压缩的特定实现类别。
  • Pack(msg IMessage)IMessage结构的形式将数据包装到二进制流中。
  • Unpack([]byte)按照ZINX消息协议将应用层二进制数据流解析为IMessage结构。此方法对应于Pack()方法。

5.2.2实施包装和解压缩类

在本节中,我们将在Zinx中实现包装和解开功能。在zinx/znet目录中创建文件datapack.go。该文件是用于包装和解开包装的具体实现类。代码如下:

//zinx/znet/datapack.go

package znet

import (
    "bytes"
    "encoding/binary"
    "errors"
    "zinx/utils"
    "zinx/ziface"
)

// DataPack class for packaging and unpacking, no need for members currently
type DataPack struct{}

// Initialization method for the DataPack class
func NewDataPack() *DataPack {
    return &DataPack{}
}

在这里,我们定义了没有任何属性的DataPack类。我们提供NewDataPack()构造方法。

DataPack类需要实现IDataPack接口,包括GetHeadLen()Pack()Unpack()方法。首先,让我们看一下GetHeadLen()方法的实现:

//zinx/znet/datapack.go

// Method to get the length of the packet header
func (dp *DataPack) GetHeadLen() uint32 {
    // Id uint32(4 bytes) + DataLen uint32(4 bytes)
    return 8
}

在这里,我们只返回固定长度为8个字节。我们如何获得这个8字节的长度?根据图17.1所示的TLV消息格式,数据部分的长度不可控制,但头部部分的长度是固定的。头部部分由4个字节组成,用于存储消息ID和另外4个字节来存储数据部分的长度。因此,Zinx中的GetHeadLen()方法直接返回8个字节的长度。

Pack()方法的实现如下:

//zinx/znet/datapack.go

// Method for message packaging (compressing data)
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
    // Create a buffer to store the bytes
    dataBuff := bytes.NewBuffer([]byte{})

    // Write DataLen
    if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil {
        return nil, err
    }

    // Write MsgID
    if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {
        return nil, err
    }

    // Write data
    if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {
        return nil, err
    }

    return dataBuff.Bytes(), nil
}

在这里,我们使用Little-Endian字节顺序(只要包装和解开包装匹配的字节订单)即可。压缩顺序很重要,因为我们需要按照该顺序包装Datalen,Msgid和数据。同样,拆箱过程必须遵循此顺序。

Unpack()方法的相应实现如下:

//zinx/znet/datapack.go

// Method for message unpacking (decompressing data)
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {
    // Create an io.Reader from the input binary data
    dataBuff := bytes.NewReader(binaryData)

    // Only extract the information from the header, obtaining dataLen and msgID
    msg := &Message{}

    // Read dataLen
    if err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {
        return nil, err
    }

    // Read msgID
    if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {
        return nil, err
    }

    // Check if the dataLen exceeds the maximum allowed packet size
    if utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize {
        return nil, errors.New("Too large msg data received")
    }

    // We only need to unpack the header data, and then read the data once more from the connection based on the length of the header
    return msg, nil
}

请注意,在这种情况下,Unpack()方法以两个步骤执行,如图16.1所示。第二步取决于第一步中获得的数据结果。因此,Unpack()方法只能提取标头(头)内容,获得MSGID和DATALEN值。然后,呼叫者将根据Datalen指定的长度继续读取IO流中的身体数据。以与Pack()方法相同的顺序读取Datalen和MSGID很重要。

5.2.3测试包装和解压缩功能

为了更好地理解该概念,让我们通过创建单独的服务器和客户端来测试包装并解开包装功能,然后再将数据集成到Zinx框架中。这是服务器的代码,server.go

// server.go

package main

import (
    "fmt"
    "io"
    "net"
    "zinx/znet"
)

// Test the datapack packaging and unpacking functionality
func main() {
    // Create a TCP server socket
    listener, err := net.Listen("tcp", "127.0.0.1:7777")
    if err != nil {
        fmt.Println("Server listen error:", err)
        return
    }

    // Create a goroutine to handle reading and parsing the data from the client goroutine, which is responsible for handling sticky packets
    for {
        conn, err := listener.Accept()
        if err != nil {
            fmt.Println("Server accept error:", err)
        }

        // Handle client requests
        go func(conn net.Conn) {
            // Create a datapack object dp for packaging and unpacking
            dp := znet.NewDataPack()
            for {
                // 1. Read the head part from the stream
                headData := make([]byte, dp.GetHeadLen())
                // ReadFull fills the buffer completely
                _, err := io.ReadFull(conn, headData)
                if err != nil {
                    fmt.Println("Read head error")
                    break
                }

                // 2. Unpack the headData bytes into msgHead
                msgHead, err := dp.Unpack(headData)
                if err != nil {
                    fmt.Println("Server unpack error:", err)
                    return
                }

                // 3. Read the data bytes from the IO based on dataLen
                if msgHead.GetDataLen() > 0 {
                    // msg has data, need to read data again
                    msg := msgHead.(*znet.Message)
                    msg.Data = make([]byte, msg.GetDataLen())
                    _, err := io.ReadFull(conn, msg.Data)
                    if err != nil {
                        fmt.Println("Server unpack data error:", err)
                        return
                    }

                    fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
                }
            }
        }(conn)
    }
}

让我们分析代码中的关键步骤:

(1)首先,我们从流中读取头部:

headData := make([]byte, dp.GetHeadLen())
_, err := io.ReadFull(conn, headData)

我们分配一个称为HeadData的缓冲区,固定长度为8个字节,这是协议中头部的大小。然后,我们使用io.ReadFull()准确地读取从插座的IO到Headdata的8个字节。如果下面的缓冲区中少于8个字节,则io.ReadFull()将在缓冲区完全填充之前返回。

接下来,我们将headdata字节打开到msghead:

msgHead, err := dp.Unpack(headData)

我们使用Datapack对象的Unpack()方法将HEADDATA字节打开到msgHead对象中。

最后,我们基于Datalen读取IO的数据字节:

if msgHead.GetDataLen() > 0 {
    msg := msgHead.(*znet.Message)
    msg.Data = make([]byte, msg.GetDataLen())
    _, err := io.ReadFull(conn, msg.Data)
    // ... process the received message data
}

如果dataLen大于0,则表示消息中有数据。我们根据dataLen创建一个缓冲区,使用io.ReadFull()读取从IO到缓冲区的数据字节,然后处理接收到的消息数据。

通过使用io.ReadFull(),我们确保从插座的IO读取每个读取操作所需的确切字节数。如果基础缓冲区的数据超过所需的数据,则其余数据将留在下一个读取操作的缓冲区中。

现在,让我们运行服务器并创建一个客户端以测试包装和解开包装功能。

(2)解开从headDataMessage的标题信息流的包装。

msgHead, err := dp.Unpack(headData)
if err != nil {
    fmt.Println("Server unpack error:", err)
    return
}

Unpack()功能可以从8字节headData中提取Message的标头信息。 msgHead.IDmsgHead.DataLen字段将分配值。

(3)基于Datalen的IO读取字节流:

// 3. Read the data bytes from the IO based on dataLen
if msgHead.GetDataLen() > 0 {
    // If the message has data, read the data again
    msg := msgHead.(*znet.Message)
    msg.Data = make([]byte, msg.GetDataLen())
    _, err := io.ReadFull(conn, msg.Data)
    if err != nil {
        fmt.Println("Server unpack data error:", err)
        return
    }
    // ... (omitted code)
}

在代码的这一部分中,我们根据DataLen执行第二次读取,以读取完整的消息数据。我们使用ReadFull()方法读取io中的固定长度datalen字节,并用读取数据填充msgHead.Data属性。这使我们能够阅读具有固定数据长度的完整消息,从而有效地解决了消息碎片的问题。

现在让我们看一下客户的测试代码:

// client.go
package main

import (
    "fmt"
    "net"
    "zinx/znet"
)

func main() {
    // Create a client goroutine to simulate the data for sticky packets and send them
    conn, err := net.Dial("tcp", "127.0.0.1:7777")
    if err != nil {
        fmt.Println("Client dial error:", err)
        return
    }

    // 1. Create a DataPack object dp
    dp := znet.NewDataPack()

    // 2. Pack msg1
    msg1 := &znet.Message{
        Id:      0,
        DataLen: 5,
        Data:    []byte{'h', 'e', 'l', 'l', 'o'},
    }

    sendData1, err := dp.Pack(msg1)
    if err != nil {
        fmt.Println("Client pack msg1 error:", err)
        return
    }

    // 3. Pack msg2
    msg2 := &znet.Message{
        Id:      1,
        DataLen: 7,
        Data:    []byte{'w', 'o', 'r', 'l', 'd', '!', '!'},
    }
    sendData2, err := dp.Pack(msg2)
    if err != nil {
        fmt.Println("Client pack msg2 error:", err)
        return
    }

    // 4. Concatenate sendData1 and sendData2 to create a sticky packet
    sendData1 = append(sendData1, sendData2...)

    // 5. Write data to the server
    conn.Write(sendData1)

    // Block the client
    select {}
}

在此代码中,有五个明确的步骤:

  1. 客户端使用Zinx/Znet的数据包来包装数据。两条消息连续包装,而append()用于连接两个消息的字节流,模拟粘性数据包。然后将组合的senddata1写入远程服务器。

  2. 如果服务器可以解析两条消息的数据,则证明了Datapack的包装和解压缩方法正常运行。

运行服务器,执行以下命令:

go run Server.go

在新终端中,使用命令运行客户端代码:

go run Client.go

从服务器的角度来看,观察到以下输出:

==> Recv Msg: ID=0, len=5, data=hello
==> Recv Msg: ID=1, len=7, data=world!!

结果表明Zinx已收到客户端发送的两个数据包并成功解析了。

5.3 ZINX-V0.5代码实现

在本节中,我们需要将数据包包装和解开功能集成到Zinx中,并测试该功能是否有效。

5.3.1修改请求结构

首先,我们需要修改Request struct的data字段。

package znet

import "zinx/ziface"

type Request struct {
    conn ziface.IConnection // The connection already established with the client
    msg  ziface.IMessage    // Data requested by the client
}

// Get the connection information
func (r *Request) GetConnection() ziface.IConnection {
    return r.conn
}

// Get the data of the request message
func (r *Request) GetData() []byte {
    return r.msg.GetData()
}

// Get the ID of the request message
func (r *Request) GetMsgID() uint32 {
    return r.msg.GetMsgId()
}

还需要相应地修改相关的Getter方法。

5.3.2拆箱过程的集成

接下来,我们需要修改Connection方法中读取客户端数据的代码,如下所示:

func (c *Connection) StartReader() {
    fmt.Println("Reader Goroutine is running")
    defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
    defer c.Stop()

    for {
        // Create a data packing/unpacking object
        dp := NewDataPack()

        // Read the client's message header
        headData := make([]byte, dp.GetHeadLen())
        if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
            fmt.Println("read msg head error", err)
            c.ExitBuffChan <- true
            continue
        }

        // Unpack the message, obtain msgid and datalen, and store them in msg
        msg, err := dp.Unpack(headData)
        if err != nil {
            fmt.Println("unpack error", err)
            c.ExitBuffChan <- true
            continue
        }

        // Read the data based on dataLen and store it in msg.Data
        var data []byte
        if msg.GetDataLen() > 0 {
            data = make([]byte, msg.GetDataLen())
            if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
                fmt.Println("read msg data error", err)
                c.ExitBuffChan <- true
                continue
            }
        }
        msg.SetData(data)

        // Get the Request data of the current client request
        req := Request{
            conn: c,
            msg:  msg, // Replace buf with msg
        }

        // Find the corresponding Handle registered in Routers based on the bound Conn
        go func(request ziface.IRequest) {
            // Execute the registered router methods
            c.Router.PreHandle(request)
            c.Router.Handle(request)
            c.Router.PostHandle(request)
        }(&req)
    }
}

集成逻辑与上一节中测试的GO代码相似。

5.3.3发送数据包方法

现在,当使用Zinx时,数据包拆分功能已集成到Zinx中,如果开发人员希望将TLV构型数据返回给用户,则他们每次都无法通过此繁琐的过程。因此,Zinx应提供用于发送数据包的数据包封装接口,该接口应在Iconnection接口中定义为SendMsg()方法。代码如下:

//zinx/ziface/iconnection.go

package ziface

import "net"

// Define the connection interface
type IConnection interface {
    // Start the connection and make the current connection work
    Start()
    // Stop the connection and end the current connection status
    Stop()
    // Get the raw socket TCPConn from the current connection
    GetTCPConnection() *net.TCPConn
    // Get the current connection ID
    GetConnID() uint32
    // Get the remote client address information
    RemoteAddr() net.Addr
    // Directly send the Message data to the remote TCP client
    SendMsg(msgId uint32, data []byte) error
}

sendmsg()方法提供了两个参数:正在发送的当前消息的消息ID和消息携带的数据。该方法的连接实现如下:

//zinx/znet/connection.go

// Send the Message data to the remote TCP client
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
    if c.isClosed == true {
        return errors.New("Connection closed when send msg")
    }

    // Package the data and send it
    dp := NewDataPack()
    msg, err := dp.Pack(NewMsgPackage(msgId, data))
    if err != nil {
        fmt.Println("Pack error msg id =", msgId)
        return errors.New("Pack error msg")
    }

    // Write back to the client
    if _, err := c.Conn.Write(msg); err != nil {
        fmt.Println("Write msg id", msgId, "error")
        c.ExitBuffChan <- true
        return errors.New("conn Write error")
    }

    return nil
}

sendmsg()方法可以使包装过程透明到业务开发人员,使数据包发送更可读,并且接口更清晰。

5.3.4使用zinx-v0.5完成应用程序

现在,我们可以使用Zinx框架来完成发送消息消息的月球服务程序的应用程序级测试案例。应用程序服务器的代码如下:

//Server.go

package main

import (
    "fmt"
    "zinx/ziface"
    "zinx/znet"
)

// Ping test custom router
type PingRouter struct {
    znet.BaseRouter
}

// Test Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
    fmt.Println("Call PingRouter Handle")
    // Read the client's data first, then write back ping...ping...ping
    fmt.Println("recv from client: msgId=", request.GetMsgID(), ", data=", string(request.GetData()))

    // Write back data
    err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping"))
    if err != nil {
        fmt.Println(err)
    }
}

func main() {
    // Create a server handle
    s := znet.NewServer()

    // Configure the router
    s.AddRouter(&PingRouter{})

    // Start the server
    s.Serve()
}

现在,如果要将Zinx的应用程序层数据发送到另一端,则只需要调用连接对象的SendMsg()方法即可。您还可以指定当前消息的ID号,以便开发人员可以基于不同的消息ID处理不同的业务逻辑。

在当前服务器中,客户的发送消息首先解析,然后返回带有ID 1的消息。消息内容是“ ping ... ping ... ping”。

应用程序客户端的实现代码如下:

//Client.go

package main

import (
    "fmt"
    "io"
    "net"
    "time"
    "zinx/znet"
)

/*
   Simulate the client
*/
func main() {
    fmt.Println("Client Test... start")
    // Wait for 3 seconds to give the server a chance to start the service
    time.Sleep(3 * time.Second)

    conn, err := net.Dial("tcp", "127.0.0.1:7777")
    if err != nil {
        fmt.Println("client start err, exit!")
        return
    }

    for {
        // Send packet message
        dp := znet.NewDataPack()
        msg, _ := dp.Pack(znet.NewMsgPackage(0, []byte("Zinx V0.5 Client Test Message")))
        _, err := conn.Write(msg)
        if err != nil {
            fmt.Println("write error err", err)
            return
        }

        // Read the head part of the stream first
        headData := make([]byte, dp.GetHeadLen())
        _, err = io.ReadFull(conn, headData) // ReadFull will fill msg until it's full
        if err != nil {
            fmt.Println("read head error")
            break
        }
        // Unpack the headData byte stream into msg
        msgHead, err := dp.Unpack(headData)
        if err != nil {
            fmt.Println("server unpack err:", err)
            return
        }

        if msgHead.GetDataLen() > 0 {
            // msg has data, need to read data again
            msg := msgHead.(*znet.Message)
            msg.Data = make([]byte, msg.GetDataLen())

            // Read byte stream from the io based on dataLen
            _, err := io.ReadFull(conn, msg.Data)
            if err != nil {
                fmt.Println("server unpack data err:", err)
                return
            }

            fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
        }

        time.Sleep(1 * time.Second)
    }
}

在此,客户端模拟了一个带有ID 0的消息,“ Zinx V0.5客户端测试消息”,然后打印服务器返回的数据。

在两个单独的终端中运行以下命令:

$ go run Server.go
$ go run Client.go

服务器输出将如下:

$ go run Server.go
Add Router succ!
[START] Server name: zinx v-0.5 demoApp, listener at IP: 127.0.0.1, Port 7777 is starting
[Zinx] Version: V0.4, MaxConn: 3, MaxPacketSize: 4096
start Zinx server zinx v-0.5 demoApp succ, now listening...
Reader Goroutine is running
Call PingRouter Handle
recv from client: msgId=0, data=Zinx V0.5 Client Test Message
Call PingRouter Handle
recv from client: msgId=0, data=Zinx V0.5 Client Test Message
Call PingRouter Handle
recv from client: msgId=0, data=Zinx V0.5 Client Test Message
...

客户端输出将如下:

$ go run Client.go
Client Test... start
==> Recv Msg: ID=1, len=18, data=ping...ping...ping
==> Recv Msg: ID=1, len=18, data=ping...ping...ping
==> Recv Msg: ID=1, len=18, data=ping...ping...ping
...

5.4摘要

Zinx成功地集成了消息包装功能,为Zinx的通信提供了基本的协议标准。具有识别消息类型的能力,Zinx的通信路由器可以基于不同的消息ID重定向到不同的业务逻辑。

消息包装是服务器框架的重要模块。这种包装实际上是定义了可以基于TCP/IP或UDP的通信框架的应用程序层协议。 Zinx的通信协议相对简单。如果读者希望在框架中使用更高级的通信协议,他们可以在消息标题中添加属性,例如数据包的完整检查,身份信息,解密密钥,消息状态等。但是,重要的是要注意每个添加属性的时间,应增加标头的固定长度,并且开发人员需要跟踪此长度以确保读取完整的标头信息。


<1.Building Basic Services with Zinx Framework>
<2. Zinx-V0.2 Simple Connection Encapsulation and Binding with Business>
<3.Design and Implementation of the Zinx Framework's Routing Module>
<4.Zinx Global Configuration>
<5.Zinx Message Encapsulation Module Design and Implementation>

未完待续...

作者:
不和谐:https://discord.gg/xQ8Xxfyfcz
zinx:https://github.com/aceld/zinx
github:https://github.com/aceld
Aceld的家:https://yuque.com/aceld