0%

golang-gorouting通信

引言

本文介绍 Golang gorouting 通信的几种机制,了解多协程之间是如何通信的。

Gorouting

Goroutine是Go中最基本的执行单元。相比于线程,线程是进程中一个单位,由物理CPU进行调度,拥有自己的栈空间,共享堆空间。Gorouting是Go的协程实现,在语言层控制,由程序员在代码层显示调度。

Go runtime scheduler

go runtime 会负责goroutine的生老病死,从创建到销毁。Runtime会在程序启动的时候,创建M个线程,创建N个gorouting 都会依附在这M个线程上执行。这就是M:N 模型。

Schedueler 包含三个部分,g,m,p。

  • 全局队列(Global Queue):存放等待运行的 G
  • g 代表gorouting
  • p 代表一个虚拟的processor,它维护一个处于 Runnable 状态的 g 队列。在程序启动时创建,并保存在数组中。
  • m 表示内核线程,包含正在运行的 goroutine 等字段。m线程想运行任务就得获取 P,从 P 的本地队列获取 G,P 队列为空时,M 也会尝试从全局队列拿一批 G 放到 P 的本地队列,或从其他 P 的本地队列偷一半放到自己 P 的本地队列。M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。

有关 P 和 M 的个数问题

P 的数量

  • 由启动时$GOMAXPROCS或者由 runtime 的方法GOMAXPROCS()决定。

M 的数量

  • go 语言本身的限制:go 程序启动时,会设置 M 的最大数量,默认 10000. 但是内核很难支持这么多的线程数,所以这个限制可以忽略
  • runtime/debug 中的 SetMaxThreads 函数,设置 M 的最大数量
  • 一个 M 阻塞了,会创建新的 M。

P 和 M 何时会被创建

复用线程:避免频繁的创建、销毁线程,而是对线程的复用。

  • work stealing 机制: 当本线程无可运行的 G 时,尝试从其他线程绑定的 P 偷取 G,而不是销毁线程。
  • hand off 机制:​ 当本线程因为 G 进行系统调用阻塞时,线程释放绑定的 P,把 P 转移给其他空闲的线程执行。

一个gorouting调度流程如下

  1. 创建一个新的gorouting
  2. 放到本地或者共享队列中
  3. M 负责唤醒线程或者创建线程启动goroutine
  4. 循环调度
  5. 尝试或者一个gorouting 执行
  6. 清理,M进入重新循环调度。

通信方式

  • 全局共享变量
  • channel通信
  • Context包

全局共享变量

声明一个全局环境变量, 所有子goroutine共享这个变量,并不断轮询这个变量检查是否有更新。

channel通信

go中有一个数据类型 chan,用于在gorouteines之间消息通信,具备缓存功能。

channel 特性

  • 线程安全:hchan mutex
  • 先进先出:copying into and out of hchan buffer
  • channel的高性能所在:
    • 调用runtime scheduler实现,OS thread不需要阻塞;
    • 跨goroutine栈可以直接进行读写;

channel的使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func main() {
//创建通道
data := make(chan int)
//创建一个等待组
var wg sync.WaitGroup
wg.Add(3)
go func() {
for d := range data {//通过range不断地处理data
fmt.Println(d)
wg.Done()
}
}()

data <- 1//发送要放在接收协程跑起来后面,因为发送后会阻塞等待接收
data <- 2
data <- 3
//等待channel中所有的数据输出
wg.Wait()
close(data)
}

channel类型:无缓冲和缓冲类型

channel有两种形式,一种是有缓冲的,一个协程向这个channel发送了消息后,回阻塞当前这个线程,直到其他协程接受到这个channel。

无缓冲的初始化方式

1
intChan := make(chan int)

缓冲channel的初始化方式:

1
intChan := make(chan int, 3)

channel的几种情况

  • 当写入数据超出缓冲空间会阻塞。
  • 向nil channel写入和读取数据会阻塞

channel一般会和select机制配合,select的运行机制如下:

  • 选取一个可执行不阻塞的case分支,如果多个case分支都不阻塞,会随机算一个case分支执行,和case分支在代码里写的顺序没关系。
  • 如果所有case分支都阻塞,会进入default分支执行。
  • 如果没有default分支,那select会阻塞,直到有一个case分支不阻塞。

channel 通过 mutex(锁)来保证多个 goroutine 来访问 channel 的时候是安全的,它的底层是一个叫做hchan的结构体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type hchan struct {
//channel分为无缓冲和有缓冲两种。
//对于有缓冲的channel存储数据,借助的是如下循环数组的结构
qcount uint // 循环数组中的元素数量
dataqsiz uint // 循环数组的长度
buf unsafe.Pointer // 指向底层循环数组的指针
elemsize uint16 //能够收发元素的大小


closed uint32 //channel是否关闭的标志
elemtype *_type //channel中的元素类型

//有缓冲channel内的缓冲数组会被作为一个“环型”来使用。
//当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和写的下标位置
sendx uint // 下一次发送数据的下标位置
recvx uint // 下一次读取数据的下标位置

//当循环数组中没有数据时,收到了接收请求,那么接收数据的变量地址将会写入读等待队列
//当循环数组中数据已满时,收到了发送请求,那么发送数据的变量地址将写入写等待队列
recvq waitq // 读等待队列
sendq waitq // 写等待队列
lock mutex //互斥锁,保证读写channel时不存在并发竞争问题
}

如上图是channel的底层数据结构。

  • buf: 用来保存goroutine之间传递数据的循环链表
  • sendx和recvx: 用来记录此循环链表当前发送或接收数据的下标值
  • sendq 和 recvq: 用于保存向该chan发送和修改chan接收数据的goroutine的队列
  • lock: 保证channel写入和读取数据时线程安全的锁 。

lock 在给channel发送数据和消费的数据的时候使用,发送数据时,给buf加锁,将数据copy到buf中,sendx++,然后释放对buf的锁。

消费数据的时候,对buf加锁,将buf中的数据copy到变量内存中,recvx++,并释放锁。

可以发现底层是通过hchan结构体的buf,使用copy内存的方式进行通信,最终达到共享内存的目的。这也体现了Go中的CSP并发模型。

  • CSP并发模型:不要以共享内存的方式来通信,相反,要通过通信的方式来共享内存。

channel发送流程

向一个channel 发送数据的时候,流程如下:

  1. 如果接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq中取出G,并把数据写入,最后唤醒G。结束发送过程
  2. 如果缓冲区有空余位置,将数据写入缓冲区,结束发送过程
  3. 如果缓冲区没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读G唤醒。

有goroutine阻塞在channel recv队列时,此时缓存队列为空,则直接将消息发送给reciver gourotine,只产生一次数据copy。

channel 写入流程

向一个channel写入数据的时候,流程如下:

  1. 如果channel上的recveq队列非空的时候,跳过channel的缓冲队列,直接将数据发送给接受的gorouting
    1. 调用sendDirect方法,将待写入的消息发送给接收的goroutine
    2. 释放channel的全局锁
    3. 调用goready函数,将接收消息的goroutine设置成就绪状态,等待调度
  2. 缓存队列未满,则将消息复制到缓存队列上,然后释放全局锁
  3. 缓存队列已满且接收消息队列recv为空,则将当前的goroutine加入到send队列
    1. 获取当前gorouting的sudog,然后加入到channel的sendq队列
    2. 将当前gorouting睡眠。