channel的简单使用
在Go语言中,通道(channel)是一种用于在goroutine之间进行通信和同步的机制。下面是一些简单的通道使用示例,以及它们对应的底层函数调用。
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int) // 创建通道
go func() {
ch <- 42 // 发送数据到通道
}()
go func() {
value := <-ch // 从通道接收数据
fmt.Println("Received:", value)
}()
time.Sleep(1 * time.Second) // 等待goroutine完成
close(ch) // 关闭通道
}
底层函数调用
创建通道:
ch := make(chan int)
底层调用:
makechan(elemtype, size)
发送数据到通道:
ch <- 42
底层调用:
chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
从通道接收数据:
value := <- ch
底层调用:
chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
关闭通道:
close(ch)
底层调用:
closechan(c *hchan)
这些底层函数是Go运行时系统的一部分,用于实现通道的底层操作。通过这些函数,Go语言能够在goroutine之间进行高效、安全的通信和同步。
chan.go中包含的类
hchan和waitq
chan.go中有两个struct:
- hchan, channel的内部实现
- waitq, hchan中的recvq和sendq的数据结构
type hchan struct {
qcount uint // total data in the queue,队列中总的元素数量
dataqsiz uint // size of the circular queue,环形缓冲区buffer的大小
buf unsafe.Pointer // points to an array of dataqsiz elements,指向buffer中元素的指针
elemsize uint16 // 表示channel中元素的大小
closed uint32 // 是否关闭,0表示该channel没有关闭
timer *timer // timer feeding this chan
elemtype *_type // element type,channel中元素的类型
sendx uint // send index,buffer中应该发送的元素
recvx uint // receive index,buffer中接收元素的位置
recvq waitq // list of recv waiters,接受者队列
sendq waitq // list of send waiters,发送者队列
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
// waitq由一个双向链表实现,元素类型是指向sudog的指针
type waitq struct {
first *sudog
last *sudog
}
关于sudog的补充介绍
在Go语言中,
sudog
是一个重要的数据结构,用于在goroutine之间进行通信和同步。它是 “Selector User-space Data Object Group” 的缩写,表示在用户空间中用于选择器的数据对象组。
sudog
主要用于实现Go语言中的select
语句和通道(channel)操作。当一个goroutine在等待通道操作(如发送或接收数据)时,它会被转换为一个sudog
结构体,并被添加到相应的等待队列中。这样,调度器可以管理这些等待的goroutine,并在通道操作完成后唤醒它们。以下是
sudog
结构体的一些关键字段:
g *g
:指向当前等待的goroutine。elem unsafe.Pointer
:指向发送或接收的数据元素。c *hchan
:指向正在操作的通道。selectDone uint32
:用于选择器操作的完成标志。ticket uint32
:用于公平调度。
sudog
结构体的具体实现可以在Go语言的源码中找到,通常位于runtime
包中。通过使用sudog
,Go语言的调度器能够高效地管理goroutine的等待和唤醒,从而实现高效的并发编程。总结来说,
sudog
是Go语言运行时系统中的一个关键数据结构,用于管理goroutine在通道操作中的等待和同步。
// src/runtime/runtime2.go
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
// 以下字段受该 sudog 所阻塞的通道的 hchan.lock 保护。
// shrinkstack 依赖这些字段来处理涉及通道操作的 sudogs。
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// 数据元素(可能指向堆栈)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
// 以下字段从未被并发访问。
// 对于通道,waitlink 只被 g 访问。
// 对于信号量,所有字段(包括上述字段)仅在持有 semaRoot 锁时访问。
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
// isSelect 表示 g 正在参与 select,因此 g.selectDone 必须通过 CAS 来赢得唤醒竞争。
isSelect bool
// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
// success 表示通过通道 c 的通信是否成功。如果 goroutine 是因为值通过通道 c 传递而被唤醒,则为 true;
// 如果是因为 c 被关闭而被唤醒,则为 false。
success bool
// waiters is a count of semaRoot waiting list other than head of list,
// clamped to a uint16 to fit in unused space.
// Only meaningful at the head of the list.
// (If we wanted to be overly clever, we could store a high 16 bits
// in the second entry in the list.)
// waiters 是 semaRoot 等待列表中除列表头外的计数,被限制为 uint16 以适应未使用的空间。
// 仅在列表头有意义。
// (如果我们想过于聪明,我们可以将高 16 位存储在列表的第二个条目中。)
waiters uint16
parent *sudog // semaRoot binary tree // semaRoot 二叉树
waitlink *sudog // g.waiting list or semaRoot // g.waiting 列表或 semaRoot
waittail *sudog // semaRoot
c *hchan // channel // 通道
}
makechan
调用场景
在调用ch := make(chan int,2)
函数的时候,编译之后可以发现他调用了makechan这个函数.
使用的在线的一个查看汇编代码的网站
实现代码
这是makechan的实现:
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
// compiler checks this but be safe.
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case !elem.Pointers():
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
return c
}
解读
switch部分,分配内存
mem
是一个 uintptr
类型的变量,用于表示通道缓冲区所需的总内存大小。
uintptr
的大小足以容纳任何指针的位模式,因此它可以用来进行指针运算和存储指针地址。在 Go 语言中,
uintptr
通常用于以下几种情况:
- 指针运算:由于
uintptr
可以表示指针的整数形式,因此可以用于指针运算,例如计算指针的偏移量。- 与
unsafe
包结合使用:unsafe
包提供了一些不安全的操作,例如将指针转换为uintptr
或将uintptr
转换为指针。这在进行底层内存操作时非常有用。- 存储指针地址:在某些情况下,需要将指针地址存储在一个整数类型中,这时可以使用
uintptr
。需要注意的是,
uintptr
不是一个指针类型,因此它不会阻止垃圾回收器回收它所表示的内存。如果需要保持对象存活,应该使用实际的指针类型(如*T
),而不是uintptr
。
通过这里的代码得到:mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
mem
是通过将通道元素的大小(elem.Size_
)与通道缓冲区的大小(size
)相乘得到的。这个计算过程是通过调用 math.MulUintptr
函数来完成的。
switch {
case mem == 0:
// Queue or element size is zero.
// 缓冲区buf大小为0,或者通道元素的大小为0
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case !elem.Pointers():
// Elements do not contain pointers.
// 元素不包含指针
// Allocate hchan and buf in one call.
// 通过一次调用分配给hchan和buf内存
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
// 元素包含指针
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
mem == 0
的情况:- 如果
mem
为 0,这意味着通道的缓冲区大小为 0 或者通道元素的大小为 0。 - 在这种情况下,代码调用
mallocgc
函数分配hchanSize
大小的内存,并将返回的内存地址转换为*hchan
类型,赋值给c
。
mallocgc
是 Go 语言运行时系统中的一个内部函数,用于分配内存并进行垃圾回收(GC)标记。由于它是运行时系统的内部函数,通常不会直接在用户代码中使用。相反,用户代码中会使用更高层次的内存分配函数,如new
、make
等,这些函数会在内部调用mallocgc
。其他的内存分配函数还有:
newobject
,也是调用了mallocgc()
它的函数签名如下:
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer
这个函数的参数和返回值解释如下:
size uintptr
:需要分配的内存大小,以字节为单位。typ *_type
:指向类型信息的指针。_type
是 Go 语言中表示类型信息的一个结构体。needzero bool
:一个布尔值,表示是否需要将分配的内存初始化为零。
返回值:
unsafe.Pointer
:返回一个指向新分配内存的unsafe.Pointer
指针。
mallocgc
函数的主要作用是分配指定大小的内存,并根据需要进行零初始化。同时,它还会对分配的内存进行垃圾回收标记,以便垃圾回收器能够正确地管理这些内存。c.buf = c.raceaddr()
这一行是用于竞态检测器(race detector)的同步操作。
- 如果
!elem.Pointers()
的情况:- 如果通道元素不包含指针(即元素是基本类型或不包含指针的结构体),则调用
mallocgc
函数一次性分配hchanSize + mem
大小的内存。 - 这里
hchanSize
是hchan
结构体的大小,mem
是通道缓冲区的大小。 - 分配的内存中,前
hchanSize
字节用于存储hchan
结构体,后面的mem
字节用于存储通道缓冲区。 c.buf = add(unsafe.Pointer(c), hchanSize)
这一行将c.buf
指向缓冲区的起始位置。add
函数用于计算指针的偏移量。
- 如果通道元素不包含指针(即元素是基本类型或不包含指针的结构体),则调用
默认情况(
default
):- 如果通道元素包含指针(即元素是包含指针的结构体),则首先调用
new(hchan)
分配hchan
结构体的内存。 - 然后调用
mallocgc
函数分配mem
大小的内存用于通道缓冲区,并将返回的内存地址赋值给c.buf
。 - 这种情况下,
hchan
结构体和缓冲区是分开分配的。
- 如果通道元素包含指针(即元素是包含指针的结构体),则首先调用
总结来说,这段代码根据通道元素是否包含指针以及缓冲区大小是否为 0,选择不同的内存分配策略。这样可以优化内存使用,并确保在元素包含指针时能够正确地进行垃圾回收。
其他初始化
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
这一段是对c(hchan)的元素大小、元素类型、缓冲区大小、锁进行初始化。
chansend
调用场景
实现代码和解读
chansend1
- 调用
chansend
函数,传递通道、元素指针、阻塞标志(true
)和调用者的程序计数器。
// entry point for c <- x from compiled code.
// 编译代码中 c <- x 的入口点。
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
chansend1 和 chansend 的区别
chansend1
是一个包装函数,用于在编译代码中调用chansend
。它总是以阻塞模式调用chansend
,即block
参数为true
。chansend
是实际执行发送操作的函数,它可以以阻塞或非阻塞模式工作,具体取决于block
参数的值。
chansend的函数签名解读
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
ep unsafe.Pointer
:这是函数的第二个参数。ep
是参数名,unsafe.Pointer
是参数类型。unsafe.Pointer
是一个可以指向任意类型数据的指针类型,通常用于与 Go 的内存管理进行低级别的交互。block bool
:这是函数的第三个参数。block
是参数名,bool
是参数类型。bool
类型表示一个布尔值,可以是true
或false
。这个参数用于指示函数是否应该在无法立即完成发送操作时阻塞。callerpc uintptr
:这是函数的第四个参数。callerpc
是参数名,uintptr
是参数类型。uintptr
是一个无符号整数类型,通常用于表示指针的数值。callerpc
通常用于记录调用者的程序计数器(program counter),以便进行调试或性能分析。
chansend
检查通道是否为nil
- 检查通道是否为
nil
,如果是非阻塞模式则返回false
,否则让当前 goroutine 进入永久睡眠状态。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
// 通道为空并且不阻塞,直接返回false
if !block {
return false
}
// 能执行到这里表示通道为空并且block==true,则调用gopark
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
}
gopark
是 Go 语言运行时系统中的一个内部函数,用于将当前的 goroutine 挂起(park),使其进入等待状态。这个函数通常在需要让出 CPU 时间片或等待某个条件满足时使用。函数签名
gopark
的函数签名如下:func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int)
上面的这些判断被称为 fast path,因为加锁的操作是一个很重的操作,所以能够在加锁之前返回的判断就在加锁之前做好是最好的unlockf
:一个函数,用于在挂起 goroutine 之前解锁某些资源。这个函数的签名是func(*g, unsafe.Pointer) bool
,其中*g
是当前 goroutine 的结构体,unsafe.Pointer
是一个指向任意数据的指针。lock
:一个指向锁的指针,用于在挂起 goroutine 之前解锁。reason
:一个waitReason
类型的值,表示 goroutine 挂起的原因。traceEv
:一个字节值,用于跟踪事件。traceskip
:一个整数值,表示跟踪的跳过层数。使用场景
gopark
通常在以下几种情况下使用:
- 等待通道操作:当 goroutine 在等待通道的发送或接收操作时,如果通道当前不可用,goroutine 会被挂起。
- 等待锁:当 goroutine 尝试获取一个已经被其他 goroutine 持有的锁时,它会被挂起,直到锁被释放。
- 等待条件变量:当 goroutine 在等待某个条件变量满足时,它会被挂起,直到条件变量被通知。
Fast Path,检查通道是否未关闭非阻塞且已满
- 如果通道未关闭且未准备好发送(即通道已满),在非阻塞模式下返回
false
。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 非阻塞,未关闭,通道已满
if !block && c.closed == 0 && full(c) {
return false
}
...
}
full()
也是在runtime2.go中定义的一个函数// full reports whether a send on c would block (that is, the channel is full). // full函数报告发送操作是否会在channel c上阻塞(如果阻塞了,说明channel已满) // It uses a single word-sized read of mutable state, so although // the answer is instantaneously true, the correct answer may have changed // by the time the calling function receives the return value. // 该函数使用了单个字大小的可变状态读取,所以即使答案在某一时刻上正确的,但是正确的答案也可能在调用函数返回值之前改变 func full(c *hchan) bool { // c.dataqsiz is immutable (never written after the channel is created) // c.dataqsiz是不可变的(在创建通道之后不会改变) // so it is safe to read at any time during channel operation. // 所以在channel操作的任何时刻读取都是安全的 if c.dataqsiz == 0 { // dataqsiz表示缓冲区的大小,为0代表无缓冲通道 // Assumes that a pointer read is relaxed-atomic. // 假定指针读取是宽松原子式的 return c.recvq.first == nil // 缓冲区的第一个指针为nil代表该chanel可以接收不会阻塞,反之会阻塞 } // 有缓冲通道的情况 // Assumes that a uint read is relaxed-atomic. // 假定读取一个uint是宽松原子式的 return c.qcount == c.dataqsiz // 如果channel的元素个数等于channel的缓冲区大小说明已满阻塞,反之则有空不会阻塞 }
宽松原子式(Relaxed-atomic):
- 宽松原子式是指在多线程环境中,对某个变量的读取或写入操作是原子的,即操作是不可分割的,不会被其他线程的操作打断。
- 在 Go 语言中,通常假设单个字大小的读取和写入操作是原子的,这意味着读取或写入一个字(通常是 32 位或 64 位)的操作不会被其他线程的操作打断。
- 这种假设简化了并发编程,但需要注意的是,这种原子性是宽松的,因为它不提供顺序或可见性保证
上面的部分是加锁之前的判断,因为加锁是一个很重的操作,所以最好是能不加就能直接判断返回最好,所以有了上面的代码:channel为nil的判断和非阻塞且通道已满的两个判断。
加锁,并判断channel是否关闭
- 获取通道锁。
- 先判断通道是否处于关闭状态,如是解锁并抛出“send on closed channel”异常。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
var t0 int64 // 存储时间戳
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}
如果有等待的接受者
- 如果找到等待的接收者,调用
send
函数直接将值传递给接收者。
如果recvq中存在等待的接受者,说明缓冲区是空的,就可以直接把要发的数据发送。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
// 存在一个等待的接收者。我们将要发送的值直接传递给接收者,绕过通道缓冲区(如果有的话)。
// send接受一个通道指针、一个等待接收数据的 goroutine 指针、一个数据指针、一个解锁函数和一个用于堆栈跟踪的整数
// 要求通道 c 必须为空并已上锁。send 使用 unlockf 解锁 c。
// sg 必须已经从 c 中出队。
// ep 必须非空并指向堆或调用者的堆栈。
send(c, sg, ep, func() { unlock(&c.lock) }, 3) // 关于send的实现在下面
return true
}
...
}
dequeue
是 Go 语言运行时系统中用于从等待队列(waitq
)中移除并返回一个sudog
结构体的函数。sudog
结构体代表一个正在等待的 goroutine。这个函数的主要目的是从等待队列中安全地移除一个sudog
,并处理一些特殊情况,比如在select
语句中等待的 goroutine。func (q *waitq) dequeue() *sudog { for { sgp := q.first if sgp == nil { return nil } y := sgp.next if y == nil { q.first = nil q.last = nil } else { y.prev = nil q.first = y sgp.next = nil // mark as removed (see dequeueSudoG) } // if a goroutine was put on this queue because of a // select, there is a small window between the goroutine // being woken up by a different case and it grabbing the // channel locks. Once it has the lock // it removes itself from the queue, so we won't see it after that. // We use a flag in the G struct to tell us when someone // else has won the race to signal this goroutine but the goroutine // hasn't removed itself from the queue yet. if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) { continue } return sgp } }
如果缓冲区有空间
- 如果通道缓冲区有空间,将元素入队并解锁。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 执行到这里说明上一条件未满足,即没有等待的接收者
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// channel buffer中有可用空间。将要发送的元素入队。
qp := chanbuf(c, c.sendx) // 获取缓冲区sendx的指针
if raceenabled { // 如果启用了数据竞争检测
racenotify(c, c.sendx, nil) // 这个函数用于通知数据竞争检测系统
}
typedmemmove(c.elemtype, qp, ep) // 将数据复制到缓冲区
// 更新 sendx 索引,如果达到缓冲区大小,则重置为 0(实现循环缓冲区)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++ // 元素个数更新
unlock(&c.lock) // 发送数据完成,解锁
return true
}
...
}
chanbuf
是 Go 语言运行时系统中的一个内部函数,用于获取通道(channel)缓冲区中指定索引位置的元素的指针。这个函数通常在通道的发送和接收操作中使用,以便访问和操作通道缓冲区中的数据。
add
:这是一个内部函数,用于计算地址偏移量。它将缓冲区起始地址c.buf
加上索引位置i
乘以元素大小c.elemsize
,得到指定索引位置的元素的指针。//go:linkname chanbuf func chanbuf(c *hchan, i uint) unsafe.Pointer { return add(c.buf, uintptr(i)*uintptr(c.elemsize)) }
缓冲区已满,如果是非阻塞
- 如果通道缓冲区已满且非阻塞模式,解锁并返回
false
。
上一条件是没有等待的接受者,缓冲区还有空。执行到这里代表缓冲区已满。
然后分阻塞和非阻塞模式的判断,如果是非阻塞的,不会存入发送队列,直接返回false。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if !block {
unlock(&c.lock)
return false
}
...
}
为什么非阻塞的不会存入到发送队列?
- 非阻塞模式的设计目的:
- 非阻塞模式的设计目的是为了在发送操作不能立即完成时,不阻塞调用者。这样可以避免发送操作因为等待缓冲区可用空间而导致的线程阻塞,从而提高程序的响应性和并发性能。
- 直接返回
false
的意义:
- 当通道缓冲区已满且处于非阻塞模式时,直接返回
false
可以让调用者立即知道发送操作失败。调用者可以根据返回值来决定下一步的操作,比如重试发送、丢弃数据或采取其他策略。- 避免不必要的等待:
- 如果非阻塞模式下仍然尝试将数据存入发送队列并等待缓冲区可用空间,这实际上会导致发送操作阻塞,违背了非阻塞模式的设计初衷。
因此,在非阻塞模式下,如果通道缓冲区已满,发送操作不会将数据存入发送队列,而是直接返回
false
,以确保发送操作不会阻塞调用者。
阻塞,创建sudog入队sendq
- 如果需要阻塞,获取当前 goroutine 并创建一个
sudog
结构体,将其入队到发送队列,然后让 goroutine 进入睡眠状态。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// Block on the channel. Some receiver will complete our operation for us.
// 在通道上执行阻塞,别的接收者会完成我们的操作。
gp := getg() // 函数用于获取当前执行的 goroutine 的指针
mysg := acquireSudog() // acquireSudog() 函数用于获取一个 sudog 结构体,表示一个正在等待的 goroutine。
mysg.releasetime = 0 // 设置mysg的等待时间为0
if t0 != 0 { // 如果上文的t0不为0,则将其设置为-1
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 在将 elem 分配给 mysg 并将 mysg 入队到 gp.waiting 之间不能进行堆栈拆分,
// copystack 可以在这里找到它。
mysg.elem = ep // mysg.elem 设置为要发送的数据的指针。
mysg.waitlink = nil // mysg.waitlink 设置为 nil,表示没有下一个等待的 sudog
mysg.g = gp // mysg.g 设置为当前的 goroutine。
mysg.isSelect = false // mysg.isSelect 设置为 false,表示这不是一个 select 操作。
mysg.c = c // mysg.c 设置为当前的通道。
gp.waiting = mysg // gp.waiting 设置为当前的 sudog,表示当前 goroutine 正在等待。
gp.param = nil // gp.param 设置为 nil,表示没有传递参数。
c.sendq.enqueue(mysg) // 将mysg入队发送队列
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
// 向任何试图缩小堆栈的人发出信号,表示我们即将挂起通道。
// 此 G 的状态变化与我们设置 gp.activeStackChans 之间的窗口对于堆栈缩小来说是不安全的。
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
// 确保发送的值在接收者复制它之前保持存活。sudog 有一个指向堆栈对象的指针,但 sudog 并不被视为堆栈跟踪器的根。
// KeepAlive(ep) 的作用是确保发送的值在接收者复制它之前保持存活。即使 goroutine 被挂起,ep 指向的值也不会被垃圾回收器回收。
KeepAlive(ep)
...
}
处理唤醒后的操作
如果没有接收者唤醒这个 goroutine,它会一直处于阻塞状态,直到满足以下条件之一:
- 有接收者从通道中接收数据,从而释放缓冲区空间,并唤醒发送者。
- 通道被关闭,此时发送操作会引发 panic。
因此,如果没有接收者唤醒这个 goroutine,它会一直阻塞在
gopark
调用处,直到有接收者接收数据或通道被关闭。
- 当被唤醒时,检查是否因为通道关闭而被唤醒,如果是则抛出异常。
这段代码处理的是当发送操作被阻塞后,如何在被唤醒时进行后续处理。具体步骤包括检查等待的 sudog、重置 goroutine 的状态、检查通道是否关闭、记录阻塞事件、重置 sudog 的状态,并处理通道关闭的情况。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// someone woke us up.
// 有人唤醒这个sudog,如果不是等待的sudog(即当前的mysg,前面设置的),说明等待队列被破坏,抛出错误
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil // 表示当前 goroutine 不再等待
gp.activeStackChans = false // 表示当前 goroutine 不再活跃在通道上
closed := !mysg.success
gp.param = nil
// 如果 mysg.releasetime 大于 0,表示记录了阻塞开始的时间,调用 blockevent 函数记录阻塞事件。
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil // mysg.c 设置为 nil,表示 sudog 不再关联任何通道。
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
send 的作用
send
函数处理在空通道上的发送操作。它将发送者发送的值直接复制到接收者,并唤醒接收者继续其工作。通道必须为空且已锁定,send
函数在完成后会解锁通道。
send
- 处理在空通道上的发送操作。
- 将发送者发送的值直接复制到接收者。
- 解锁通道并唤醒接收者继续其工作。
// send processes a send operation on an empty channel c.
// send 处理空通道 c 上的发送操作。
// The value ep sent by the sender is copied to the receiver sg.
// 发送者发送的值 ep 被复制到接收者 sg。
// The receiver is then woken up to go on its merry way.
// 然后唤醒接收者让其继续。
// Channel c must be empty and locked. send unlocks c with unlockf.
// 通道 c 必须为空并已上锁。send 使用 unlockf 解锁 c。
// sg must already be dequeued from c.
// sg 必须已经从 c 中出队。
// ep must be non-nil and point to the heap or the caller's stack.
// ep 必须非空并指向堆或调用者的堆栈。
// 接受一个通道指针、一个等待接收数据的 goroutine 指针、一个数据指针、一个解锁函数和一个用于堆栈跟踪的整数
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果启用了竞争检测
if raceenabled {
// 无缓冲通道
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
// 假装我们通过缓冲区,即使我们直接复制。注意,只有启用竞争检测时才需要增加头尾位置。
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil { // 表示有数据需要发送
sendDirect(c.elemtype, sg, ep) // 直接将数据从发送者复制到接收者,这个也是在runtime2.go中定义
sg.elem = nil // 发送完成后,将 sg.elem 设置为 nil
}
gp := sg.g // 获取接收者的 goroutine gp
unlockf() // 调用 unlockf() 解锁通道
gp.param = unsafe.Pointer(sg) // 将接收者的 gp.param 设置为 unsafe.Pointer(sg),表示接收操作成功
sg.success = true
if sg.releasetime != 0 { // 如果 sg.releasetime 不为 0,则记录当前时间 cputicks()
sg.releasetime = cputicks()
}
goready(gp, skip+1) // 调用 goready(gp, skip+1) 将接收者的 goroutine 设置为可运行状态,准备唤醒接收者
}
chanrecv
调用场景
在从ch中接收一个数字的时候,可以看到他这里是调用了runtime.chanrecv1()
这个函数
实现代码和解读
chanrecv1
这个函数实际上内部也是调用了chanrecv
函数,其中第三个参数为true
// entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
chanrecv2
这个函数实际上内部也是调用了chanrecv
函数,其中第三个参数为true,同时有一个返回值
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
chanrecv
判断c是否为nil
若为nil且非阻塞则直接返回(false,false),若为空且阻塞则将它挂起
// chanrecv receives on channel c and writes the received data to ep.
// chanrecv 接收channel c中的元素,并将接收到的数据写给ep
// ep may be nil, in which case received data is ignored.
// ep可能为nil,这种情况下接收的数据被忽略
// If block == false and no elements are available, returns (false, false).
// 如果是非阻塞,同时没有元素可接收,那就return (false, false)
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// 否则,如果channel c关闭了,那么会将接收数据的指针ep清零,并且函数返回一个表示操作成功但没有数据接收的(true, false)
// Otherwise, fills in *ep with an element and returns (true, true).
// 否则,用接收到的元素写入到ep所指向的位置,然后return (true, true)
// A non-nil ep must point to the heap or the caller's stack.
// 一个非空的ep必须指向堆,或者调用者的栈
// 函数签名中的接收参数包括 传递参数的通道c,执行接收位置的指针ep,是否阻塞执行block;
// 返回值selected表示通道操作是否被选中执行。在Go语言的上下文中,这通常意味着通道操作是否成功,或者是否因为通道关闭而立即返回。
// received:表示是否成功接收到了数据。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
if c.timer != nil {
c.timer.maybeRunChan()
}
Fast Path,看能否在加锁之间返回false
在没有加锁之前判断能不能直接返回false
empty报告了是否读操作会导致阻塞,即通道是否为空。
函数返回的时刻,结果是原子性正确的(即没有其他并发操作干扰),并且是顺序一致的(即结果反映了调用时刻的状态)。然而,由于通道在函数返回后没有被锁定,因此通道的状态可能会立即发生变化,变得不为空。
// empty reports whether a read from c would block (that is, the channel is // empty). It is atomically correct and sequentially consistent at the moment // it returns, but since the channel is unlocked, the channel may become // non-empty immediately afterward. func empty(c *hchan) bool { // c.dataqsiz is immutable. // 检查是否是缓冲通道 if c.dataqsiz == 0 { // 如果是非缓冲通道,那么通过原子操作加载 sendq 队列的第一个元素(first)。sendq 是一个等待发送数据的 goroutine 队列。如果 first 为 nil,则表示没有 goroutine 在等待发送数据,因此通道为空。 return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil } // c.timer is also immutable (it is set after make(chan) but before any channel operations). // All timer channels have dataqsiz > 0. if c.timer != nil { c.timer.maybeRunChan() } // 如果通道是有缓冲的,那么通过原子操作加载 qcount 字段,这个字段表示通道缓冲区中当前的数据项数量。如果 qcount 为0,则表示通道为空。 return atomic.Loaduint(&c.qcount) == 0 }
// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) { // 如果接收操作非阻塞且channel为空(这里的empty代表是否会读取之后阻塞)
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
// 在观察到channel没有准备好接收之后,我们观察channel是否是处于关闭状态
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// 这些检查的重新排序可能会在与关闭操作竞争时导致不正确的行为
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
// 例如,如果通道是打开的且非空,然后被关闭,接着被清空,
// 重新排序的读取可能会错误地指示“打开且空”。为了防止重新排序,
// 我们使用原子加载来进行这两项检查,并依赖于清空和关闭操作在
// 同一个锁下的单独关键部分中发生。这个假设在关闭
// 一个带有阻塞发送的无缓冲通道时失败,但无论如何那都是一个错误条件。
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
// 因为通道不能被重新打开,所以稍后观察到通道
// 未关闭意味着在第一次观察的时刻它也未关闭。我们表现得好像我们在那个时刻观察到了通道
// 并报告接收操作不能继续。
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
// 通道已经不可逆地关闭。重新检查通道是否有任何待接收的数据
// 这些数据可能在上面的空和关闭检查之间到达。
// 当与这样的发送操作竞争时,这里也需要顺序一致性。
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
初始化和性能监控
- 如果
blockprofilerate
大于 0,获取当前的 CPU 时间戳t0
以用于后续的性能分析。
锁定通道
- 锁定通道
c
的互斥锁。
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
如果通道已关闭,检查有无等待的数据和发送者
- 如果通道已关闭且没有等待的数据:
- 如果启用了竞态检测,通知竞态检测器通道已关闭。
- 解锁通道。
- 如果
ep
不为nil
,清空ep
指向的内存。 - 返回
true, false
表示成功接收,但通道已关闭。
- 否则,如果通道未关闭并且有等待的发送者:
- 从发送队列中获取一个等待的发送者。
- 调用
recv
函数处理接收操作并解锁通道。 - 返回
true, true
表示成功接收并且通道未关闭。
if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 通道已关闭,但缓冲区中有数据。
} else {
// 发现未关闭的等待发送者。
if sg := c.sendq.dequeue(); sg != nil {
// 找到等待的发送者。如果缓冲区大小为0,则直接从发送者接收值。
// 否则,从队列头部接收值,并将发送者的值添加到队列尾部(因为队列已满,两者映射到同一缓冲区槽位)。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
如果缓冲区不为空,接收缓冲区数据
- 如果缓冲区中有数据:
- 从缓冲区中接收数据到
ep
。 - 清空缓冲区中的数据。
- 更新接收索引
recvx
,如果超过缓冲区大小,重置为 0。 - 减少缓冲区中的数据计数
qcount
。 - 解锁通道。
- 返回
true, true
表示成功接收并且通道未关闭。
- 从缓冲区中接收数据到
if c.qcount > 0 {
// 直接从队列接收
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
如果是非阻塞,处理非阻塞接收
- 如果
block
为false
,解锁通道并返回false, false
表示没有数据可接收且不阻塞。
if !block {
unlock(&c.lock)
return false, false
}
阻塞等待发送者
- 获取当前 Goroutine。
- 获取一个
sudog
(表示 Goroutine 的结构体)并初始化。 - 将
ep
指向的内存地址赋值给sudog
。 - 将当前 Goroutine 设置为等待状态,并将
sudog
加入通道的接收队列。 - 如果通道有定时器,阻塞定时器。
- 将 Goroutine 设置为即将阻塞在通道上,调用
gopark
进行阻塞。
// 没有可用的发送者:在此通道上阻塞。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 在分配元素和将mysg入队到gp.waiting之间没有栈分割
// 这样copystack可以在那里找到它。
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
if c.timer != nil {
blockTimerChan(c)
}
// 向任何试图缩小我们的栈的人发出信号,表明我们即将在通道上停车。
// 在这个G的状态改变和我们设置gp.activeStackChans之间的窗口
// 对于栈缩小是不安全的。
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
处理唤醒后的操作
- 检查
sudog
是否仍在等待列表中,如果不在,抛出异常。 - 处理定时器解锁。
- 将 Goroutine 从等待状态中移除,标记为不再阻塞。
- 处理释放时间。
- 返回
true
和sudog
的成功标志。
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
if c.timer != nil {
unblockTimerChan(c)
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
recv
// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1. The value sent by the sender sg is put into the channel
// and the sender is woken up to go on its merry way.
// 2. The value received by the receiver (the current G) is
// written to ep.
// recv处理在满通道c上的接收操作。
// 有两个部分:
// 1. 发送者sg发送的值被放入通道,发送者被唤醒继续其愉快的旅程。
// 2. 接收者(当前G)接收的值被写入ep。
//
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
// 对于同步通道,两个值是相同的。
// 对于异步通道,接收者从通道缓冲区获取数据,而发送者的数据被放入通道缓冲区。
// 通道c必须是满的且已锁定。recv通过unlockf解锁c。
// sg必须已经从c中出队。
// 非nil的ep必须指向堆或调用者的栈。
是非缓冲通道?
如果是非缓冲通道,执行以下步骤:
如果启用了竞态检测(raceenabled),则调用 racesync(c, sg) 来同步数据。
如果 ep 不为 nil,则直接从发送者 sg 复制数据到 ep,调用 recvDirect(c.elemtype, sg, ep)。
// recv的函数签名接收参数:一个接收数据的通道c,sg表示等待在通道上的发送数据的 goroutine,ep表示接收数据写入的地址
// unlock:在接收操作完成后解锁通道; skip:控制某些内部操作的跳过次数
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果是非缓冲通道
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
recvDirect
是Go语言运行时系统的一部分,用于在非缓冲通道上直接从发送者复制数据到接收者func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { // dst is on our stack or the heap, src is on another stack. // The channel is locked, so src will not move during this // operation. src := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_) memmove(dst, src, t.Size_) }
是缓冲通道
如果通道是缓冲通道(即 c.dataqsiz > 0),执行以下步骤:
获取队列中当前接收位置 c.recvx 的元素 qp,调用 chanbuf(c, c.recvx)。
如果启用了竞态检测,则调用 racenotify 来通知竞态检测器。
如果 ep 不为 nil,则从队列 qp 复制数据到接收者 ep,调用 typedmemmove(c.elemtype, ep, qp)。
从发送者 sg 复制数据到队列 qp,调用 typedmemmove(c.elemtype, qp, sg.elem)。
更新接收位置 c.recvx,如果它等于缓冲区大小 c.dataqsiz,则将其重置为0,以循环使用缓冲区。
更新发送位置 c.sendx,使其等于接收位置 c.recvx,因为队列是满的,发送者和接收者指向同一位置。
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
处理后续
将发送者 sg 的 elem 字段设置为 nil,表示发送者不再持有数据。
获取发送者 sg 对应的 goroutine gp。
调用 unlockf() 函数来解锁通道 c,这个函数是在外部传递进来的,用于在接收操作完成后释放通道锁。
将 gp.param 设置为指向 sg 的 unsafe.Pointer,这通常用于在唤醒 goroutine 时传递参数。
将 sg.success 设置为 true,表示发送操作成功。
如果 sg.releasetime 不为0,则将其设置为当前的 CPU 滴答数 cputicks(),这可能用于性能分析。
调用 goready(gp, skip+1) 来唤醒发送者 sg 对应的 goroutine gp,使其准备好运行,skip+1 参数可能用于控制调度。
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
closechan
调用场景
关闭通道通过close(ch)
实现,在汇编代码中可以看到是调用了runtime.closechan()这个函数
实现代码和解读
closechan
检查通道是否为空
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel")) // 检查通道是否为空,如果为空则抛出异常
}
channel加锁,检查是否已经被关闭
lock(&c.lock) // 锁定通道
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel")) // 检查通道是否已经关闭,如果已关闭则抛出异常
}
竞态检测相关操作
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan)) // 记录当前调用者的程序计数器
racerelease(c.raceaddr()) // 进行相关竞态检测操作
}
标记为通道已关闭
c.closed = 1 // 标记通道为已关闭
释放所有等待接收的G
var glist gList
// 释放所有等待接收的 Goroutine
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem) // 清空接收的元素
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks() // 记录释放时间
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr()) // 竞态检测
}
glist.push(gp) // 将 Goroutine 加入列表
}
释放所有等待发送的G
// 释放所有等待发送的 Goroutine(它们将会 panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks() // 记录释放时间
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr()) // 竞态检测
}
glist.push(gp) // 将 Goroutine 加入列表
}
unlock(&c.lock) // 解锁通道
释放通道锁,唤醒所有G
// 现在已经释放了通道锁,唤醒所有 Goroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3) // 唤醒 Goroutine
}
}