Fork me on GitHub

图解 Go 并发编程

单线程程序与多线程程序

编程中一种常见的模式是用多个函数来完成一个特定的任务,但只有在程序的前一部分为下一个函数准备好数据时才会调用它们。

这就是我们设立的第一个例子,采矿程序。这个例子中的函数执行:寻矿,挖矿和炼矿。在我们的例子中,矿坑和矿石被表示为个字符串数组,每个函数接收它们并返回一个 处理好的 字符串数组。对于单线程应用程序,程序设计如下。

这里有 3 个主要函数。一个寻矿者,一个矿工和一个冶炼工。在这个版本的程序中,我们的函数在单个线程上运行,一个接一个地运行 - 而这个单线程(名为 Gary 的 gopher)需要完成所有工作。

1
2
3
4
5
6
func main() {
theMine := [5]string{"rock", "ore", "ore", "rock", "ore"}
foundOre := finder(theMine)
minedOre := miner(foundOre)
smelter(minedOre)
}

在每个函数的末尾打印出处理后的 矿石 数组,我们得到以下输出:

1
2
3
From Finder: [ore ore ore]
From Miner: [minedOre minedOre minedOre]
From Smelter: [smeltedOre smeltedOre smeltedOre]

这种编程风格具有易于设计的优点,但是当你想要利用多个线程并执行彼此独立的功能的时候,会发生什么情况?这是并发编程发挥作用的地方。

这种采矿设计更有效率。现在多线程( gopher 们 )独立工作;因此,并不是让 Gary 完成整个行动。有一个 gopher 寻找矿石,一个开采矿石,另一个冶炼矿石,可能全部在同一时间进行。

为了让我们将这种类型的功能带入我们的代码中,我们需要两件事:一种创建独立工作的 gopher 的方法,以及一种让 gopher 们相互沟通(发送矿石)的方法。这就是 Go 并发原语进场的地方:Go 例程和通道。

Go Routines ( 线程 )

Go Routines 可以被认为是轻量级线程。创建 Go Routines 简单到只需要将 go 添加到调用函数的开始。举一个简单的例子,让我们创建两个寻矿函数,使用 go 关键字调用它们,并在他们每次在矿中发现矿石时将其打印出来。

1
2
3
4
5
6
func main() {
theMine := [5]string{“rock”, “ore”, “ore”, “rock”, “ore”}
go finder1(theMine)
go finder2(theMine)
<-time.After(time.Second * 5)
}

以下是我们程序的输出结果:

1
2
3
4
5
6
Finder 1 found ore!
Finder 2 found ore!
Finder 1 found ore!
Finder 1 found ore!
Finder 2 found ore!
Finder 2 found ore!

从上面的输出中可以看到,寻矿者正在同时运行。谁先发现矿石并没有真正的顺序,并且当多次运行时,顺序并不总是相同的。

现在我们有一个简单的方法来建立一个多线程(多 Gopher)程序,但是当我们需要我们独立的 Go Routine 相互通信时会发生什么?

Go Channel ( 通道 )

示例

通道允许 go routines 相互通信。你可把通道理解为一个互相发送和接收的信息管道。

1
myFirstChannel := make(chan string)

Go routines 在通道上 发送接收 ,并通过使用箭头(<-)指向数据走向。

1
2
myFirstChannel <- "hello" // 发送
myVariable := <- myFirstChannel // 接收

现在通过使用一个通道,我们可以让我们的寻找矿石的地鼠,立即将他们发现的东西发送给我们的开采矿石的地鼠,而无需等待发现所有矿石后才进行。

更新例子,将寻找矿石的代码和矿工的功能被设置为匿名函数。重要的是要注意这些 go runtines 是如何使用 oreChan 通道去相互之间传递数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func main() {
theMine := [5]string{“ore1”, “ore2”, “ore3”}
oreChan := make(chan string)
// Finder
go func(mine [5]string) {
for _, item := range mine {
oreChan <- item //send
}
}(theMine)
// Ore Breaker
go func() {
for i := 0; i < 3; i++ {
foundOre := <-oreChan //receive
fmt.Println(“Miner: Received “ + foundOre + “ from finder”)
}
}()
<-time.After(time.Second * 5) // Again, ignore this for now
}

在下面的输出中,您可以看到我们的矿工从矿石通道读取了三次,每次收到一块矿石 。

1
2
3
Miner: Received ore1 from finder
Miner: Received ore2 from finder
Miner: Received ore3 from finder

现在我们可以在程序中的不同 Go routines(gophers)之间发送数据。在开始编写带有通道的复杂程序之前,先了解一些通道的属性。

通道阻塞

通道阻塞 go runtines 有很多种情况。这样就允许我们的 go routines 在一段时间内彼此同步,在 go runtines 独立运行之前。

  • 发送端阻塞

一旦一个 go routine( gopher )在一个通道上发送信息,这个发送数据的 go routine 会一直阻塞直到另外一个 go routine 在这个通道上接收到这个发送的数据

  • 接收端阻塞

类似于在通道中发送数据后阻塞,一个 go routine 会被阻塞,当从通道中等待获取一个指,但是这个通道里什么都没有发送。

可能首先对阻塞的概念会有一些迷惑,但是你可以这么去想,它就像两个 go runtines (gophers)之间进行交易。一个 gopher 不管是等着收钱还是打钱,它都要等到另外一个搭档出现才可以交易。

现在我们对 go routine 通过通道通信的时候不同阻塞方式有了一些概念。接下来,让我们讨论两种不同类型的通道 :unbufferedbuffered 。选择使用哪种类型的通道将会决定你的程序如何执行。

无缓冲通道

前面的例子中我们使用的都是无缓冲的通道。它们的与众不同的地方在于一次只能有一条数据填满整个通道。

缓冲通道

在并发程序中,时序并非总是完美。在我们的采矿例子中,我们可能会遇到这种情况: 当开矿小地鼠开采一块矿石的时间里,我们的寻矿小地鼠可以找到三块矿石。为了不让寻矿小地鼠傻等着开矿小地鼠完成工作,我们可以使用一个缓冲通道。现在我们就来生成一个容量为 3 的缓冲通道。

1
bufferedChan := make(chan string, 3)

缓存通道的工作原理跟无缓存通道类似,但是有一点不同的是:可以在需要另一个 go runtine 读取数据之前,将多条数据发送到该通道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bufferedChan := make(chan string, 3)
go func() {
bufferedChan <- "first"
fmt.Println("Sent 1st")
bufferedChan <- "second"
fmt.Println("Sent 2nd")
bufferedChan <- "third"
fmt.Println("Sent 3rd")
}()
<-time.After(time.Second * 1)
go func() {
firstRead := <- bufferedChan
fmt.Println("Receiving..")
fmt.Println(firstRead)
secondRead := <- bufferedChan
fmt.Println(secondRead)
thirdRead := <- bufferedChan
fmt.Println(thirdRead)
}()

我们两个 Go 例程之间的打印顺序是:

1
2
3
4
5
6
7
Sent 1st
Sent 2nd
Sent 3rd
Receiving..
first
second
third

为了简单起见,我们不会在最终程序中使用缓冲通道,但了解并发工具带中可用的通道类型很重要。

注意:使用缓冲通道不会阻止阻塞的发生。例如,如果寻矿 gopher 比矿工快 10 倍,并且它们通过大小为 2 的缓冲通道进行通信,则发现 gopher 仍将在程序中多次阻塞。

匿名的 Go Routines

类似于使用 go 关键字去设置一个函数来运行在自己的 go routine上,我们可以创建一个匿名的函数,使用一下的格式来使其运行在自己的 go routine 上面:

1
2
3
4
// Anonymous go routine
go func() {
fmt.Println("I'm running in my own go routine")
}()

这种匿名函数的方式,如果我们只需要调用这个函数一次,我们可以把匿名函数放到它自己的 go routine 上运行,而不用去创建一个官方函数的申明。

main 函数 是一个 go routine

main 方法确实是单独运行在一个 go routine 上面 !更重要的是要知道:如果 main 函数 一旦有执行结束了,它将关闭所有正在运行的 go routine 。这就是为什么在 main 函数地步有一个定时器 – 它创建了一个通道,并在五秒钟之后,往通道里发送一个值。

1
<-time.After(time.Second * 5) // 5秒钟之后从通道里接收数据

还记得一个 go rountie 如何读取通道数据,直到有发送内容到这个通道为止?通过添加上面的代码,这正是主程序正在发生的事情。 main 方法的 go rountine 将会阻塞,给其他 go routine 额外的 5 秒钟去运行。

现在,这里有更好方式去处理阻塞 main 函数直到其他的 go routine 也执行完成。一个普遍的做法是:创建一个已经完成的信号通道 ,它的主要功能是阻塞等待读取,一旦程执行完成了,就往这个通道里写入一个标识,然后这个已经完成的信号通道会读取到这个标识,这个通道将不会阻塞。从而这个程序就会结束运行。

1
2
3
4
5
6
7
8
9
func main() {
doneChan := make(chan string)
go func() {
// Do some work…
doneChan <- "I’m all done!"
}()

<-doneChan // 一直阻塞,直到有信号写入说明 go routine 执行完成
}
  • 你可以通过 range 关键字来遍历一个通道的数据
    在前面的例子中,我们让我们的矿工从 for 循环中的一个通道读取,经历了 3 次迭代。如果我们不确切知道有多少矿石来自寻矿者,会发生什么 ?可以对集合进行范围遍历搜索。

更行我们之前的矿工的方法,我们这么些:

1
2
3
4
5
6
// Ore Breaker
go func() {
for foundOre := range oreChan {
fmt.Println(“Miner: Received “ + foundOre + “ from finder”)
}
}()

由于矿工需要读取寻矿这发送给他的所有内容,因此通过遍历这个频道确保我们收到所有发送的内容。

注意:遍历一个通道将会阻塞直到这个通道发送了另外的一些数据。阻止 go routine 阻塞的唯一方法是当所有的数据遍历完之后通过 close(channel) 方法来关闭通道。

  • 在通道上进行非阻塞读取
    有一种方法可以让 go routine 在通道上进行非阻塞读取,那就是使用 Go 语言的 select case 结构。通过下面的代码,你的 go routine 将会从通道里读取(如果通道里有数据),或者运行默认的 case 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
myChan := make(chan string)

go func(){
myChan <- "Message!"
}()

select {
case msg := <- myChan:
fmt.Println(msg)
default:
fmt.Println("No Msg")
}
<-time.After(time.Second * 1)

select {
case msg := <- myChan:
fmt.Println(msg)
default:
fmt.Println("No Msg")
}

当上面的代码运行起来的时候,会输出如下的结果:

1
2
No Msg
Message!
  • 可以在通道上进行非阻塞发送
    非阻塞的发送也是使用 select case 结构来执行非阻塞的操作,跟非阻塞读取唯一的区别就是我们的 case 看起来像发送数据而不是接收数据。
1
2
3
4
5
6
select {
case myChan <- “message”:
fmt.Println(“sent the message”)
default:
fmt.Println(“no message sent”)
}

并发编程实现

现在凭借 Go 例程和通道的强大功能,我们可以编写一个程序,使用 Go 的并发原语来充分利用多线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
theMine := [5]string{"rock", "ore", "ore", "rock", "ore"}
oreChannel := make(chan string)
minedOreChan := make(chan string)

// Finder
go func(mine [5]string) {
for _, item := range mine {
if item == "ore" {
oreChannel <- item // 在 oreChannel 上发送东西
}
}
}(theMine)

// Ore Breaker
go func() {
for i := 0; i < 3; i++ {
foundOre := <-oreChannel // 从 oreChannel 上读取
fmt.Println("From Finder: ", foundOre)
minedOreChan <- "minedOre" // 向 minedOreChan 发送
}
}()

// Smelter
go func() {
for i := 0; i < 3; i++ {
minedOre := <-minedOreChan // 从 minedOreChan 读取
fmt.Println("From Miner: ", minedOre)
fmt.Println("From Smelter: Ore is smelted")
}
}()
<-time.After(time.Second * 5)

程序的输出如下:

1
2
3
4
5
6
7
8
9
From Finder:  ore
From Finder: ore
From Miner: minedOre
From Smelter: Ore is smelted
From Miner: minedOre
From Smelter: Ore is smelted
From Finder: ore
From Miner: minedOre
From Smelter: Ore is smelted

与我们原来的例子相比,这是一个很大的改进!现在,我们的每个函数都是独立运行在自己的 Go 例程上的。另外,每一块矿石在处理之后,都会进入我们采矿线的下一个阶段。

参考链接:

坚持原创技术分享,您的支持将鼓励我继续创作!