[关闭]
@adamhand 2019-02-12T20:37:31.000000Z 字数 4110 阅读 917

golang--缓冲信道和工作池


什么是缓冲信道?

无缓冲信道的发送和接收过程是阻塞的;除此之外,还可以创建一个有缓冲(Buffer)的信道。只在缓冲已满的情况,才会阻塞向缓冲信道(Buffered Channel)发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。

通过向 make 函数再传递一个表示容量的参数(指定缓冲的大小),可以创建缓冲信道。

  1. ch := make(chan type, capacity)

要让一个信道有缓冲,上面语法中的 capacity 应该大于 0。无缓冲信道的容量默认为 0。

看一个例子:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func write(ch chan int) {
  7. for i := 0; i < 5; i++ {
  8. ch <- i
  9. fmt.Println("successfully wrote", i, "to ch")
  10. }
  11. close(ch)
  12. }
  13. func main() {
  14. ch := make(chan int, 2)
  15. go write(ch)
  16. time.Sleep(2 * time.Second)
  17. for v := range ch {
  18. fmt.Println("read value", v,"from ch")
  19. time.Sleep(2 * time.Second)
  20. }
  21. }

在上面的程序中,第 16 行在 Go 主协程中创建了容量为 2 的缓冲信道 ch,而第 17 行把 ch 传递给了 write 协程。接下来 Go 主协程休眠了两秒。在这期间,write 协程在并发地运行。write 协程有一个 for 循环,依次向信道 ch 写入 0~4。而缓冲信道的容量为 2,因此 write 协程里立即会向 ch 写入 0 和 1,接下来发生阻塞,直到 ch 内的值被读取。因此,该程序立即打印出下面两行:

  1. successfully wrote 0 to ch
  2. successfully wrote 1 to ch

打印上面两行之后,write 协程中向 ch 的写入发生了阻塞,直到 ch 有值被读取到。而 Go 主协程休眠了两秒后,才开始读取该信道,因此在休眠期间程序不会打印任何结果。主协程结束休眠后,在第 19 行使用 for range 循环,开始读取信道 ch,打印出了读取到的值后又休眠两秒,这个循环一直到 ch 关闭才结束。所以该程序在两秒后会打印下面两行:

  1. read value 0 from ch
  2. successfully wrote 2 to ch

该过程会一直进行,直到信道读取完所有的值,并在 write 协程中关闭信道。最终输出如下:

  1. successfully wrote 0 to ch
  2. successfully wrote 1 to ch
  3. read value 0 from ch
  4. successfully wrote 2 to ch
  5. read value 1 from ch
  6. successfully wrote 3 to ch
  7. read value 2 from ch
  8. successfully wrote 4 to ch
  9. read value 3 from ch
  10. read value 4 from ch

死锁

当信道被阻塞之后,比如信道为空或者信道满,如果没有改变这种阻塞情况的方式,比如信道为空时没有协程往里写数据,信道为满时没有协程往外读数据,就会发生死锁(deadlock)

长度 vs 容量

缓冲信道的容量是指信道可以存储的值的数量。我们在使用 make 函数创建缓冲信道的时候会指定容量大小。

缓冲信道的长度是指信道中当前排队的元素个数。

  1. package main
  2. import (
  3. "fmt"
  4. )
  5. func main() {
  6. ch := make(chan string, 3)
  7. ch <- "naveen"
  8. ch <- "paul"
  9. fmt.Println("capacity is", cap(ch))
  10. fmt.Println("length is", len(ch))
  11. fmt.Println("read value", <-ch)
  12. fmt.Println("new length is", len(ch))
  13. }

该程序会输出:

  1. capacity is 3
  2. length is 2
  3. read value naveen
  4. new length is 1

WaitGroup

WaitGroup 用于等待一批 Go 协程执行结束。程序控制会一直阻塞,直到这些协程全部执行完毕。假设我们有 3 个并发执行的 Go 协程(由 Go 主协程生成)。Go 主协程需要等待这 3 个协程执行结束后,才会终止。这就可以用 WaitGroup 来实现。

WaitGroup 用于实现工作池

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. func process(i int, wg *sync.WaitGroup) {
  8. fmt.Println("started Goroutine ", i)
  9. time.Sleep(2 * time.Second)
  10. fmt.Printf("Goroutine %d ended\n", i)
  11. wg.Done()
  12. }
  13. func main() {
  14. no := 3
  15. var wg sync.WaitGroup
  16. for i := 0; i < no; i++ {
  17. wg.Add(1)
  18. go process(i, &wg)
  19. }
  20. wg.Wait()
  21. fmt.Println("All go routines finished executing")
  22. }

当调用 WaitGroup 的 Add 并传递一个 int 时,WaitGroup 的计数器会加上 Add 的传参。要减少计数器,可以调用 WaitGroup 的 Done() 方法。Wait() 方法会阻塞调用它的 Go 协程,直到计数器变为 0 后才会停止阻塞。

在第 21 行里,传递 wg 的地址是很重要的。如果没有传递 wg 的地址,那么每个 Go 协程将会得到一个 WaitGroup 值的拷贝,因而当它们执行结束时,main 函数并不会知道。

该程序输出:

  1. started Goroutine 2
  2. started Goroutine 0
  3. started Goroutine 1
  4. Goroutine 0 ended
  5. Goroutine 2 ended
  6. Goroutine 1 ended
  7. All go routines finished executing

工作池的实现

工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。

下面的例子使用缓冲信道来实现工作池,工作池的任务是计算所输入的数字的每一位的和。

工作池的核心功能如下:

  1. package main
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "sync"
  6. "time"
  7. )
  8. /*
  9. 工作和结果两个结构体
  10. */
  11. type Job struct {
  12. id int
  13. randomno int
  14. }
  15. type Results struct {
  16. job Job
  17. sumofdigits int
  18. }
  19. /*
  20. 存放作业和结果的信道
  21. */
  22. var jobs = make(chan Job, 10)
  23. var results = make(chan Results, 10)
  24. /*
  25. 用来计算结果的函数
  26. */
  27. func digits(number int) int {
  28. sum := 0
  29. no := number
  30. for no != 0{
  31. digit := no % 10
  32. sum += digit
  33. no /= 10
  34. }
  35. time.Sleep(100 * time.Millisecond)
  36. return sum
  37. }
  38. /*
  39. 用来计算的协程,将结果写入Results信道中
  40. */
  41. func worker(wg * sync.WaitGroup) {
  42. for job := range jobs{
  43. output := Results{job, digits(job.randomno)}
  44. results <- output
  45. }
  46. wg.Done()
  47. }
  48. /*
  49. 创建工作池
  50. */
  51. func createWorkerPool(noOfWorkers int) {
  52. var wg sync.WaitGroup
  53. for i := 0; i < noOfWorkers; i++{
  54. wg.Add(1)
  55. go worker(&wg)
  56. }
  57. wg.Wait()
  58. close(results)
  59. }
  60. /*
  61. 创建工作
  62. */
  63. func alloate(noOfJobs int) {
  64. for i := 0; i < noOfJobs; i++{
  65. randomno := rand.Intn(999)
  66. job := Job{i, randomno}
  67. jobs <- job
  68. }
  69. close(jobs)
  70. }
  71. /*
  72. 打印结果
  73. */
  74. func result(done chan bool) {
  75. for r := range results{
  76. fmt.Printf("job id %d, input num %d, result %d\n", r.job.id, r.job.randomno, r.sumofdigits)
  77. }
  78. done <- true
  79. }
  80. func main() {
  81. startTime := time.Now()
  82. noOfJobs := 100
  83. noOfWorkers := 10
  84. done := make(chan bool)
  85. go alloate(noOfJobs)
  86. go result(done)
  87. go createWorkerPool(noOfWorkers)
  88. <- done
  89. endTime := time.Now()
  90. diffTime :=endTime.Sub(startTime)
  91. fmt.Println("time costs is ", diffTime.Seconds(), " s")
  92. }

程序会打印100行,对应着100个工作,结果如下:

  1. job id 0, input num 878, result 23
  2. job id 2, input num 407, result 11
  3. job id 1, input num 636, result 15
  4. ...
  5. time costs is 1.0187975 s

注意

问题:在allocate函数中如果for循环结束后,jobs里面的数据worker还来不及取走,这时执行到close,会不会导致works取数据失败?或者取不到足额的任务?

答案: golang 里的 close 只是用于通知信道的接收方,所有数据都已经发送完毕,信道没有真正关闭。
若用 for range 接收数据时,对于关闭了的信道,会接收完剩下的有效数据,并退出循环。如果没有 close 提示数据发送完毕的话,for range 会接收完剩下所有有效数据后发生阻塞。
所以接收方 worker 是可以把 jobs 剩下的数据取走的。后面垃圾收集器会自动回收掉该信道的内存。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注