使用GO通道限制并发函数执行
#go #channels

本文将解释如何使用GO通道限制并发函数执行。我从事一项GO服务,该服务在其HTTP请求处理程序中执行另一个程序。执行的程序使用大量的CPU使用和内存使用量,因为它可以与照片和视频一起使用,因此我必须限制其中有多少可以同时运行。

幸运的是,在这种情况下,Go频道非常方便。我们还将添加超时支持,以使我们的等待goroutines不会无限期地块。需要缓冲的GO频道来开发它,所以让我们记住它们是什么。

缓冲GO通道

缓冲通道可以容纳有限数量的值(由缓冲区大小确定),并且在缓冲区满足时只能阻止发送goroutine。这可以允许一些额外的并发性,但需要仔细考虑以避免死锁和其他同步问题。

缓冲通道是我们将在本文中构建的理想解决方案。

限制并发函数执行

我们首先创建一个名为limiter的新软件包,然后为Limiter类型定义数据结构。当创建Limiter时,我们将提供一个限制和超时,数据结构将保持超时和以提供的限制创建的缓冲通道。

翻译成代码,我们可以写这样的东西:

type Limiter struct {
    timeout time.Duration
    ch chan struct{}
}

func NewLimiter(limit int, timeout time.Duration) *Limiter {
    return &Limiter{
        timeout: timeout,
        ch: make(chan struct{}, limit),
    }
}

我们现在需要定义如何使用Limiter类型。我认为,如果我们可以传达一个没有参数并且不返回限制器的函数,那将是最好的。如果缓冲通道的缓冲区不满,则将执行传递的功能。如果缓冲区已满,则呼叫者将等待创建Limiter时定义的超时。如果释放了一个插槽,则将执行该函数,如果超时时间通过,将返回错误。

使用空结构的类型创建缓冲通道。空结构对于数据不重要的信号很有用,因为它们的内存足迹为0。

我们可以在下面的代码中看到Execute函数的定义。

func (l *Limiter) Execute(f func()) error {
    select {
    case l.ch <- struct{}{}:
        // Added execution for running...
    case <-time.After(l.timeout):
        return errors.New("execution timed out")
    }

    // On exit, remove this execution from limit channel
    defer func() {
        <-l.ch
    }()

    f()

    return nil
}

Execute函数首先尝试将空结构插入缓冲通道中。如果频道不满,则将继续执行SELECT语句,并执行提供的功能。但是,如果频道已满,则选择语句将等待,并且可能会发生两件事。缓冲通道将倒空容量,我们将执行提供的功能。如果按时间通过在limiter超时值中定义的时间量,则选择语句将输入返回错误的情况。在这种情况下,函数将不会执行。在调用提供的函数之前,我们添加了一个函数,并使用defer语句添加函数,该功能将在Execute完成时执行。此功能将使缓冲通道被清空。

这全都适用于limiter,它可以限制函数调用的结构。我还想指出,在选择语句中,您可以使用上述示例代码中的频道定义自定义时间间隔。这对于定期检查某些事情或在此示例中进行超时会非常有用。

这是有关如何使用我们定义的结构的一个示例:

// 1. Define the limiter
l := limiter.NewLimiter(5, 20*time.Second)

// 2. Somewhere in your code you can add a function for execution   
l.Execute(func() {
    // Processing..
})

如果您想查看整个文件,我在github上创建了要点,因此您可以访问this链接或从下面的github复制代码。

感谢您阅读本文,希望它能帮助您解决问题或学习新的东西。