@adamhand
2019-02-12T12:37:31.000000Z
字数 4110
阅读 1137
无缓冲信道的发送和接收过程是阻塞的;除此之外,还可以创建一个有缓冲(Buffer)的信道。只在缓冲已满的情况,才会阻塞向缓冲信道(Buffered Channel)发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。
通过向 make 函数再传递一个表示容量的参数(指定缓冲的大小),可以创建缓冲信道。
ch := make(chan type, capacity)
要让一个信道有缓冲,上面语法中的 capacity 应该大于 0。无缓冲信道的容量默认为 0。
看一个例子:
package mainimport ("fmt""time")func write(ch chan int) {for i := 0; i < 5; i++ {ch <- ifmt.Println("successfully wrote", i, "to ch")}close(ch)}func main() {ch := make(chan int, 2)go write(ch)time.Sleep(2 * time.Second)for v := range ch {fmt.Println("read value", v,"from ch")time.Sleep(2 * time.Second)}}
在上面的程序中,第 16 行在 Go 主协程中创建了容量为 2 的缓冲信道 ch,而第 17 行把 ch 传递给了 write 协程。接下来 Go 主协程休眠了两秒。在这期间,write 协程在并发地运行。write 协程有一个 for 循环,依次向信道 ch 写入 0~4。而缓冲信道的容量为 2,因此 write 协程里立即会向 ch 写入 0 和 1,接下来发生阻塞,直到 ch 内的值被读取。因此,该程序立即打印出下面两行:
successfully wrote 0 to chsuccessfully wrote 1 to ch
打印上面两行之后,write 协程中向 ch 的写入发生了阻塞,直到 ch 有值被读取到。而 Go 主协程休眠了两秒后,才开始读取该信道,因此在休眠期间程序不会打印任何结果。主协程结束休眠后,在第 19 行使用 for range 循环,开始读取信道 ch,打印出了读取到的值后又休眠两秒,这个循环一直到 ch 关闭才结束。所以该程序在两秒后会打印下面两行:
read value 0 from chsuccessfully wrote 2 to ch
该过程会一直进行,直到信道读取完所有的值,并在 write 协程中关闭信道。最终输出如下:
successfully wrote 0 to chsuccessfully wrote 1 to chread value 0 from chsuccessfully wrote 2 to chread value 1 from chsuccessfully wrote 3 to chread value 2 from chsuccessfully wrote 4 to chread value 3 from chread value 4 from ch
当信道被阻塞之后,比如信道为空或者信道满,如果没有改变这种阻塞情况的方式,比如信道为空时没有协程往里写数据,信道为满时没有协程往外读数据,就会发生死锁(deadlock)。
缓冲信道的容量是指信道可以存储的值的数量。我们在使用 make 函数创建缓冲信道的时候会指定容量大小。
缓冲信道的长度是指信道中当前排队的元素个数。
package mainimport ("fmt")func main() {ch := make(chan string, 3)ch <- "naveen"ch <- "paul"fmt.Println("capacity is", cap(ch))fmt.Println("length is", len(ch))fmt.Println("read value", <-ch)fmt.Println("new length is", len(ch))}
该程序会输出:
capacity is 3length is 2read value naveennew length is 1
WaitGroup 用于等待一批 Go 协程执行结束。程序控制会一直阻塞,直到这些协程全部执行完毕。假设我们有 3 个并发执行的 Go 协程(由 Go 主协程生成)。Go 主协程需要等待这 3 个协程执行结束后,才会终止。这就可以用 WaitGroup 来实现。
WaitGroup 用于实现工作池。
package mainimport ("fmt""sync""time")func process(i int, wg *sync.WaitGroup) {fmt.Println("started Goroutine ", i)time.Sleep(2 * time.Second)fmt.Printf("Goroutine %d ended\n", i)wg.Done()}func main() {no := 3var wg sync.WaitGroupfor i := 0; i < no; i++ {wg.Add(1)go process(i, &wg)}wg.Wait()fmt.Println("All go routines finished executing")}
当调用 WaitGroup 的 Add 并传递一个 int 时,WaitGroup 的计数器会加上 Add 的传参。要减少计数器,可以调用 WaitGroup 的 Done() 方法。Wait() 方法会阻塞调用它的 Go 协程,直到计数器变为 0 后才会停止阻塞。
在第 21 行里,传递 wg 的地址是很重要的。如果没有传递 wg 的地址,那么每个 Go 协程将会得到一个 WaitGroup 值的拷贝,因而当它们执行结束时,main 函数并不会知道。
该程序输出:
started Goroutine 2started Goroutine 0started Goroutine 1Goroutine 0 endedGoroutine 2 endedGoroutine 1 endedAll go routines finished executing
工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。
下面的例子使用缓冲信道来实现工作池,工作池的任务是计算所输入的数字的每一位的和。
工作池的核心功能如下:
package mainimport ("fmt""math/rand""sync""time")/*工作和结果两个结构体*/type Job struct {id intrandomno int}type Results struct {job Jobsumofdigits int}/*存放作业和结果的信道*/var jobs = make(chan Job, 10)var results = make(chan Results, 10)/*用来计算结果的函数*/func digits(number int) int {sum := 0no := numberfor no != 0{digit := no % 10sum += digitno /= 10}time.Sleep(100 * time.Millisecond)return sum}/*用来计算的协程,将结果写入Results信道中*/func worker(wg * sync.WaitGroup) {for job := range jobs{output := Results{job, digits(job.randomno)}results <- output}wg.Done()}/*创建工作池*/func createWorkerPool(noOfWorkers int) {var wg sync.WaitGroupfor i := 0; i < noOfWorkers; i++{wg.Add(1)go worker(&wg)}wg.Wait()close(results)}/*创建工作*/func alloate(noOfJobs int) {for i := 0; i < noOfJobs; i++{randomno := rand.Intn(999)job := Job{i, randomno}jobs <- job}close(jobs)}/*打印结果*/func result(done chan bool) {for r := range results{fmt.Printf("job id %d, input num %d, result %d\n", r.job.id, r.job.randomno, r.sumofdigits)}done <- true}func main() {startTime := time.Now()noOfJobs := 100noOfWorkers := 10done := make(chan bool)go alloate(noOfJobs)go result(done)go createWorkerPool(noOfWorkers)<- doneendTime := time.Now()diffTime :=endTime.Sub(startTime)fmt.Println("time costs is ", diffTime.Seconds(), " s")}
程序会打印100行,对应着100个工作,结果如下:
job id 0, input num 878, result 23job id 2, input num 407, result 11job id 1, input num 636, result 15...time costs is 1.0187975 s
问题:在allocate函数中如果for循环结束后,jobs里面的数据worker还来不及取走,这时执行到close,会不会导致works取数据失败?或者取不到足额的任务?
答案: golang 里的 close 只是用于通知信道的接收方,所有数据都已经发送完毕,信道没有真正关闭。
若用 for range 接收数据时,对于关闭了的信道,会接收完剩下的有效数据,并退出循环。如果没有 close 提示数据发送完毕的话,for range 会接收完剩下所有有效数据后发生阻塞。
所以接收方 worker 是可以把 jobs 剩下的数据取走的。后面垃圾收集器会自动回收掉该信道的内存。