Go 多并发能力
首先定义一个工作 数5只 🐑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
package mainimport ( "fmt" "time " ) func main () { count(5 , "🐑" ) } func count (n int , animal string ) { for i := 0 ; i < n; i++ { fmt.Println(i+1 , animal) time .Sleep(time.Millisecond * 500 ) } } +++++++++++++ $ go run main.go 1 🐑2 🐑3 🐑4 🐑5 🐑
Millisecond(毫秒), Microsecond(微秒)
把另一份工作交给这个程序:数5头 🐮
程序会按顺序执行:
1 2 3 4 5 6 7 8 9 10 11
$ go run main.go 1 🐑2 🐑3 🐑4 🐑5 🐑1 🐮2 🐮3 🐮4 🐮5 🐮
Goroutine 协程
可以看做请求一个轻量的线程:
使用 goroutine 来执行 count 会快很多:
但是当我们再用 go 关键词创建一个 goroutine 来执行第二个 count:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
package mainimport ( "fmt" "time" ) func main () { go count(5 , "🐑" ) go count(5 , "🐮" ) } func count (n int , animal string ) { for i := 0 ; i < n; i++ { fmt.Println(i+1 , animal) time.Sleep(time.Millisecond * 500 ) } }
然而什么也没有输出
原因
在一个用 golang 编写的程序中,当主函数运行完毕后,这个程序就结束了:不管你创建了多少个 goroutine,goroutine 里又创建了多少个 goroutine,他们都会被终止
并不现实的解决办法
我们可以掐时间进行等待:
运行时间是一定的 数羊需要2.5秒,数牛需要1.5秒,所以我们可以在主函数结束前等3秒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
package mainimport ( "fmt" "time" ) func main () { go count(5 , "🐑" ) go count(3 , "🐮" ) time.Sleep(time.Second * 1 ) } func count (n int , animal string ) { for i := 0 ; i < n; i++ { fmt.Println(i+1 , animal) time.Sleep(time.Microsecond * 500 ) } }
但是在绝大多数情况下我们并不知道我们写的代码需要多长时间来执行
因此 golang 为我们提供了一个简单的工具:WaitGroup
WaitGroup
WaitGroup
在Go 语言自带的 sync
包裹中,引用它可以用来追踪我们有多少任务还没完成
接下来有两个任务要执行,所以我们这里调用 waitgroup.Add(2)
1 2
var wg sync.WaitGroupwg.Add(2 )
每当我们执行完一个任务后,我们希望把 WaitGroup
这个计数器的值减一
所以我们创建一个匿名函数把 count
函数包裹起来,然后在 count
函数执行完后运行一下 wg.Done()
1 2 3 4
go func () { count(5 , "🐑" ) wg.Done() }()
匿名函数就是创建一个没有名字的函数然后立马调用它
好处:把上面两行作为一个整体丢给 groutine 让它运行
这个匿名函数要做的事情就是执行完一个任务更新一下计数器 说:“嘿,我这个任务完成啦”
现在我们就可以在 main
函数结束前用我们 waitgroup
的 .Wat
方法来让 main
函数原地听命
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
package mainimport ( "fmt" "sync" "time" ) func main () { var wg sync.WaitGroup wg.Add(2 ) go func () { count(5 , "🐑" ) wg.Done() }() go func () { count(3 , "🐮" ) wg.Done() }() wg.Wait() } func count (n int , animal string ) { for i := 0 ; i < n; i++ { fmt.Println(i+1 , animal) time.Sleep(time.Millisecond * 500 ) } }
就这样通过 waitgroup
我们保证了我们的两个 goroutine
都能运行完毕,而且不耽误额外的时间
但是现在这两个 goroutine
就是各干各的没有任何交流,像极了我们单排打游戏遇到的队友一样
如果我们现在想知道一共数了多少只动物,就需要两个数动物的 groutine
进行交流
使用普通的 Counter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
package mainimport ( "fmt" "sync" "time" ) func main () { var wg sync.WaitGroup wg.Add(2 ) go func () { count(5 , "🐑" ) wg.Done() }() go func () { count(3 , "🐮" ) wg.Done() }() wg.Wait() fmt.Println(counter) } var counter int func count (n int , animal string ) { for i := 0 ; i < n; i++ { fmt.Println(i+1 , animal) counter = counter + 1 time.Sleep(time.Millisecond * 500 ) } }
这样在程序结束的最后会输出 8
这样会带来一个问题:
同时操作一块内存会导致这样的错误:
所以 Go 引入了 Communicating Sequential Processes - 通信顺序进程
Channel
Channel
就是 Go 语言中不同 goroutine
交流的途径
特性
Channel 是双向的:你可以随时对一个 Channel 喊话,也可以随时收听一个 Channel
往 Channel 里发送一条消息,和从 Channel 里收听一条消息,都会阻塞代码的运行
我们可以用这个阻塞的特性来同步我们的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
package mainimport ( "fmt" "time" ) func main () { c := make (chan string ) go count(5 , "🐑" , c) for { message := <-c fmt.Println(message) } } func count (n int , animal string , c chan string ) { for i := 0 ; i < n; i++ { c <- animal time.Sleep(time.Millisecond * 500 ) } }
运行结果是这样的:
错误原因
主函数这边一直在等着 c
这个 Channel 的另一边传来消息,但 count
函数已经结束了
也就是对我们这个程序来说,再也不会有新的数据传进 c
这个 Channel
判断是否继续从 Channel 接收消息
作为消息的发送人 我们需要负起关闭 Channel 的责任
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
package mainimport ( "fmt" "time" ) func main () { c := make (chan string ) go count(5 , "🐑" , c) for { message, open := <-c if !open { break } fmt.Println(message) } } func count (n int , animal string , c chan string ) { for i := 0 ; i < n; i++ { c <- animal time.Sleep(time.Millisecond * 500 ) } close (c) }
这次就不会报错啦
使用 Range 可以更简便一些,如果 Channel 关闭了 for 循环就会直接退出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
package mainimport ( "fmt" "time" ) func main () { c := make (chan string ) go count(5 , "🐑" , c) for message := range c { fmt.Println(message) } } func count (n int , animal string , c chan string ) { for i := 0 ; i < n; i++ { c <- animal time.Sleep(time.Millisecond * 500 ) } close (c) }
那如何从多个 Channel 里接收消息呢?
当我们把一个任务分成多个子任务的时候,可能每一个子任务都有一个单独的 Channel 专门和那一个子任务通讯
这些 Channel 随时都可能传来消息,我们怎么去管理他们呢?
Channel 阻塞特性导致 For 循环管理 Channel 存在的问题
我们先尝试用 for 循环每次从 c1接收一下 再从 c2接收一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
package main import ( "fmt" "time" ) func main () { c1 := make (chan string ) c2 := make (chan string ) go func () { for { c1 <- "🐑" time.Sleep(time.Millisecond * 500 ) } }() go func () { for { c2 <- "🐮" time.Sleep(time.Millisecond * 2000 ) } }() for { fmt.Println(<-c1) fmt.Println(<-c2) } }
可以看到 🐑 和 🐮 是相间打印的
可因为c1 和 c2的间隔不一样 应该是每打印四只 🐑 才打印一只 🐮
这里就是 Channel 的阻塞特性:
往 Channel 里发送一条消息,和从 Channel 里收听一条消息,都会阻塞代码的运行
这个阻塞原理是这样的:
当c1接收到消息后,距离 c2收到消息还要等一段时间,所以只能在这里等着,什么也做不了
而在这个等待的时间里第一个 goroutine
又一次要发送消息了
但是它也得在那里等着,因为没有人从 c1里去接收它的消息
所以我们这个程序就浪费了很多时间,在那里干等着
Select
Go 提供了 select
语句,里面可以有很多 case
,它会看看这里面哪个不阻塞可以被直接运行的,他就会选择哪个来运行,且只会运行一个。
当有多个 Channel 发送消息的时候,我们就可以使用 select
语句来保证我们可以在第一时间从有新消息的 Channel 里收到数据,而不收不同 Channel 消息频率不同的影响:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
package mainimport ( "fmt" "time" ) func main () { c1 := make (chan string ) c2 := make (chan string ) go func () { for { c1 <- "🐑" time.Sleep(time.Millisecond * 500 ) } }() go func () { for { c2 <- "🐮" time.Sleep(time.Millisecond * 2000 ) } }() for { select { case msg := <-c1: fmt.Println(msg) case msg := <-c2: fmt.Println(msg) } } }
这样就成功不阻塞地对多个 Channel 进行了管理:
多线程并发对文件进行递归搜索
基础版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
package mainimport ( "fmt" "io/ioutil" "time" ) var query = ".DS_Store" var matches int func main () { start := time.Now() search("/Users/lintstar/Tools/" ) fmt.Println(matches, "matches" ) fmt.Println(time.Since(start)) } func search (path string ) { files, err := ioutil.ReadDir(path) if err == nil { for _, file := range files { name := file.Name() if name == query { matches++ } if file.IsDir() { search(path + name + "/" ) } } } }
运行结果
1 2 3
$ go run main.go 405 matches 12.765213877s
搜索并输出当前 .DS_Store
文件路径
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
package mainimport ( "fmt" "io/ioutil" "time" ) var query = ".DS_Store" var matches int func main () { start := time.Now() search("/Users/lintstar/Xstark/" ) fmt.Println(matches, "matches" ) fmt.Println(time.Since(start)) } func search (path string ) { files, err := ioutil.ReadDir(path) if err == nil { for _, file := range files { name := file.Name() if name == query { matches++ fmt.Println(path + name) } if file.IsDir() { search(path + name + "/" ) } } } }
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
$ go run main.go /Users/lintstar/Xstark/.DS_Store /Users/lintstar/Xstark/.idea/.DS_Store /Users/lintstar/Xstark/XstarkCore/.DS_Store /Users/lintstar/Xstark/XstarkCore/Scan/.DS_Store /Users/lintstar/Xstark/XstarkCore/Scan/sub/.DS_Store /Users/lintstar/Xstark/XstarkCore/Scan/sub/result/.DS_Store /Users/lintstar/Xstark/XstarkCore/Scan/vul/.DS_Store /Users/lintstar/Xstark/XstarkCore/Scan/vul/xray/.DS_Store /Users/lintstar/Xstark/XstarkCore/Tools/.DS_Store /Users/lintstar/Xstark/static/.DS_Store /Users/lintstar/Xstark/static/images/.DS_Store /Users/lintstar/Xstark/static/js/.DS_Store /Users/lintstar/Xstark/static/js/tools/.DS_Store /Users/lintstar/Xstark/templates/.DS_Store /Users/lintstar/Xstark/templates/tools/.DS_Store 15 matches 32.558533ms
Goroutine 版
我们对搜索顺序并没有要求 只要不漏搜和 结果没有误差就 OK
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
package mainimport ( "fmt" "io/ioutil" "time" ) var query = ".DS_Store" var matches int var workerCount = 0 var maxWorkerCount = 32 var searchRequest = make (chan string ) var workerDone = make (chan bool ) var foundMatch = make (chan bool ) func main () { start := time.Now() workerCount = 1 go search("/Users/lintstar/Tools/" , true ) waitForWorkers() fmt.Println(matches, "matches" ) fmt.Println(time.Since(start)) } func waitForWorkers () { for { select { case path := <-searchRequest: workerCount++ go search(path, true ) case <-workerDone: workerCount-- if workerCount == 0 { return } case <-foundMatch: matches++ } } } func search (path string , master bool ) { files, err := ioutil.ReadDir(path) if err == nil { for _, file := range files { name := file.Name() if name == query { foundMatch <- true fmt.Println(path + name) } if file.IsDir() { if workerCount < maxWorkerCount { searchRequest <- path + name + "/" } else { search(path+name+"/" , false ) } } } if master { workerDone <- true } } }
406个匹配大约 1.8秒 是基础版12秒的大概 1/6
FROM : lintstar.top , Author: 离沫凌天๓
评论