本文将解释如何使用GO通道限制并发函数执行。我从事一项GO服务,该服务在其HTTP请求处理程序中执行另一个程序。执行的程序使用大量的CPU使用和内存使用量,因为它可以与照片和视频一起使用,因此我必须限制其中有多少可以同时运行。
幸运的是,在这种情况下,Go频道非常方便。我们还将添加超时支持,以使我们的等待goroutines不会无限期地块。需要缓冲的GO频道来开发它,所以让我们记住它们是什么。
缓冲GO通道
缓冲通道可以容纳有限数量的值(由缓冲区大小确定),并且在缓冲区满足时只能阻止发送goroutine。这可以允许一些额外的并发性,但需要仔细考虑以避免死锁和其他同步问题。
缓冲通道是我们将在本文中构建的理想解决方案。
限制并发函数执行
我们首先创建一个名为limiter
的新软件包,然后为Limiter
类型定义数据结构。当创建Limiter
时,我们将提供一个限制和超时,数据结构将保持超时和以提供的限制创建的缓冲通道。
翻译成代码,我们可以写这样的东西:
type Limiter struct {
timeout time.Duration
ch chan struct{}
}
func NewLimiter(limit int, timeout time.Duration) *Limiter {
return &Limiter{
timeout: timeout,
ch: make(chan struct{}, limit),
}
}
我们现在需要定义如何使用Limiter
类型。我认为,如果我们可以传达一个没有参数并且不返回限制器的函数,那将是最好的。如果缓冲通道的缓冲区不满,则将执行传递的功能。如果缓冲区已满,则呼叫者将等待创建Limiter
时定义的超时。如果释放了一个插槽,则将执行该函数,如果超时时间通过,将返回错误。
使用空结构的类型创建缓冲通道。空结构对于数据不重要的信号很有用,因为它们的内存足迹为0。
我们可以在下面的代码中看到Execute
函数的定义。
func (l *Limiter) Execute(f func()) error {
select {
case l.ch <- struct{}{}:
// Added execution for running...
case <-time.After(l.timeout):
return errors.New("execution timed out")
}
// On exit, remove this execution from limit channel
defer func() {
<-l.ch
}()
f()
return nil
}
Execute
函数首先尝试将空结构插入缓冲通道中。如果频道不满,则将继续执行SELECT语句,并执行提供的功能。但是,如果频道已满,则选择语句将等待,并且可能会发生两件事。缓冲通道将倒空容量,我们将执行提供的功能。如果按时间通过在limiter
超时值中定义的时间量,则选择语句将输入返回错误的情况。在这种情况下,函数将不会执行。在调用提供的函数之前,我们添加了一个函数,并使用defer语句添加函数,该功能将在Execute
完成时执行。此功能将使缓冲通道被清空。
这全都适用于limiter
,它可以限制函数调用的结构。我还想指出,在选择语句中,您可以使用上述示例代码中的频道定义自定义时间间隔。这对于定期检查某些事情或在此示例中进行超时会非常有用。
这是有关如何使用我们定义的结构的一个示例:
// 1. Define the limiter
l := limiter.NewLimiter(5, 20*time.Second)
// 2. Somewhere in your code you can add a function for execution
l.Execute(func() {
// Processing..
})
如果您想查看整个文件,我在github上创建了要点,因此您可以访问this链接或从下面的github复制代码。
感谢您阅读本文,希望它能帮助您解决问题或学习新的东西。