从实战理解 Golang 的多并发能力

  • A+
所属分类:安全博客

Go 多并发能力

从实战理解 Golang 的多并发能力

首先定义一个工作 数5只 🐑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"time"
)

func main() {
count(5, "🐑")
}

func count(n int, animal string) {
for i := 0; i < n; i++ {
fmt.Println(i+1, animal)
time.Sleep(time.Millisecond * 500) // 每隔半秒输出一次字符串
}
}

+++++++++++++
$ go run main.go
1 🐑
2 🐑
3 🐑
4 🐑
5 🐑

Millisecond(毫秒), Microsecond(微秒)

把另一份工作交给这个程序:数5头 🐮

1
count(5, "🐮")

程序会按顺序执行:

1
2
3
4
5
6
7
8
9
10
11
$ go run main.go
1 🐑
2 🐑
3 🐑
4 🐑
5 🐑
1 🐮
2 🐮
3 🐮
4 🐮
5 🐮

Goroutine 协程

可以看做请求一个轻量的线程:

1
go count(5, "🐑")

使用 goroutine 来执行 count 会快很多:

从实战理解 Golang 的多并发能力

但是当我们再用 go 关键词创建一个 goroutine 来执行第二个 count:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"time"
)

func main() {
go count(5, "🐑")
go count(5, "🐮")
}

func count(n int, animal string) {
for i := 0; i < n; i++ {
fmt.Println(i+1, animal)
time.Sleep(time.Millisecond * 500)
}
}

然而什么也没有输出

从实战理解 Golang 的多并发能力

原因

  • 在一个用 golang 编写的程序中,当主函数运行完毕后,这个程序就结束了:不管你创建了多少个 goroutine,goroutine 里又创建了多少个 goroutine,他们都会被终止

从实战理解 Golang 的多并发能力

并不现实的解决办法

我们可以掐时间进行等待:

运行时间是一定的 数羊需要2.5秒,数牛需要1.5秒,所以我们可以在主函数结束前等3秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"fmt"
"time"
)

func main() {
go count(5, "🐑")
go count(3, "🐮")
time.Sleep(time.Second * 1)
}

func count(n int, animal string) {
for i := 0; i < n; i++ {
fmt.Println(i+1, animal)
time.Sleep(time.Microsecond * 500)
}
}

但是在绝大多数情况下我们并不知道我们写的代码需要多长时间来执行

因此 golang 为我们提供了一个简单的工具:WaitGroup

WaitGroup

WaitGroup 在Go 语言自带的 sync 包裹中,引用它可以用来追踪我们有多少任务还没完成

接下来有两个任务要执行,所以我们这里调用 waitgroup.Add(2)

1
2
var wg sync.WaitGroup
wg.Add(2)

每当我们执行完一个任务后,我们希望把 WaitGroup 这个计数器的值减一

所以我们创建一个匿名函数把 count 函数包裹起来,然后在 count 函数执行完后运行一下 wg.Done()

1
2
3
4
go func() {
count(5, "🐑")
wg.Done()
}()

匿名函数就是创建一个没有名字的函数然后立马调用它

好处:把上面两行作为一个整体丢给 groutine 让它运行

这个匿名函数要做的事情就是执行完一个任务更新一下计数器 说:“嘿,我这个任务完成啦”

现在我们就可以在 main 函数结束前用我们 waitgroup.Wat 方法来让 main 函数原地听命

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"fmt"
"sync"
"time"
)

func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
count(5, "🐑")
wg.Done()
}()
go func() {
count(3, "🐮")
wg.Done()
}()
wg.Wait()
}

func count(n int, animal string) {
for i := 0; i < n; i++ {
fmt.Println(i+1, animal)
time.Sleep(time.Millisecond * 500)
}
}

就这样通过 waitgroup我们保证了我们的两个 goroutine 都能运行完毕,而且不耽误额外的时间

但是现在这两个 goroutine 就是各干各的没有任何交流,像极了我们单排打游戏遇到的队友一样

如果我们现在想知道一共数了多少只动物,就需要两个数动物的 groutine 进行交流

使用普通的 Counter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"sync"
"time"
)

func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
count(5, "🐑")
wg.Done()
}()
go func() {
count(3, "🐮")
wg.Done()
}()
wg.Wait()
fmt.Println(counter)
}

var counter int

func count(n int, animal string) {
for i := 0; i < n; i++ {
fmt.Println(i+1, animal)
counter = counter + 1 //每数一只动物 counter+1
time.Sleep(time.Millisecond * 500)
}
}

这样在程序结束的最后会输出 8

这样会带来一个问题:

从实战理解 Golang 的多并发能力

同时操作一块内存会导致这样的错误:

从实战理解 Golang 的多并发能力

所以 Go 引入了 Communicating Sequential Processes - 通信顺序进程

从实战理解 Golang 的多并发能力

Channel

Channel 就是 Go 语言中不同 goroutine 交流的途径

特性

  • Channel 是双向的:你可以随时对一个 Channel 喊话,也可以随时收听一个 Channel
  • 往 Channel 里发送一条消息,和从 Channel 里收听一条消息,都会阻塞代码的运行

我们可以用这个阻塞的特性来同步我们的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"time"
)

func main() {
c := make(chan string)

go count(5, "🐑", c)
for {
message := <-c //从 channel 里收听一条消息
fmt.Println(message)
}
}

func count(n int, animal string, c chan string) {
for i := 0; i < n; i++ {
c <- animal //每数一只动物向 channel 里面喊话
time.Sleep(time.Millisecond * 500)
}
}

运行结果是这样的:

从实战理解 Golang 的多并发能力

错误原因

  • 主函数这边一直在等着 c 这个 Channel 的另一边传来消息,但 count 函数已经结束了
  • 也就是对我们这个程序来说,再也不会有新的数据传进 c 这个 Channel

判断是否继续从 Channel 接收消息

作为消息的发送人 我们需要负起关闭 Channel 的责任

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"time"
)

func main() {
c := make(chan string)

go count(5, "🐑", c)
for {
message, open := <-c //从 Channel 里收听一条消息
if !open {
break
} //接收方通过这个布尔值来判断我们是不是要继续从这个 Channel 接收消息
fmt.Println(message)
}
}

func count(n int, animal string, c chan string) {
for i := 0; i < n; i++ {
c <- animal //每数一只动物向 Channel 里面喊话
time.Sleep(time.Millisecond * 500)
}
close(c) //关闭 Channel
}

这次就不会报错啦

从实战理解 Golang 的多并发能力

使用 Range 可以更简便一些,如果 Channel 关闭了 for 循环就会直接退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"time"
)

func main() {
c := make(chan string)

go count(5, "🐑", c)
for message := range c {
fmt.Println(message)
}
}

func count(n int, animal string, c chan string) {
for i := 0; i < n; i++ {
c <- animal //每数一只动物向 Channel 里面喊话
time.Sleep(time.Millisecond * 500)
}
close(c) //关闭 Channel
}

那如何从多个 Channel 里接收消息呢?

当我们把一个任务分成多个子任务的时候,可能每一个子任务都有一个单独的 Channel 专门和那一个子任务通讯

这些 Channel 随时都可能传来消息,我们怎么去管理他们呢?

Channel 阻塞特性导致 For 循环管理 Channel 存在的问题

我们先尝试用 for 循环每次从 c1接收一下 再从 c2接收一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"fmt"
"time"
)

func main() {
c1 := make(chan string)
c2 := make(chan string)

go func() {
for {
c1 <- "🐑"
time.Sleep(time.Millisecond * 500) //每间隔0.5秒往c1的 Channel 里发送消息
}
}()
go func() {
for {
c2 <- "🐮"
time.Sleep(time.Millisecond * 2000) //每间隔2秒往c2的 Channel 里发送消息
}
}()

for {
fmt.Println(<-c1)
fmt.Println(<-c2)
}
}

可以看到 🐑 和 🐮 是相间打印的

从实战理解 Golang 的多并发能力

可因为c1 和 c2的间隔不一样 应该是每打印四只 🐑 才打印一只 🐮

这里就是 Channel 的阻塞特性:

  • 往 Channel 里发送一条消息,和从 Channel 里收听一条消息,都会阻塞代码的运行

从实战理解 Golang 的多并发能力

这个阻塞原理是这样的:

  1. 当c1接收到消息后,距离 c2收到消息还要等一段时间,所以只能在这里等着,什么也做不了
  2. 而在这个等待的时间里第一个 goroutine 又一次要发送消息了
  3. 但是它也得在那里等着,因为没有人从 c1里去接收它的消息
  4. 所以我们这个程序就浪费了很多时间,在那里干等着

Select

Go 提供了 select 语句,里面可以有很多 case,它会看看这里面哪个不阻塞可以被直接运行的,他就会选择哪个来运行,且只会运行一个。

当有多个 Channel 发送消息的时候,我们就可以使用 select 语句来保证我们可以在第一时间从有新消息的 Channel 里收到数据,而不收不同 Channel 消息频率不同的影响:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"time"
)

func main() {
c1 := make(chan string)
c2 := make(chan string)

go func() {
for {
c1 <- "🐑"
time.Sleep(time.Millisecond * 500) //每间隔0.5秒往c1的 Channel 里发送消息
}
}()
go func() {
for {
c2 <- "🐮"
time.Sleep(time.Millisecond * 2000) //每间隔2秒往c2的 Channel 里发送消息
}
}()

for {
select {
case msg := <-c1:
fmt.Println(msg)
case msg := <-c2:
fmt.Println(msg)
}
}
}

这样就成功不阻塞地对多个 Channel 进行了管理:

从实战理解 Golang 的多并发能力

多线程并发对文件进行递归搜索

基础版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"io/ioutil"
"time"
)

var query = ".DS_Store"
var matches int //定义一个计数器

func main() {
start := time.Now() //记录当前时间
search("/Users/lintstar/Tools/")
fmt.Println(matches, "matches")
fmt.Println(time.Since(start))
}

func search(path string) {
files, err := ioutil.ReadDir(path) //首先看一下当前路径有什么文件或者文件夹
if err == nil { //没有错误的话进行循环
for _, file := range files {
name := file.Name()
if name == query {
matches++
}
if file.IsDir() { //如果是文件夹的话进行递归搜索
search(path + name + "/")
}
}
}
}

运行结果

1
2
3
$ go run main.go
405 matches
12.765213877s

搜索并输出当前 .DS_Store 文件路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"io/ioutil"
"time"
)

var query = ".DS_Store"
var matches int //定义一个计数器

func main() {
start := time.Now() //记录当前时间
search("/Users/lintstar/Xstark/")
fmt.Println(matches, "matches")
fmt.Println(time.Since(start))
}

func search(path string) {
files, err := ioutil.ReadDir(path) //首先看一下当前路径有什么文件或者文件夹
if err == nil { //没有错误的话进行循环
for _, file := range files {
name := file.Name()
if name == query {
matches++
fmt.Println(path + name)
}
if file.IsDir() { //如果是文件夹的话进行递归搜索
search(path + name + "/")
}
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ go run main.go
/Users/lintstar/Xstark/.DS_Store
/Users/lintstar/Xstark/.idea/.DS_Store
/Users/lintstar/Xstark/XstarkCore/.DS_Store
/Users/lintstar/Xstark/XstarkCore/Scan/.DS_Store
/Users/lintstar/Xstark/XstarkCore/Scan/sub/.DS_Store
/Users/lintstar/Xstark/XstarkCore/Scan/sub/result/.DS_Store
/Users/lintstar/Xstark/XstarkCore/Scan/vul/.DS_Store
/Users/lintstar/Xstark/XstarkCore/Scan/vul/xray/.DS_Store
/Users/lintstar/Xstark/XstarkCore/Tools/.DS_Store
/Users/lintstar/Xstark/static/.DS_Store
/Users/lintstar/Xstark/static/images/.DS_Store
/Users/lintstar/Xstark/static/js/.DS_Store
/Users/lintstar/Xstark/static/js/tools/.DS_Store
/Users/lintstar/Xstark/templates/.DS_Store
/Users/lintstar/Xstark/templates/tools/.DS_Store
15 matches
32.558533ms

Goroutine 版

我们对搜索顺序并没有要求 只要不漏搜和 结果没有误差就 OK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

import (
"fmt"
"io/ioutil"
"time"
)

var query = ".DS_Store"
var matches int //定义一个计数器

var workerCount = 0 //定义👷🏻 数量
var maxWorkerCount = 32 //定义最大 👷🏻 数量

var searchRequest = make(chan string) //指派任务
var workerDone = make(chan bool) //任务完成
var foundMatch = make(chan bool) //传输找到搜索结果的消息

func main() {
start := time.Now() //记录当前时间
workerCount = 1
go search("/Users/lintstar/Tools/", true)
waitForWorkers()
fmt.Println(matches, "matches")
fmt.Println(time.Since(start))
}

func waitForWorkers() {
for {
select {
case path := <-searchRequest:
workerCount++
go search(path, true)
case <-workerDone:
workerCount--
if workerCount == 0 {
return
}
case <-foundMatch:
matches++
}
}
}

func search(path string, master bool) { //在 goroutine 运行时,结束告诉主函数运行完毕
files, err := ioutil.ReadDir(path) //首先看一下当前路径有什么文件或者文件夹
if err == nil { //没有错误的话进行循环
for _, file := range files {
name := file.Name()
if name == query {
foundMatch <- true //找到了就向这个 Channel 中喊话:我找到了!
fmt.Println(path + name)
}
if file.IsDir() { //如果是文件夹的话进行递归搜索
if workerCount < maxWorkerCount { //如果有空闲工人就请工头指派
searchRequest <- path + name + "/"
} else {
search(path+name+"/", false)
}
}
}
if master {
workerDone <- true //如果是运行在 goroutine 里面的运行结束喊话:我干完了
}
}
}

406个匹配大约 1.8秒 是基础版12秒的大概 1/6

从实战理解 Golang 的多并发能力

FROM : lintstar.top , Author: 离沫凌天๓

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: