Go语言 Channel
基础
为什么需要Channel
在介绍goroutine的时候有提到过多个goroutine会产生竞争关系,如果使用不得当,轻则无法正确更新变量,重则挂掉程序。我们可以通过原子函数和互斥锁来解决竞争关系,但是还有一种更有趣的方法,那就是使用Channel。
当一个资源需要在goroutine之间共享时,Channel在goroutine之间架起了一个管道,并提供了 确保同步交换数据的机制。
定义Channel
//无缓冲的整型通道
unbuffered := make(chan int)
//有缓冲的字符串通道
buffered := make(chan string, 10)
从Channel里收发数据
//通过通道发送一个字符串
buffered <- 1
//从通道接收一个字符串
value := <- buffered
无缓冲的Channel
无缓冲的Channel必须有接受者,如果没有接受者(worker)就会报错。
package main
import (
"fmt"
"time"
)
func worker(id int, c chan int) {
for {
fmt.Printf("Worker %d received %d\n",
id, <-c)
}
}
func createWorker(id int) chan<- int {
c := make(chan int)
//Channel的接受者
go worker(id, c)
return c
}
func chanDemo() {
//建立Channel的slice (Channel是一等公民)
var channels [10]chan<- int
//建立10个Channel和接受者
for i := 0; i < 10; i++ {
channels[i] = createWorker(i)
}
//传入值
for i := 0; i < 10; i++ {
channels[i] <- 'a' + i
}
//传入值
for i := 0; i < 10; i++ {
channels[i] <- 'A' + i
}
//程序持续1 Millisecond
time.Sleep(time.Millisecond)
}
func main() {
fmt.Println("Channel as first-class citizen")
chanDemo()
}
注意在把Channel当做函数参数使用的时候,可以定义Channel的数据走向chan<- int
,这样Channel就只能接受数据。
有缓冲的Channel
有缓冲的Channel就算没有接受者,只要没有超过预先设置的缓冲,就能保存传入的值。
package main
import (
"fmt"
"time"
)
func worker(id int, c chan int) {
for {
fmt.Printf("Worker %d received %c\n",
id, <-c)
}
}
func bufferedChannel() {
c := make(chan int, 3)
go worker(0, c)
c <- 'a'
c <- 'b'
c <- 'c'
c <- 'd'
time.Sleep(time.Millisecond)
}
func main() {
fmt.Println("Buffered channel")
bufferedChannel()
}
如果不加worker,程序会报错,但是如果把c <- 'd'
也注释掉,程序时不会报错的。
关闭Channel
package main
import (
"fmt"
"time"
)
func worker(id int, c chan int) {
for {
fmt.Printf("Worker %d received %c\n",
id, <-c)
}
}
func channelClose() {
c := make(chan int)
go worker(0, c)
c <- 'a'
c <- 'b'
c <- 'c'
c <- 'd'
close(c)
time.Sleep(time.Millisecond)
}
func main() {
fmt.Println("Channel close and range")
channelClose()
}
在传完4个值之后关闭了Channel,但是worker还在无限循环并且往外传Channel。但是由于已经没有值了,在1 毫秒的时间内,会生成下面的结果。
Channel close and range
Worker 0 received a
Worker 0 received b
Worker 0 received c
Worker 0 received d
Worker 0 received
Worker 0 received
Worker 0 received
Worker 0 received
Worker 0 received
Worker 0 received
Worker 0 received
Worker 0 received
Worker 0 received
Worker 0 received
Worker 0 received
Worker 0 received
Worker 0 received
Worker 0 received
Worker 0 received
Worker 0 received
如何对应这个问题可以有2种写法。
写法1
func worker2(id int, c chan int) {
for {
n, ok := <-c
if !ok {
break
}
fmt.Printf("Worker %d received %c\n",
id, n)
}
}
写法2
func worker(id int, c chan int) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
}
}
如何等待Goroutine
在之前的代码里面都有使用time.Sleep(time.Millisecond)
来等待Channel的结束。但是这种方法的危险肉眼可见。所以我们需要修改一下。
修改点:
在执行打印的时候,不仅接受一个Channel的int,同时在打印完成后,对完传出一个done
由于有一对chan,将这一对chan提出来做一个struct
在Createworker的时候用worker构造体传值
同时在创建10个worker的时候也用构造体创建构造体的slice
在向内部传值后,同时加入接受done的处理
package main
import (
"fmt"
)
func doworker(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
done <- true
}
}
type worker struct {
in chan int
done chan bool
}
func createWorker(id int) worker {
w := worker{
in: make(chan int),
done: make(chan bool),
}
go doworker(id, w.in, w.done)
return w
}
func chanDemo() {
var workers [10]worker
for i := 0; i < 10; i++ {
workers[i] = createWorker(i)
}
for i := 0; i < 10; i++ {
workers[i].in <- 'a' + i
<-workers[i].done
}
for i := 0; i < 10; i++ {
workers[i].in <- 'A' + i
<-workers[i].done
}
}
func main() {
fmt.Println("Channel as first-class citizen")
chanDemo()
}
这个程序跑出来的结果如下:
orker 0 received a
Worker 1 received b
Worker 2 received c
Worker 3 received d
Worker 4 received e
Worker 5 received f
Worker 6 received g
Worker 7 received h
Worker 8 received i
Worker 9 received j
Worker 0 received A
Worker 1 received B
Worker 2 received C
Worker 3 received D
Worker 4 received E
Worker 5 received F
Worker 6 received G
Worker 7 received H
Worker 8 received I
Worker 9 received J
可以看到程序并没有并发,而是传进去一条,然后等待done,然后再传下一条。如果并发效果,使用goroutine就没有了意义。所以这里需要继续改:
func chanDemo() {
var workers [10]worker
for i := 0; i < 10; i++ {
workers[i] = createWorker(i)
}
for i, worker := range workers {
worker.in <- 'a' + i
}
for i, worker := range workers {
worker.in <- 'A' + i
}
for _, worker := range workers {
<-worker.done
<-worker.done
}
}
把<-worker.done也单独拿出来接收,但是这样其实是会报错的。程序只会打印出第一段小写的for,原因是所有done的接受都放在了最后,而Channel的是传值是阻塞式的,第一段for的Channel接受完值,但是没有收到done,而第二段done却已经开始执行要传值了,导致报错。这里有一种非常tricky的改法,就是把doworker函数里的done改为goroutine。
func doworker(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
go func() {
done <- true
}()
}
}
最后自己管理goroutine的结束实在太累了,所以go语言提供了一种叫做sync.WaitGroup
的语法糖。
WaitGroup的用法:
定义WaitGroup本身:
var wg sync.WaitGroup
定义需要等待的goroutine数量:
wg.Add(20)
等待:
wg.Wait()
在goroutine执行完成后done:
w.done()
在这个程序里还有一些需要说的点:
WaitGroup作为参数的时候一定要用指针
这里利用函数式编程重构了一下worker构造,把done设置为函数,createworker的时候定义函数同时传入dowork里。
package main
import (
"fmt"
"sync"
)
func doWork(id int,
w worker) {
for n := range w.in {
fmt.Printf("Worker %d received %c\n",
id, n)
w.done()
}
}
type worker struct {
in chan int
done func()
}
func createWorker(
id int, wg *sync.WaitGroup) worker {
w := worker{
in: make(chan int),
done: func() {
wg.Done()
},
}
go doWork(id, w)
return w
}
func chanDemo() {
var wg sync.WaitGroup
var workers [10]worker
for i := 0; i < 10; i++ {
workers[i] = createWorker(i, &wg)
}
wg.Add(20)
for i, worker := range workers {
worker.in <- 'a' + i
}
for i, worker := range workers {
worker.in <- 'A' + i
}
wg.Wait()
}
func main() {
chanDemo()
}
应用
Go语言有句名言:
Don't communicate by sharing memory, share memory by communicating.
比如我们判断前后程序是否处理是否完成的时候经常设置一个flag变量,然后通过检测这个变量来进行判断,这就是所谓通过共享内存进行通信。但是go语言更提倡通过使用Channel,通过通信来共享内存。
其实说说直接一点就是作者是推荐使用Channel而不是Mutex的。但是还是要分场景,详细可以参照这篇文章。
利用Select
假如你需要从多个Channel接受值,但是你不知道哪一个Channel会先发数据给你。那你可以使用Select语句来完成此类需求。
package main
import (
"fmt"
"math/rand"
"time"
)
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(
time.Duration(rand.Intn(1500)) *
time.Millisecond)
out <- i
i++
}
}()
return out
}
func main() {
var c1, c2 = generator(), generator()
for {
select {
case n := <-c1:
fmt.Println("Received from c1: ", n)
case n := <-c2:
fmt.Println("Received from c1: ", n)
}
}
}
执行结果:
Received from c1: 0
Received from c2: 0
Received from c2: 1
Received from c1: 1
Received from c1: 2
Received from c2: 2
Received from c1: 3
^Csignal: interrupt
假如我想把c1和c2的值传给之前写的worker。可以这样写。
package main
import (
"fmt"
"math/rand"
"time"
)
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(
time.Duration(rand.Intn(1500)) *
time.Millisecond)
out <- i
i++
}
}()
return out
}
func worker(id int, c chan int) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
}
}
func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c
}
func main() {
w := createWorker(0)
var c1, c2 = generator(), generator()
for {
n := 0
select {
case n = <-c1:
fmt.Println("Received from c1: ", n)
case n = <-c2:
fmt.Println("Received from c2: ", n)
case w <- n:
}
}
}
但是这样写有个问题,由于generator需要等待,所以在一开始的时候,会走很多遍case w <- n
同时输出0。所以我们在传给worker之前,需要检查n是否为空。
package main
import (
"fmt"
"math/rand"
"time"
)
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(
time.Duration(rand.Intn(1500)) *
time.Millisecond)
out <- i
i++
}
}()
return out
}
func worker(id int, c chan int) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
}
}
func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c
}
func main() {
worker := createWorker(0)
var c1, c2 = generator(), generator()
n := 0
hasValue := false
for {
// nil chan永远不会被select到
var activeworker chan<- int
if hasValue {
activeworker = worker
}
select {
case n = <-c1:
fmt.Println("Received from c1: ", n)
hasValue = true
case n = <-c2:
fmt.Println("Received from c2: ", n)
hasValue = true
case activeworker <- n:
hasValue = false
}
}
}
这样就对了,这里有个知识点是nil chan永远不会被select到。用var activeworker chan<- int
定义的chan是nil,所以在最后的case activeworker <- n
的时候可以用来作为判断条件。
这里还有一个点需要考虑,现在我们的chan是从c1或者c2传进worker,但是中途经过int变量n,如果worker处理的速度过慢,而c1,c2传入过快,会导致n被覆盖从而无法传入全部的值到worker之中。
个人还想过为什么不直接把c1,c2传给worker,但是worker是个Channel类型,c1,c2也是Channel类型,所以不能直接
worker<-c1
这个时候我们需要把c1,c2传进来的数据储存起来。
package main
import (
"fmt"
"math/rand"
"time"
)
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(
time.Duration(rand.Intn(1500)) *
time.Millisecond)
out <- i
i++
}
}()
return out
}
func worker(id int, c chan int) {
for n := range c {
// 让worker故意慢一点
time.Sleep(time.Second)
fmt.Printf("Worker %d received %d\n",
id, n)
}
}
func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c
}
func main() {
var c1, c2 = generator(), generator()
var worker = createWorker(0)
// 建立一个切片来储存数据
var values []int
// time.After在指定时间后往返回Channel你传入时间
tm := time.After(10 * time.Second)
// time.Tick在指定间隔往返回Channel你传入时间
tick := time.Tick(time.Second)
for {
var activeWorker chan<- int
// 定义有效的值
var activeValue int
if len(values) > 0 {
activeWorker = worker
// 永远拿队列第一个值
activeValue = values[0]
}
select {
case n := <-c1:
values = append(values, n)
case n := <-c2:
values = append(values, n)
case activeWorker <- activeValue:
// 将队列第二个值往前移动一位
values = values[1:]
case <-time.After(800 * time.Millisecond):
fmt.Println("timeout")
case <-tick:
fmt.Println(
// 每隔一秒查看队列长度
"queue len =", len(values))
case <-tm:
fmt.Println("bye")
return
}
}
}
Last updated
Was this helpful?