使用无限(无界)缓冲通道在GO中进行非阻滞顺序处理
#go #goroutine #channels

在开发后端服务时,该服务将处理我工作的公司的许多实时事件,因为我们需要顺序排队和处理事件,而不会阻止阅读它们的读者。在图中表示此要求如下:

Processing requirement

事件排队以处理控件后,将其提供给读者,以便可以读取新事件而不能阻止阅读。

在不使用频道和goroutines的情况下,无法解决这种类型的问题,因此我们会走那条路。

缓冲和无情的GO频道

创建GO通道时,我们可以将其创建为缓冲通道或未封闭的通道。这是一个示例:

ch := make(chan int) // Unbuffered
ch := make(chan int, 3) // Buffered

未封闭的通道阻止了发送的goroutine,直到有相应的接收器准备接收正在发送的值。这意味着保证按发送的顺序收到数据,并且同步已内置在频道中。但是,如果我们根据要求发送一个事件并开始处理,下一个事件将阻止,并且必须等待第一个完成处理。下一个事件的阅读将被阻止。我们不想那个。

另一方面,

缓冲通道可以容纳有限数量的值(由缓冲区大小确定),并且在缓冲区满足时只能阻止发送Goroutine。这可以允许一些额外的并发性,但需要仔细考虑以避免死锁和其他同步问题。首先,如果我们知道系统要求我们可以提供足够大的缓冲区,以便我们不必担心它会被阻止。

,但是,我们知道更好,我们希望消除所有IF。这就是为什么我们将使用无限(无限)容量构建GO缓冲通道。

以无限的容量进行缓冲通道

让我们从名称开始。我命名了我的软件包和数据结构executor。有很多用于排队数据的用例,因此我们需要使用GO GENRICS,以便可以将每种类型的数据传递到executor中。 GO GENRICS可从1.18及以上版本中获得。我们还需要定义executor API,在GO软件中使用时将如何使用它。我决定将函数传递给executor,该功能将在某些数据准备好处理时被调用。

为了使自己变得容易,我们可以定义一种可以描述此功能的新类型:

type ExecHandler[T any] func(T)

,由于我们将在另一个goroutine上排队,因此我们将需要一对频道来从该goroutine发送数据并读取数据以进行处理。封装上述所有内容的数据结构可以像:
一样定义

type Executor[T any] struct {
    reader chan T
    writer chan T
    buffer []T
    execHandler ExecHandler[T]
}

缓冲区属性将容纳等待处理的排队元素。

我们还需要为executor提供工厂功能,以便所有内容都适当初始化:

func New[T any](execHandler ExecHandler[T]) *Executor[T] {
    e := &Executor[T]{
        reader: make(chan T),
        writer: make(chan T),
        buffer: make([]T, 0),
        execHandler: execHandler,
    }

    go e.run()

    return e
}

New函数构建并返回executor对象。它还催生了一个新的goroutine,并在executor上调用了run方法,该方法开始聆听将排队用于处理的新元素。

func (e *Executor[T]) run() {
    go e.listenForReading()
    for {
        if len(e.buffer) > 0 {
            select {
            case e.reader <- e.buffer[0]:
                e.buffer = e.buffer[1:]
            case data := <-e.writer:
                e.buffer = append(e.buffer, data)
            }
        } else {
            data := <-e.writer
            e.buffer = append(e.buffer, data)
        }
    }
}

func (e *Executor[T]) listenForReading() {
    for data := range e.reader {
        e.execHandler(data)
    }
}

run方法中发生了很多事情,我们现在将其分解。在第一行中,我们产生了一个新的goroutine,将处理排队的元素。通过处理,我的意思是使用该元素来调用Exec处理程序。

接下来,我们有一个无限循环,该循环排队并将数据发送到处理。如果排队元素的缓冲区为空,我们将等待writer Channel的新元素。当我们获得一个新元素时,它将存储在buffer中,循环的新迭代开始。

如果缓冲区不是空的,我们会在选择两件事可能发生的选择语句上停下来:

  • 如果reader通道准备就绪,可以将元素派发到处理中,并将其从buffer删除。
  • 以防新元素等待排队,它将被添加到缓冲区中。

剩下的唯一的是将导出的方法添加到executor以进行排队元素:

func (e *Executor[T]) Dispatch(data T) {
    e.writer <- data
}

就是这样,我们现在有一个功能性的无限(无限)通道,该通道顺序处理数据。

如果您想查看整个文件,我在github上创建了要点,因此您可以访问this链接或从下面的github复制代码。

解决此问题的解决方案有很多解决方案,因此,如果您的解决方案与文章中的问题有所不同,请在下面讨论它。我很想看看还有什么可能。如果您从事其他技术或GO,我鼓励您玩此类内容,以更好地了解语言及其并发机制。