介绍
在该系列的这一部分中,我们将继续使用Golang与通道的并发功能。在上一篇文章中,我们涵盖了GO例程和等待小组的基本面。通过利用这些理解的概念,我们将探索渠道以在各种GO例程之间传达数据。
什么是频道
golang频道就像一条管道,可以让goroutines通信。它使您可以将值从一个goroutine传递到另一个gor。频道是键入的,即您使用chan
关键字声明它们,然后将要发送和接收的类型(例如chan int
)。 chan
类型指定将通过通道传递的值类型。我们将很快探讨详细的技术。现在,我们只需要关注解决方案的问题。
在上一篇文章中,我们使用GO例程和等待组,使我们能够异步处理任务。但是,如果我们想在过程之间访问数据,则必须调整核心功能或可能需要全局变量,但是,在现实世界中,环境受到了很大的限制。我们将需要一种在这些GO例程之间传达数据的方法。频道是为此而制作的(不仅如此),但本质上,它解决了确切的问题。
package main
import (
"fmt"
)
func main() {
ch := make(chan string)
defer close(ch)
go func() {
message := "Hello, Gophers!"
ch <- message
}()
msg := <-ch
fmt.Println(msg)
}
在上面的代码示例中,ch
是类型string
创建的,并将消息发送到GO例程中的频道(ch <- message
),并从通道中以<-ch
的方式检索消息。
$ go run main.go
Hello, Gophers!
通道具有两个关键属性:
-
<-ch
将读取从频道ch
接收的值,而ch<-val
将把val
发送到channelch
。 - 发送和接收操作块,直到双方都准备就绪(即。这允许Goroutines同步而无需明确的锁或条件变量。
- 键入频道,因此只能发送并接收指定类型的值。这提供了类型的安全性。
package main
import (
"fmt"
)
func main() {
ch := make(chan string)
go func() {
message := "Hello, Gophers!"
ch <- message
}()
fmt.Println(<-ch)
fmt.Println(<-ch)
}
在同一示例中,如果我们尝试添加第二个接收器,即<-ch
,它将永远导致僵局/块,因为没有发送第二个消息到频道中。只有一个值,即“你好地鼠!”被以message
的形式发送到频道,第一个接收器作为<-ch
接收到,但是在下一个接收器中,没有发件人。
$ go run main.go
Hello, Gophers!
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
/home/meet/code/100-days-of-golang/scripts/channels/main.go:16 +0x125
exit status 2
在未封闭的频道中总结僵局概念:
- 主要的goroutine在第二次接收操作中等待第二封永远不会到达的消息(永远不会发送)。
- 匿名Goroutine正在等待某人从频道阅读,以便继续发送第二个消息。
缓冲通道
在Go中,您可以同时创建缓冲和无封闭的通道。一个未封闭的通道没有持有数据的能力,它依赖于发件人和接收器之间的立即通信。但是,您可以在使用Make功能时指定容量来创建一个缓冲通道,例如ch := make(chan int, 5)
将创建一个具有5
的通道,即它可以存储一定数量的值而无需立即接收器。缓冲通道允许您在无需立即接收器的情况下向通道发送多个值,最高可容纳其容量。之后,它将阻塞直到接收器检索值。
package main
import (
"fmt"
"sync"
)
func main() {
buffchan := make(chan int, 2)
wg := sync.WaitGroup{}
wg.Add(2)
for i := 1; i <= 2; i++ {
go func(n int) {
buffchan <- n
wg.Done()
}(i)
}
wg.Wait()
close(buffchan)
for c := range buffchan {
fmt.Println(c)
}
}
$ go run channels.go
1
2
$ go run channels.go
2
1
在此代码段中,我们创建了一个容量为2的缓冲通道CH,我们向频道发送了两个值,即使没有立即接收器,代码也不会阻止。如果我们要发送第三个值,它将导致僵局,因为没有接收器可以释放缓冲区中的空间。
关闭渠道
关闭通道对于向接收者发出信号很重要,表明将不再发送数据。它使用内置关闭功能实现了。关闭通道后,任何进一步发送数据的尝试都会引起恐慌。在接收方中,如果关闭频道并且没有更多数据可接收,则接收操作将对频道类型产生零值。
package main
import (
"fmt"
)
func main() {
ch := make(chan int)
go func() {
for i := 1; i <= 5; i++ {
ch <- i
}
close(ch)
}()
for num := range ch {
fmt.Println("Received:", num)
}
}
在此示例中,Goroutine将数字发送到通道,然后将其关闭。主要例程使用范围循环接收这些数字。当通道关闭并接收所有值时,循环将自动终止。请记住,只有发件人才能关闭频道,以指示接收器不要等待来自频道的更多值。
选择频道语句
选择语句用于处理多个通道。可以在Select Block中使用案例语句检查一些操作。
案例 | 频道操作 |
---|---|
发送 | chan < - value |
接收 | <-Chan |
因此,我们可以检查带有案例语句的频道的发件人或接收器,就像开关语句一样。
package main
import (
"fmt"
"sync"
)
func sendMessage(ch chan string, message string, wg *sync.WaitGroup) {
defer wg.Done()
ch <- message
}
func main() {
var wg sync.WaitGroup
ch1 := make(chan string, 2)
ch2 := make(chan string, 2)
wg.Add(2)
go sendMessage(ch1, "Hello, Gophers!", &wg)
go sendMessage(ch2, "Hello, Hamsters!", &wg)
go func() {
defer wg.Done()
wg.Wait()
close(ch1)
close(ch2)
}()
ch1 <- "new message to c1"
ch2 <- "new message to c2"
select {
case <-ch1:
fmt.Println("Received from ch1")
case ch1 <- "new message to c1":
fmt.Println("Sent to ch1")
case <-ch2:
fmt.Println("Received from ch2")
case ch2 <- "new message to c2":
fmt.Println("Sent to ch2")
}
}
$ go run channels.go
Sent to ch1
$ go run simple.go
Received from ch1
$ go run simple.go
Received from ch2
$ go run simple.go
Sent to ch2
$ go run simple.go
Received from ch1
无法保证消息的顺序,首先根据GO例程执行的操作只会记录。
在上面的简单示例中,我们创建了两个通道ch1
和ch2
,并使用两个GO例程向他们发送了两条消息。然后,主要例程等待从频道接收消息。发送发送后,我们关闭频道,并简单地检查4个案例,即在频道1上发送的发送,在频道1上接收,同样是频道2。因此,这就是我们可以使用Select语句检查哪个操作的方式在不同的渠道上执行,这构成了渠道之间通信的基础。使用频道时,我们会变得更加轻松。
下面是测试哪个url
或Web服务器首先响应请求的示例。
package main
import (
"fmt"
"net/http"
"sync"
)
func pingGoogle(c chan string, wg *sync.WaitGroup) {
defer wg.Done()
res, _ := http.Get("http://google.com")
c <- res.Status
}
func pingDuckDuckGo(c chan string, wg *sync.WaitGroup) {
defer wg.Done()
res, _ := http.Get("https://duckduckgo.com")
c <- res.Status
}
func pingBraveSearch(c chan string, wg *sync.WaitGroup) {
defer wg.Done()
res, _ := http.Get("https://search.brave.com")
c <- res.Status
}
func main() {
gogChan := make(chan string)
ddgChan := make(chan string)
braveChan := make(chan string)
var wg sync.WaitGroup
wg.Add(3)
go pingDuckDuckGo(ddgChan, &wg)
go pingGoogle(gogChan, &wg)
go pingBraveSearch(braveChan, &wg)
openChannels := 3
go func() {
wg.Wait()
close(gogChan)
close(ddgChan)
close(braveChan)
}()
for openChannels > 0 {
select {
case msg1, ok := <-gogChan:
if !ok {
openChannels--
} else {
fmt.Println("Google responded:", msg1)
}
case msg2, ok := <-ddgChan:
if !ok {
openChannels--
} else {
fmt.Println("DuckDuckGo responded:", msg2)
}
case msg3, ok := <-braveChan:
if !ok {
openChannels--
} else {
fmt.Println("BraveSearch responded:", msg3)
}
}
}
}
上面的示例显示了如何使用SELECT语句等待多个频道准备就绪,然后再进行下一个操作。在此示例中,我们可以获取首先发送响应的频道,即在这种情况下,首先对Ping响应了哪种搜索引擎。只是有点夸张的例子,但有助于理解select
语句的概念。
$ go run select-chan.go
DuckDuckGo responded: 200 OK
Google responded: 200 OK
BraveSearch responded: 200 OK
$ go run select-chan.go
DuckDuckGo responded: 200 OK
BraveSearch responded: 200 OK
Google responded: 200 OK
让我们打破每个步骤:
-
pingDuckDuckGo(ddgChan, &wg)
是一种将数据发送到ChannelddgChan
的方法。 -
pingGoogle(gogChan, &wg)
是将数据发送到频道gogChan
的方法。 -
pingBraveSearch(braveChan, &wg)
是一种将数据发送到ChannelbraveChan
的方法。 - 我们等待每个GO例程使用
wg.Wait()
结束并关闭频道。 - 最后,我们关闭了
gogChan
,ddgChan
和braveChan
的频道,以使用Select Case Block从频道中拾取数据。 - 选择情况将选择准备接收数据的第一个渠道。因此,我们根据频道首先响应的顺序获得输出。
- 我们使用
!ok
条件检查通道是否关闭,我们有一个openChannels
变量以跟踪开放通道的数量,如果没有频道打开,我们只需脱离无限环路即可。<<<<<<<<<<<<<<<<<<<<<<<<<<<<< /li>
方向通道
渠道也可以被指定为“仅发送”或“仅接收”,以强制执行某些沟通模式并提高安全性。这是通过定义通道类型时指定方向来完成的。
package main
import (
"fmt"
"sync"
)
func receiver(ch <-chan int, wg *sync.WaitGroup) {
for i := range ch {
fmt.Println("Received:", i)
}
wg.Done()
}
func sender(ch chan<- int, wg *sync.WaitGroup) {
for i := 0; i < 10; i++ {
fmt.Println("Sent:", i)
ch <- i
}
close(ch)
wg.Done()
}
func main() {
ch := make(chan int)
wg := sync.WaitGroup{}
wg.Add(2)
go receiver(ch, &wg)
go sender(ch, &wg)
wg.Wait()
}
在上面的示例中,我们创建了一个通道ch
并使用两次例程向其发送了10个值。主要例程是在关闭频道之前等待Goroutines完成。 sender
通过9
发送值0
,每当收到一个值时,receiver
都会打印。在sender
方法中,我们仅接受将数据发送为chan<-
的频道,在receiver
方法中,频道参数仅从频道读取为<-chan
。
$ go run send-recv.go
Sent: 0
Received: 0
Sent: 1
Sent: 2
Received: 1
Received: 2
Sent: 3
Sent: 4
Received: 3
Received: 4
Sent: 5
Sent: 6
Received: 5
Received: 6
Sent: 7
Sent: 8
Received: 7
Received: 8
Sent: 9
Received: 9
当我们将参数定义为仅写入通道时,意味着该函数只能将数据发送到该通道中。它无法从中读取数据或关闭数据。当您要确保该功能完全负责生成数据而不消耗或与频道当前状态进行交互时,此模式很有帮助。
当我们将参数定义为纯通道时,这意味着该函数只能从该频道接收数据。它不能关闭频道或将数据发送到其中。当我们要确保函数仅在不修改频道或干扰发件人的逻辑的情况下消耗来自频道的数据时,此模式很有用。
。此外,编译器将捕获代码,试图在仅阅读频道上发送或从仅写的频道接收。
通道使用模式
有多种方法可以在GO中使用频道。在本节中,我们将探讨一些在GO中使用频道的最常见模式。一些最有用的惯用通道使用模式包括管道,风扇和风扇,等等
异步等待频道的模式
在Go中,Goroutines和频道可实现优雅的异步/等待风格。 Goroutine可以异步执行任务,而主线程则使用频道等待结果。
GO中的异步 - wawat模式涉及同时启动多个任务,每个任务都与自己的goroutine一起启动,然后在继续之前等待其完成。渠道用于在这些goroutines之间进行交流,使它们能够独立工作,并在准备就绪时为主要例程提供结果。
package main
import (
"fmt"
"net/http"
)
func fetchURL(url string, ch chan<- http.Response) {
go func() {
res, err := http.Get(url)
if err != nil {
panic(err)
}
defer res.Body.Close()
ch <- *res
}()
}
func task(name string) {
fmt.Println("Task", name)
}
func main() {
fmt.Println("Start")
url := "http://google.com"
respCh := make(chan http.Response)
fetchURL(url, respCh)
task("A")
task("B")
response := <-respCh
fmt.Println("Response Status:", response.Status)
fmt.Println("Done")
}
$ go run async.go
Start
Task A
Task B
Response Status: 200 OK
Done
在上面的示例中,我们创建了一个函数fetchURL
,该函数将URL和通道作为参数。频道respCh
用于在Goroutines之间进行通信。该函数启动了获取请求的goroutine,我们将GET
请求发送到提供的URL,并将响应发送到提供的频道。在主要函数中,我们通过接收频道的数据作为<-respCh
访问response
。在执行此操作之前,我们可以同时执行任何其他任务,例如task("A")
和task("B")
,它们只是打印一些字符串(可能是任何内容)。但这应该是在我们从频道中撤入之前,访问后的任何内容将被阻止,即将依次执行。
通道的管道模式
管道模式用于将一系列处理阶段链接在一起,每个阶段都消耗输入,处理数据并将输出传递到下一阶段。可以通过将不同的通道从一个GO例程到另一种方式来实现这种类型的模式。
因此,使用通道的管道模式在GO中,数据依次通过多个阶段流动:阶段1读取输入并发送到通道A,阶段2从通道A接收并发送到通道B,然后阶段3从通道B和产生最终输出。
package main
import (
"fmt"
"sync"
)
func generate(nums []int, out chan<- int, wg *sync.WaitGroup) {
fmt.Println("Stage 1")
for _, n := range nums {
fmt.Println("Number:", n)
out <- n
}
close(out)
wg.Done()
}
func square(in <-chan int, out chan<- int, wg *sync.WaitGroup) {
fmt.Println("Stage 2")
for n := range in {
sq := n * n
fmt.Println("Square:", sq)
out <- sq
}
close(out)
wg.Done()
}
func print(in <-chan int, wg *sync.WaitGroup) {
for n := range in {
fmt.Println(n)
}
wg.Done()
}
func main() {
input := []int{1, 2, 3, 4, 5}
var wg sync.WaitGroup
wg.Add(3)
stage1 := make(chan int)
stage2 := make(chan int)
go generate(input, stage1, &wg)
go square(stage1, stage2, &wg)
go print(stage2, &wg)
wg.Wait()
}
在上面的示例中,我们创建了一系列处理阶段,每个阶段都消耗输入,处理数据并将输出传递到下一阶段。我们可以分别将功能generate
,square
和print
视为阶段1
,2
和3
。
- 生成函数,将输入作为整数片,一个未封闭的通道和WaitGroup B参考,该函数基本上迭代了切片中的数字,并将其发送到参数中提供的通道。 li>
- Square函数在stage1频道中采用了stage1频道以及其自己的频道为
stage2
(请记住,stage1
通道已通过生成功能发送了数字)。 - 正方形函数然后在
stage1
时从频道发送的数字迭代,并将其正常并将其发送到作为stage2
作为out
频道提供的通道。 - 打印函数作为参数输入stage2频道,并迭代从频道
stage2
发送的数字并打印出来。
$ go run pipeline.go
Stage 1
Number: 1
Stage 2
Square: 1
1
Number: 2
Number: 3
Square: 4
Square: 9
Number: 4
4
9
Square: 16
16
Number: 5
Square: 25
25
因此,我们可以看到执行的顺序,两个管道都同步启动,但是,仅当从上一个频道发送数据时,它们才能执行操作。我们首先从generate
函数打印number
,然后在square
函数中打印平方值,最后在打印函数中将其打印为Square: value
。
频道的粉丝模式
粉丝范围的模式用于将来自多个源的数据组合到单个流中,以进行统一处理,通常使用共享数据结构来汇总数据。我们可以通过将多个输入通道合并到单个输出通道中来创建风扇in模式。
粉丝模式是同时读取多个输入通道(a,b,c),并将其数据合并到单个输出通道(M)时。
package main
import (
"fmt"
"io/ioutil"
"sync"
)
func readFile(file string, ch chan<- string) {
content, _ := ioutil.ReadFile(file)
fmt.Println("Reading from", file, "as :: ", string(content))
ch <- string(content)
close(ch)
}
func merge(chs ...<-chan string) string {
var wg sync.WaitGroup
out := ""
for _, ch := range chs {
wg.Add(1)
go func(c <-chan string) {
for s := range c {
out += s
}
wg.Done()
}(ch)
}
wg.Wait()
return out
}
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go readFile("data/f1.txt", ch1)
go readFile("data/f2.txt", ch2)
merged := merge(ch1, ch2)
fmt.Println(merged)
}
在上面的示例中,readFile
函数读取文件的内容,并将其从不同的GO例程中发送到ch1
和ch2
。 readFile
仅发送频道,仅发送读取文件并将内容发送到频道的频道作为ch <- string(content)
。 merge
函数在2
中采用它也可以是n
频道数量,从如...<-chan
所示,它在每个频道上迭代,对于每个频道,它读取内容,并将其添加为单个字符串。
$ go run fan-in.go
Reading from data/f1.txt as :: This is from file 1
Reading from data/f2.txt as :: This is from file 2
This is from file 1
This is from file 2
$ go run fan-in.go
Reading from data/f2.txt as :: This is from file 2
Reading from data/f1.txt as :: This is from file 1
This is from file 2
This is from file 1
因此,这就是扇形模式的工作方式,我们创建多个通道并将结果组合到单个数据流中(在此示例中,单个字符串)。
频道的扇出模式
粉丝出口模式涉及从单个源获取数据并将其分配给多个工人或处理单元以进行并行或并发处理。 Fan-Out Design将输入通道拆分为多个输出通道,用于在并发过程中分布工作分支或数据。
粉丝范围的模式是将来自单个输入通道(a)的数据分配到并行处理的多个工人通道(x,y,z)时。
package main
import (
"fmt"
"os"
"sync"
)
func readFile(file string, ch chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
content, err := os.ReadFile(file)
if err != nil {
fmt.Printf("Error reading from %s: %v\n", file, err)
return
}
ch <- string(content)
}
func main() {
files := []string{"data/f1.txt", "data/f2.txt"}
var wg sync.WaitGroup
ch := make(chan string)
for _, f := range files {
wg.Add(1)
go readFile(f, ch, &wg)
}
go func() {
wg.Wait()
close(ch)
}()
var fileData []string
for content := range ch {
fileData = append(fileData, content)
}
fmt.Printf("Read %d files\n", len(fileData))
fmt.Printf("Contents:\n%s", fileData)
}
在上面的示例中,我们创建一个单个通道ch
作为单个源,我们在所有文件上循环,并创建呼叫readFile
函数的GO例程。 readFile
函数带有文件名,频道和waitgroup引用,该函数读取文件并将内容发送到频道为ch <- content
。 readFile
同时称为所有文件,在这里我们将任务的粉丝分解为多个GO例程,然后在主函数中,我们在频道上迭代并接收内容。
$ go run fan-out.go
Read 2 files
Contents:
[This is from file 2
This is from file 1
]
$ go run fan-out.go
Read 2 files
Contents:
[This is from file 1
This is from file 2
]
这是提供的示例中的粉丝范围模式的简短摘要:
- 使用goroutines同时读取多个文件。这是“粉丝”的作品。
-
readFile
函数在goroutine中运行以分别处理每个文件。 - WaitGroup协调Goroutines。
- 共享的频道CH从每个Goroutine收集结果。
- 主要的goroutine从通道读取并汇总结果。
- 渠道已关闭并范围为清晰地收集结果。
我还有更多的模式可以证明,这些模式已在100 days of Golang存储库上的GitHub中提供。
Mr-Destructive / 100-days-of-golang
Golang系列100天的脚本和资源
100 Days of Go lang
Go lang is a programming language which is easier to write and even provides type and memory safety, garbage collection and structural typing. It is developed by the Google Team in 2009 and open sourced in 2012. It is a programming language made for Cloud native technologies and web applications. It is a general purpose programming lanugage and hence there are no restrictions on the type of programs you wish to make in it.
学习golang
的资源- Exercism - Go track
- Go by example
- Go Time - Podcast
- Boot.dev Golang Course
- Zetcode Blog
- daily.dev Golang articles
一些著名的应用程序!
Web应用程序 | devops工具 | CLI工具 |
---|---|---|
SoundCloud-音乐系统 | Prometheus-监视系统和时间序列数据库 | gh-cli-官方GitHub Cli |
Uber-乘车共享/驾驶室预订webApp |
就是这一系列的第31部分,示例的所有源代码均在100 days of Golang存储库的github中链接。
参考
结论
因此,从本系列的这一部分中,我们能够理解Golang渠道的基础知识。通过使用以前的帖子中的核心概念(例如GO例程和等待组),我们能够使用Golang的频道。我们使用带有频道的并发概念为不同模式编写了一些示例。在本节中探索了管道,风扇,扇出,异步,异步以及某些通道的某些陈述的模式。
。希望,如果您有任何评论或反馈,请发现本节有帮助,请在评论部分或我的社交手柄中告诉我。谢谢您的阅读。快乐编码:)