| 1。 | Modelo de concurrencia
| 2。 | Goroutines y canales
| 3。 | Wait Groups
| 4。 | 选择y工人池
| 5。 | errgroup(错误组)
选择什么?
我们到达第四期,在这种情况下,我们将谈论保留的单词kude0和工人池中的小实现(我们将解释一下它是一个工人池)。
选择是一个块,它具有语法,其范围了解在其键内交易的所有内容。它的功能是等待(或等待)到正在执行的Goroutine的多个通信,当满足我们在块中提出的任何条件时,它将发布。换句话说,在我们可以收到多个例程的代码中,我们将使用Select。
基本语法是:
//...
select {
//...
}
导入注意:选择块,要知道它们不会阻止主要例程。
重要注释II:请勿使用空选择。
现在要在我们的select
中添加一些有趣的东西,在块中,我们将等待到达我们调用频道的消息。它的工作类似于switch
clipula,但条件是接收消息的条件,不执行任何类型的模式匹配。
让我们去真正的影响
c1 := make(chan string)
c2 := make(chan string)
// enviamos el primer mensaje.
go func() {
time.Sleep(1 * time.Second)
c1 <- "one"
}()
// enviamos el primer mensaje.
go func() {
time.Sleep(2 * time.Second)
c2 <- "two"
}()
// con este select esperamos dos veces, uno por cada
// canal. Otra práctica común es tener un for infinito
// en una rutina separada para asi no bloquear a la
// principal.
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
}
}
在这里,我们看到在每种情况下,我们希望如何阅读每个频道的消息。一旦发生,让我们离开块。
现在,另一个示例在我们的服务中用作A daemon ,该应用程序每次运行,并且在执行应用程序时,永远。
import "time"
//
//
go func() {
ticker := time.NewTicker(1 * time.Hour)
for {
select {
case <-ticker.C:
//doSomething()
}
}
}()
此代码向我们保证,我们每小时都会调用我们指出的内容。股票是一个倒计时,为我们提供了时间套件,这些情况非常âigetil。
选择的摘要
- 阻止执行它的例程。
- 当它进入案例时,您将取消其他人,直到选择再次执行(这就是为什么在无限的{}中看到它如此普遍的原因。
- 他们通常会进入goroutine启动的异常功能。
工人池,儿子?
我们知道,在这种情况下,池是储备。现在,在我们的代码中,我们必须确定3件清晰的事物。 1)池2)工人和3)要完成的工作。
目的是我们可以同时处理工作,即结束工作的工人,回到游泳池并尝试节省资源,此外尝试更有效地工作。
让我们去一点。
type Unit interface {
Job() error
}
我们定义了我们的工作单位。我们可以创建其他更多界面,并使其更加 polyma 。我们不必担心谁实施它,它将是客户的工作。
var jobQueue chan Unit // cola de trabajo compartida
type worker struct {
pool chan chan Unit
jobCh chan Unit
}
func newWorker(pool chan chan Unit) *worker {
return &worker{
jobCh: make(chan Unit),
pool: pool,
}
}
我们已经有一个工人属于游泳池并听取工作单位的渠道。池的双通道是用于注册,另一个用于激活它。
另一个功能,简单的构建器的表演。
type pool struct {
pool chan chan Unit
workers int
}
func NewPool(maxWorkers int) *pool {
return &pool{
pool: make(chan chan Unit, maxWorkers),
workers: maxWorkers,
}
}
现在,我们拥有池结构,其中包含与工人相同的渠道以及我们要创建多少工人的渠道。
我们有骨骼,现在我们全面进行动作。
func (d *pool) Run() {
for i := 0; i < d.workers; i++ {
w := newWorker(d.pool)
w.start()
}
go d.dispatch()
}
可以看到一些有趣的东西,对于我们指出的每个工人,我们都将创建一个,分配我们已经拥有的池并开始的池,让我们看看它是什么。
func (w *worker) start() {
go func() {
for {
// register the actual worker in the queue.
w.pool <- w.jobCh
select {
case job := <-w.jobCh:
// do the actual job here
err := job.Job()
if err != nil {
log.Println(err.Error())
}
}
}
}()
}
在整个开始中,我们将注册工人,然后使用Select剪辑,我们将等待工作到达并做到这一点(不知道它是什么,但我们不在乎)。所有这些都在工人的新例程中。
func (d *pool) dispatch() {
go func() {
for {
select {
case job, ok := <-jobQueue:
if ok {
jobChannel := <-d.pool
jobChannel <- job
}
}
}
}()
}
这是最后的,每次从池中获取工作的工作负载时,Kouude4将分配工作量,而该工作(在工作渠道中隐含)分配了工作。作业队列是共享资源。
顾客
func main() {
// init the shared data structure.
splanner.InitQueue(20)
// init the dispatcher & keep it listening.
splanner.NewPool(15).Run()
s := http.Server{}
s.Addr = ":8080"
http.HandleFunc("/jobs", JobHandler)
log.Fatal(s.ListenAndServe())
}
func JobHandler(w http.ResponseWriter, r *http.Request) {
log.Println("getting job...")
q := r.URL.Query().Get("q")
if q == "" {
q = "default"
}
for a := 0; a < 100; a++ {
work := HeavyWork{Name: q, number: a}
splanner.AddUnit(&work)
_, _ = w.Write([]byte(fmt.Sprintf("job %s %d done", work.Name, a)))
}
}
type HeavyWork struct {
Name string `json:"name"`
number int
}
func (p *HeavyWork) Job() error {
time.Sleep(500 * time.Millisecond)
fmt.Println(fmt.Sprintf("heavy job is running %d", p.number))
return nil
}
as as simple是对客户端的,您必须使用Job()函数实现单元接口,并使用Addunit导出功能添加工作单元,以使工作放大。在kude5中,我们看到设置非常微不足道。
el Codigo完成,disponible en github
结论
我们看到选择使我们能够协调世界并与游泳池结合,能够派遣例程,获得Misma等的力量和灵活性。
让我们不要忘记,您必须研究它,正确使用它,在许多情况下尝试它,以免在生产中引起非愉悦的怀疑。
ya知道,如果您想参加Sonsourearme,Abioqian9