<1.Building Basic Services with Zinx Framework>
<2. Zinx-V0.2 Simple Connection Encapsulation and Binding with Business>
<3.Design and Implementation of the Zinx Framework's Routing Module>
<4.Zinx Global Configuration>
<5.Zinx Message Encapsulation Module Design and Implementation>
接下来,让我们简单地升级到zinx。当前,所有服务器请求的数据都存储在单个请求对象中。当前请求的结构如下:
type Request struct {
conn ziface.IConnection // the connection established with the client
data []byte // the data requested by the client
}
正如我们所看到的,该请求当前使用[]byte
接收所有数据,而无需任何长度或消息类型信息。这种方法缺乏全面的数据管理。为了更好地表示数据格式,我们需要定义自定义消息类型并封装其中的所有消息。
5.1创建消息封装类型
在zinx/ziface
目录中创建一个名为imessage.go
的文件。该文件用作消息封装的抽象层。在此文件中,定义IMessage
接口如下:
package ziface
// IMessaage is an abstract interface that encapsulates a message
type IMessage interface {
GetDataLen() uint32 // Get the length of the message data segment
GetMsgId() uint32 // Get the message ID
GetData() []byte // Get the message content
SetMsgId(uint32) // Set the message ID
SetData([]byte) // Set the message content
SetDataLen(uint32) // Set the length of the message data segment
}
IMessage
接口定义了六种方法,包括消息长度,消息ID和消息内容的Getters和Setter。从接口中,我们可以看到一条消息应该具有三个元素:ID,长度和数据。
接下来,在zinx/znet
目录中的message.go
文件中创建Message
struct的实现:
package znet
type Message struct {
Id uint32 // ID of the message
DataLen uint32 // Length of the message
Data []byte // Content of the message
}
接下来,为Message
struct提供构造方法:
// Create a new Message package
func NewMsgPackage(id uint32, data []byte) *Message {
return &Message{
Id: id,
DataLen: uint32(len(data)),
Data: data,
}
}
实现相关的getter和setter方法如下:
// Get the length of the message data segment
func (msg *Message) GetDataLen() uint32 {
return msg.DataLen
}
// Get the message ID
func (msg *Message) GetMsgId() uint32 {
return msg.Id
}
// Get the message content
func (msg *Message) GetData() []byte {
return msg.Data
}
// Set the length of the message data segment
func (msg *Message) SetDataLen(len uint32) {
msg.DataLen = len
}
// Set the message ID
func (msg *Message) SetMsgId(msgId uint32) {
msg.Id = msgId
}
// Set the message content
func (msg *Message) SetData(data []byte) {
msg.Data = data
}
Message
结构是将来封装优化的占位符。它还提供了一种初始化方法NewMsgPackage()
来创建一个消息包。
5.2消息包装和打开包装
Zinx采用经典的TLV(型长度值)包装格式来解决TCP数据包贴纸问题。特定的消息结构在图5.1中以图形方式表示。
5.2.1创建抽象类,以用于消息包装和解开包装
在本节中,我们将创建用于消息包装的实现,并解开包装以解决TCP数据包粘性问题。首先,在zinx/ziface
目录中创建文件idatapack.go
。该文件是用于消息包装和解开包装的抽象层接口。代码如下:
//zinx/ziface/idatapack.go
package ziface
/*
Packaging and unpacking data
Directly operates on the data stream of TCP connections, adding header information for transmitting data to handle TCP packet sticking problem.
*/
type IDataPack interface {
GetHeadLen() uint32 // Method to get the length of the packet header
Pack(msg IMessage) ([]byte, error) // Method to pack the message
Unpack([]byte) (IMessage, error) // Method to unpack the message
}
IDataPack
是一个抽象接口,定义了三种方法:
-
GetHeadLen()
返回消息标头的长度。对于应用程序层数据包,通信的两面都需要已知的标头长度,以读取与标题当前消息相关的其他数据。因此,提供了用于检索标题长度的接口,以用于包装和解压缩的特定实现类别。 -
Pack(msg IMessage)
以IMessage
结构的形式将数据包装到二进制流中。 -
Unpack([]byte)
按照ZINX消息协议将应用层二进制数据流解析为IMessage
结构。此方法对应于Pack()方法。
5.2.2实施包装和解压缩类
在本节中,我们将在Zinx中实现包装和解开功能。在zinx/znet
目录中创建文件datapack.go
。该文件是用于包装和解开包装的具体实现类。代码如下:
//zinx/znet/datapack.go
package znet
import (
"bytes"
"encoding/binary"
"errors"
"zinx/utils"
"zinx/ziface"
)
// DataPack class for packaging and unpacking, no need for members currently
type DataPack struct{}
// Initialization method for the DataPack class
func NewDataPack() *DataPack {
return &DataPack{}
}
在这里,我们定义了没有任何属性的DataPack
类。我们提供NewDataPack()
构造方法。
DataPack
类需要实现IDataPack
接口,包括GetHeadLen()
,Pack()
和Unpack()
方法。首先,让我们看一下GetHeadLen()
方法的实现:
//zinx/znet/datapack.go
// Method to get the length of the packet header
func (dp *DataPack) GetHeadLen() uint32 {
// Id uint32(4 bytes) + DataLen uint32(4 bytes)
return 8
}
在这里,我们只返回固定长度为8个字节。我们如何获得这个8字节的长度?根据图17.1所示的TLV消息格式,数据部分的长度不可控制,但头部部分的长度是固定的。头部部分由4个字节组成,用于存储消息ID和另外4个字节来存储数据部分的长度。因此,Zinx中的GetHeadLen()
方法直接返回8个字节的长度。
Pack()
方法的实现如下:
//zinx/znet/datapack.go
// Method for message packaging (compressing data)
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
// Create a buffer to store the bytes
dataBuff := bytes.NewBuffer([]byte{})
// Write DataLen
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil {
return nil, err
}
// Write MsgID
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {
return nil, err
}
// Write data
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {
return nil, err
}
return dataBuff.Bytes(), nil
}
在这里,我们使用Little-Endian字节顺序(只要包装和解开包装匹配的字节订单)即可。压缩顺序很重要,因为我们需要按照该顺序包装Datalen,Msgid和数据。同样,拆箱过程必须遵循此顺序。
Unpack()
方法的相应实现如下:
//zinx/znet/datapack.go
// Method for message unpacking (decompressing data)
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {
// Create an io.Reader from the input binary data
dataBuff := bytes.NewReader(binaryData)
// Only extract the information from the header, obtaining dataLen and msgID
msg := &Message{}
// Read dataLen
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {
return nil, err
}
// Read msgID
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {
return nil, err
}
// Check if the dataLen exceeds the maximum allowed packet size
if utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize {
return nil, errors.New("Too large msg data received")
}
// We only need to unpack the header data, and then read the data once more from the connection based on the length of the header
return msg, nil
}
请注意,在这种情况下,Unpack()
方法以两个步骤执行,如图16.1所示。第二步取决于第一步中获得的数据结果。因此,Unpack()
方法只能提取标头(头)内容,获得MSGID和DATALEN值。然后,呼叫者将根据Datalen指定的长度继续读取IO流中的身体数据。以与Pack()
方法相同的顺序读取Datalen和MSGID很重要。
5.2.3测试包装和解压缩功能
为了更好地理解该概念,让我们通过创建单独的服务器和客户端来测试包装并解开包装功能,然后再将数据集成到Zinx
框架中。这是服务器的代码,server.go
:
// server.go
package main
import (
"fmt"
"io"
"net"
"zinx/znet"
)
// Test the datapack packaging and unpacking functionality
func main() {
// Create a TCP server socket
listener, err := net.Listen("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("Server listen error:", err)
return
}
// Create a goroutine to handle reading and parsing the data from the client goroutine, which is responsible for handling sticky packets
for {
conn, err := listener.Accept()
if err != nil {
fmt.Println("Server accept error:", err)
}
// Handle client requests
go func(conn net.Conn) {
// Create a datapack object dp for packaging and unpacking
dp := znet.NewDataPack()
for {
// 1. Read the head part from the stream
headData := make([]byte, dp.GetHeadLen())
// ReadFull fills the buffer completely
_, err := io.ReadFull(conn, headData)
if err != nil {
fmt.Println("Read head error")
break
}
// 2. Unpack the headData bytes into msgHead
msgHead, err := dp.Unpack(headData)
if err != nil {
fmt.Println("Server unpack error:", err)
return
}
// 3. Read the data bytes from the IO based on dataLen
if msgHead.GetDataLen() > 0 {
// msg has data, need to read data again
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetDataLen())
_, err := io.ReadFull(conn, msg.Data)
if err != nil {
fmt.Println("Server unpack data error:", err)
return
}
fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
}
}
}(conn)
}
}
让我们分析代码中的关键步骤:
(1)首先,我们从流中读取头部:
headData := make([]byte, dp.GetHeadLen())
_, err := io.ReadFull(conn, headData)
我们分配一个称为HeadData的缓冲区,固定长度为8个字节,这是协议中头部的大小。然后,我们使用io.ReadFull()
准确地读取从插座的IO到Headdata的8个字节。如果下面的缓冲区中少于8个字节,则io.ReadFull()
将在缓冲区完全填充之前返回。
接下来,我们将headdata字节打开到msghead:
msgHead, err := dp.Unpack(headData)
我们使用Datapack对象的Unpack()
方法将HEADDATA字节打开到msgHead
对象中。
最后,我们基于Datalen读取IO的数据字节:
if msgHead.GetDataLen() > 0 {
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetDataLen())
_, err := io.ReadFull(conn, msg.Data)
// ... process the received message data
}
如果dataLen
大于0,则表示消息中有数据。我们根据dataLen
创建一个缓冲区,使用io.ReadFull()
读取从IO到缓冲区的数据字节,然后处理接收到的消息数据。
通过使用io.ReadFull()
,我们确保从插座的IO读取每个读取操作所需的确切字节数。如果基础缓冲区的数据超过所需的数据,则其余数据将留在下一个读取操作的缓冲区中。
现在,让我们运行服务器并创建一个客户端以测试包装和解开包装功能。
(2)解开从headData
到Message
的标题信息流的包装。
msgHead, err := dp.Unpack(headData)
if err != nil {
fmt.Println("Server unpack error:", err)
return
}
Unpack()
功能可以从8字节headData
中提取Message
的标头信息。 msgHead.ID
和msgHead.DataLen
字段将分配值。
(3)基于Datalen的IO读取字节流:
// 3. Read the data bytes from the IO based on dataLen
if msgHead.GetDataLen() > 0 {
// If the message has data, read the data again
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetDataLen())
_, err := io.ReadFull(conn, msg.Data)
if err != nil {
fmt.Println("Server unpack data error:", err)
return
}
// ... (omitted code)
}
在代码的这一部分中,我们根据DataLen
执行第二次读取,以读取完整的消息数据。我们使用ReadFull()
方法读取io中的固定长度datalen字节,并用读取数据填充msgHead.Data
属性。这使我们能够阅读具有固定数据长度的完整消息,从而有效地解决了消息碎片的问题。
现在让我们看一下客户的测试代码:
// client.go
package main
import (
"fmt"
"net"
"zinx/znet"
)
func main() {
// Create a client goroutine to simulate the data for sticky packets and send them
conn, err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("Client dial error:", err)
return
}
// 1. Create a DataPack object dp
dp := znet.NewDataPack()
// 2. Pack msg1
msg1 := &znet.Message{
Id: 0,
DataLen: 5,
Data: []byte{'h', 'e', 'l', 'l', 'o'},
}
sendData1, err := dp.Pack(msg1)
if err != nil {
fmt.Println("Client pack msg1 error:", err)
return
}
// 3. Pack msg2
msg2 := &znet.Message{
Id: 1,
DataLen: 7,
Data: []byte{'w', 'o', 'r', 'l', 'd', '!', '!'},
}
sendData2, err := dp.Pack(msg2)
if err != nil {
fmt.Println("Client pack msg2 error:", err)
return
}
// 4. Concatenate sendData1 and sendData2 to create a sticky packet
sendData1 = append(sendData1, sendData2...)
// 5. Write data to the server
conn.Write(sendData1)
// Block the client
select {}
}
在此代码中,有五个明确的步骤:
-
客户端使用Zinx/Znet的数据包来包装数据。两条消息连续包装,而append()用于连接两个消息的字节流,模拟粘性数据包。然后将组合的senddata1写入远程服务器。
-
如果服务器可以解析两条消息的数据,则证明了Datapack的包装和解压缩方法正常运行。
运行服务器,执行以下命令:
go run Server.go
在新终端中,使用命令运行客户端代码:
go run Client.go
从服务器的角度来看,观察到以下输出:
==> Recv Msg: ID=0, len=5, data=hello
==> Recv Msg: ID=1, len=7, data=world!!
结果表明Zinx已收到客户端发送的两个数据包并成功解析了。
5.3 ZINX-V0.5代码实现
在本节中,我们需要将数据包包装和解开功能集成到Zinx中,并测试该功能是否有效。
5.3.1修改请求结构
首先,我们需要修改Request
struct的data
字段。
package znet
import "zinx/ziface"
type Request struct {
conn ziface.IConnection // The connection already established with the client
msg ziface.IMessage // Data requested by the client
}
// Get the connection information
func (r *Request) GetConnection() ziface.IConnection {
return r.conn
}
// Get the data of the request message
func (r *Request) GetData() []byte {
return r.msg.GetData()
}
// Get the ID of the request message
func (r *Request) GetMsgID() uint32 {
return r.msg.GetMsgId()
}
还需要相应地修改相关的Getter方法。
5.3.2拆箱过程的集成
接下来,我们需要修改Connection
方法中读取客户端数据的代码,如下所示:
func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
defer c.Stop()
for {
// Create a data packing/unpacking object
dp := NewDataPack()
// Read the client's message header
headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head error", err)
c.ExitBuffChan <- true
continue
}
// Unpack the message, obtain msgid and datalen, and store them in msg
msg, err := dp.Unpack(headData)
if err != nil {
fmt.Println("unpack error", err)
c.ExitBuffChan <- true
continue
}
// Read the data based on dataLen and store it in msg.Data
var data []byte
if msg.GetDataLen() > 0 {
data = make([]byte, msg.GetDataLen())
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data error", err)
c.ExitBuffChan <- true
continue
}
}
msg.SetData(data)
// Get the Request data of the current client request
req := Request{
conn: c,
msg: msg, // Replace buf with msg
}
// Find the corresponding Handle registered in Routers based on the bound Conn
go func(request ziface.IRequest) {
// Execute the registered router methods
c.Router.PreHandle(request)
c.Router.Handle(request)
c.Router.PostHandle(request)
}(&req)
}
}
集成逻辑与上一节中测试的GO代码相似。
5.3.3发送数据包方法
现在,当使用Zinx时,数据包拆分功能已集成到Zinx中,如果开发人员希望将TLV构型数据返回给用户,则他们每次都无法通过此繁琐的过程。因此,Zinx应提供用于发送数据包的数据包封装接口,该接口应在Iconnection接口中定义为SendMsg()方法。代码如下:
//zinx/ziface/iconnection.go
package ziface
import "net"
// Define the connection interface
type IConnection interface {
// Start the connection and make the current connection work
Start()
// Stop the connection and end the current connection status
Stop()
// Get the raw socket TCPConn from the current connection
GetTCPConnection() *net.TCPConn
// Get the current connection ID
GetConnID() uint32
// Get the remote client address information
RemoteAddr() net.Addr
// Directly send the Message data to the remote TCP client
SendMsg(msgId uint32, data []byte) error
}
sendmsg()方法提供了两个参数:正在发送的当前消息的消息ID和消息携带的数据。该方法的连接实现如下:
//zinx/znet/connection.go
// Send the Message data to the remote TCP client
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed == true {
return errors.New("Connection closed when send msg")
}
// Package the data and send it
dp := NewDataPack()
msg, err := dp.Pack(NewMsgPackage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id =", msgId)
return errors.New("Pack error msg")
}
// Write back to the client
if _, err := c.Conn.Write(msg); err != nil {
fmt.Println("Write msg id", msgId, "error")
c.ExitBuffChan <- true
return errors.New("conn Write error")
}
return nil
}
sendmsg()方法可以使包装过程透明到业务开发人员,使数据包发送更可读,并且接口更清晰。
5.3.4使用zinx-v0.5完成应用程序
现在,我们可以使用Zinx框架来完成发送消息消息的月球服务程序的应用程序级测试案例。应用程序服务器的代码如下:
//Server.go
package main
import (
"fmt"
"zinx/ziface"
"zinx/znet"
)
// Ping test custom router
type PingRouter struct {
znet.BaseRouter
}
// Test Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
fmt.Println("Call PingRouter Handle")
// Read the client's data first, then write back ping...ping...ping
fmt.Println("recv from client: msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
// Write back data
err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping"))
if err != nil {
fmt.Println(err)
}
}
func main() {
// Create a server handle
s := znet.NewServer()
// Configure the router
s.AddRouter(&PingRouter{})
// Start the server
s.Serve()
}
现在,如果要将Zinx的应用程序层数据发送到另一端,则只需要调用连接对象的SendMsg()方法即可。您还可以指定当前消息的ID号,以便开发人员可以基于不同的消息ID处理不同的业务逻辑。
在当前服务器中,客户的发送消息首先解析,然后返回带有ID 1的消息。消息内容是“ ping ... ping ... ping”。
应用程序客户端的实现代码如下:
//Client.go
package main
import (
"fmt"
"io"
"net"
"time"
"zinx/znet"
)
/*
Simulate the client
*/
func main() {
fmt.Println("Client Test... start")
// Wait for 3 seconds to give the server a chance to start the service
time.Sleep(3 * time.Second)
conn, err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("client start err, exit!")
return
}
for {
// Send packet message
dp := znet.NewDataPack()
msg, _ := dp.Pack(znet.NewMsgPackage(0, []byte("Zinx V0.5 Client Test Message")))
_, err := conn.Write(msg)
if err != nil {
fmt.Println("write error err", err)
return
}
// Read the head part of the stream first
headData := make([]byte, dp.GetHeadLen())
_, err = io.ReadFull(conn, headData) // ReadFull will fill msg until it's full
if err != nil {
fmt.Println("read head error")
break
}
// Unpack the headData byte stream into msg
msgHead, err := dp.Unpack(headData)
if err != nil {
fmt.Println("server unpack err:", err)
return
}
if msgHead.GetDataLen() > 0 {
// msg has data, need to read data again
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetDataLen())
// Read byte stream from the io based on dataLen
_, err := io.ReadFull(conn, msg.Data)
if err != nil {
fmt.Println("server unpack data err:", err)
return
}
fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
}
time.Sleep(1 * time.Second)
}
}
在此,客户端模拟了一个带有ID 0的消息,“ Zinx V0.5客户端测试消息”,然后打印服务器返回的数据。
在两个单独的终端中运行以下命令:
$ go run Server.go
$ go run Client.go
服务器输出将如下:
$ go run Server.go
Add Router succ!
[START] Server name: zinx v-0.5 demoApp, listener at IP: 127.0.0.1, Port 7777 is starting
[Zinx] Version: V0.4, MaxConn: 3, MaxPacketSize: 4096
start Zinx server zinx v-0.5 demoApp succ, now listening...
Reader Goroutine is running
Call PingRouter Handle
recv from client: msgId=0, data=Zinx V0.5 Client Test Message
Call PingRouter Handle
recv from client: msgId=0, data=Zinx V0.5 Client Test Message
Call PingRouter Handle
recv from client: msgId=0, data=Zinx V0.5 Client Test Message
...
客户端输出将如下:
$ go run Client.go
Client Test... start
==> Recv Msg: ID=1, len=18, data=ping...ping...ping
==> Recv Msg: ID=1, len=18, data=ping...ping...ping
==> Recv Msg: ID=1, len=18, data=ping...ping...ping
...
5.4摘要
Zinx成功地集成了消息包装功能,为Zinx的通信提供了基本的协议标准。具有识别消息类型的能力,Zinx的通信路由器可以基于不同的消息ID重定向到不同的业务逻辑。
消息包装是服务器框架的重要模块。这种包装实际上是定义了可以基于TCP/IP或UDP的通信框架的应用程序层协议。 Zinx的通信协议相对简单。如果读者希望在框架中使用更高级的通信协议,他们可以在消息标题中添加属性,例如数据包的完整检查,身份信息,解密密钥,消息状态等。但是,重要的是要注意每个添加属性的时间,应增加标头的固定长度,并且开发人员需要跟踪此长度以确保读取完整的标头信息。
<1.Building Basic Services with Zinx Framework>
<2. Zinx-V0.2 Simple Connection Encapsulation and Binding with Business>
<3.Design and Implementation of the Zinx Framework's Routing Module>
<4.Zinx Global Configuration>
<5.Zinx Message Encapsulation Module Design and Implementation>
未完待续...
作者:
不和谐:https://discord.gg/xQ8Xxfyfcz
zinx:https://github.com/aceld/zinx
github:https://github.com/aceld
Aceld的家:https://yuque.com/aceld