如何限制同时运行goroutines的数量并等待其完成
#go #goroutine

目录

有时有一项任务可以限制同时运行goroutines的数量以符合某些限制。例如,按CPU内核的数量或所消耗的内存等等。

启动goroutines

首先,让我们看看如何运行goroutines。让我们在一个循环中运行5个gorutins,我们通过time.Sleep()模拟Goroutine的工作,我们将将延迟设置为i * 100ms。需要字符串i := i来本地化迭代循环计数器的值,没有此,每个goroutine都会获得值5。阅读更多herehere

package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Println("START main")
    for i := 0; i < 5; i++ {
        i := i
        go func() {
            delay := time.Millisecond * time.Duration(100*(i+1))
            fmt.Printf("  START #%d (delay %v)\n", i, delay)
            time.Sleep(delay)
            fmt.Printf("  END #%d (delay %v)\n", i, delay)
        }()
    }
    fmt.Println("END main")
}

输出:

START main
END main

Program exited.

Playground

它遵循程序的输出,即执行并非单个goroutine。发生这种情况是因为程序的执行(函数main())在for循环完成后立即结束,因此在执行后,所有运行的goroutines几乎都立即完成。

延迟完成main() 的延迟开始goroutines1111

让我们看看如果程序结束之前引入了一些延迟,例如250ms。

package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Println("START main")
    for i := 0; i < 5; i++ {
        i := i
        go func() {
            delay := time.Millisecond * time.Duration(100*(i+1))
            fmt.Printf("  START #%d (delay %v)\n", i, delay)
            time.Sleep(delay)
            fmt.Printf("  END #%d (delay %v)\n", i, delay)
        }()
    }
    // Let's add some delay
    time.Sleep(time.Millisecond * 250)
    fmt.Println("END main")
}

输出:

START main
  START #4 (delay 500ms)
  START #0 (delay 100ms)
  START #1 (delay 200ms)
  START #2 (delay 300ms)
  START #3 (delay 400ms)
  END #0 (delay 100ms)
  END #1 (delay 200ms)
END main

Program exited.

Playground

您可以看到,在main()函数添加延迟后,将执行两个Goroutines(持续时间为100ms和200ms)。如果延迟增加到500ms或更多,则所有Goroutines都将有时间执行。

等待所有Goroutines 的完成

在现实生活中,很少能确定Goroutines的执行时间,因此,等待所有Goroutines的完成,您可以使用sync模块中的WaitGroup。来自documentation模块:

等待组等待着一系列Goroutines完成。主goroutine调用添加以设置要等待的goroutines数量。然后,每个goroutines在完成后运行并完成了呼叫。同时,可以使用等待来阻止所有goroutines完成。

在我们的示例中,我们将在Goroutine的每个开始之前调用Add(1),而Done()通过defer完成。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    fmt.Println("START main")
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        i := i
        wg.Add(1)
        go func() {
            defer wg.Done()
            delay := time.Millisecond * time.Duration(100*(i+1))
            fmt.Printf("  START #%d (delay %v)\n", i, delay)
            time.Sleep(delay)
            fmt.Printf("  END #%d (delay %v)\n", i, delay)
        }()
    }
    wg.Wait()
    fmt.Println("END main")
}

输出:

START main
  START #4 (delay 500ms)
  START #2 (delay 300ms)
  START #3 (delay 400ms)
  START #1 (delay 200ms)
  START #0 (delay 100ms)
  END #0 (delay 100ms)
  END #1 (delay 200ms)
  END #2 (delay 300ms)
  END #3 (delay 400ms)
  END #4 (delay 500ms)
END main

Program exited.

Playground

我们看到所有goroutines都有时间完成,但是尚未解决所有Goroutines的同时发射的问题。

限制同时运行goroutines 的数量

为了限制同时运行goroutines的数量,可以使用缓冲通道。缓冲通道的功能是,在填充缓冲区后,下一个对通道的写入将被阻塞,直到从通道中读取至少一个值。

常数grMax包含同时执行的goroutines的数量。频道缓冲区完整后,执行将在命令ch <- 1上被阻止,并在使用命令<-ch读取后将继续。最好放置从defer中的频道读取的命令,以便当gorutin中发生错误时,整个程序不会悬挂。

package main

import (
    "fmt"
    "sync"
    "time"
)

const grMax = 3

func main() {
    fmt.Println("START main")
    var wg sync.WaitGroup
    ch := make(chan int, grMax)
    for i := 0; i < 5; i++ {
        i := i
        wg.Add(1)
        ch <- 1
        go func() {
            defer func() { wg.Done(); <-ch }()
            delay := time.Millisecond * time.Duration(100*(i+1))
            fmt.Printf("  START #%d (delay %v)\n", i, delay)
            time.Sleep(delay)
            fmt.Printf("  END #%d (delay %v)\n", i, delay)
        }()
    }
    wg.Wait()
    fmt.Println("END main")
}

输出:

START main
  START #2 (delay 300ms)
  START #0 (delay 100ms)
  START #1 (delay 200ms)
  END #0 (delay 100ms)
  START #3 (delay 400ms)
  END #1 (delay 200ms)
  START #4 (delay 500ms)
  END #2 (delay 300ms)
  END #3 (delay 400ms)
  END #4 (delay 500ms)
END main

Program exited.

Playground

您可以看到,立即启动了3个Goroutines,#0#1#2,然后,完成其中一个(#0)后,下一个启动(#3)等。

解决问题。