首页 » Web前端 » phpmissingargument技巧_一文搞懂Go通道

phpmissingargument技巧_一文搞懂Go通道

访客 2024-11-04 0

扫一扫用手机浏览

文章目录 [+]

// bufferedch := make(chan Task, 3)// unbufferedch := make(chan int)// nilvar ch chan int复制代码

追踪make函数,会创造在builtin/builtin.go中仅有一个声明func make(t Type, size ...IntegerType) Type。
真正的实现可以参考go内置函数make,大略来说在cmd/compile/internal/gc/typecheck.go中有函数typecheck1

// The result of typecheck1 MUST be assigned back to n, e.g.// n.Left = typecheck1(n.Left, top)func typecheck1(n Node, top int) (res Node) {if enableTrace && trace {defer tracePrint("typecheck1", n)(&res)}switch n.Op {case OMAKE:ok |= ctxExprargs := n.List.Slice()if len(args) == 0 {yyerror("missing argument to make")n.Type = nilreturn n}n.List.Set(nil)l := args[0]l = typecheck(l, Etype)t := l.Typeif t == nil {n.Type = nilreturn n}i := 1switch t.Etype {default:yyerror("cannot make type %v", t)n.Type = nilreturn ncase TCHAN:l = nilif i < len(args) {l = args[i]i++l = typecheck(l, ctxExpr)l = defaultlit(l, types.Types[TINT])if l.Type == nil {n.Type = nilreturn n}if !checkmake(t, "buffer", l) {n.Type = nilreturn n}n.Left = l} else {n.Left = nodintconst(0)}n.Op = OMAKECHAN //对应的函数位置}if i < len(args) {yyerror("too many arguments to make(%v)", t)n.Op = OMAKEn.Type = nilreturn n}n.Type = tif (top&ctxStmt != 0) && top&(ctxCallee|ctxExpr|Etype) == 0 && ok&ctxStmt == 0 {if !n.Diag() {yyerror("%v evaluated but not used", n)n.SetDiag(true)}n.Type = nilreturn n}return n}}复制代码

终极真正实现位置为runtime/chan.go

phpmissingargument技巧_一文搞懂Go通道

func makechan(t chantype, size int) hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c hchan switch { case mem == 0: // Queue or element size is zero. c = (hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.kind&kindNoPointers != 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n") } return c}复制代码

从这个函数可以看出,channel的数据构造为hchan

phpmissingargument技巧_一文搞懂Go通道
(图片来自网络侵删)
2.2构造

接下来我们看一下channel的数据构造,基于数据构造,可以推测出详细实现。

runtime/chan.go

type hchan struct {//channel行列步队里面总的数据量qcount uint // total data in the queue// 循环行列步队的容量,如果是非缓冲的channel便是0dataqsiz uint // size of the circular queue// 缓冲行列步队,数组类型。
buf unsafe.Pointer // points to an array of dataqsiz elements// 元素占用字节的sizeelemsize uint16// 当前行列步队关闭标志位,非零表示关闭closed uint32// 行列步队里面元素类型elemtype _type // element type// 行列步队send索引sendx uint // send index// 行列步队索引recvx uint // receive index// 等待channel的G行列步队。
recvq waitq // list of recv waiters// 向channel发送数据的G行列步队。
sendq waitq // list of send waiters// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.// 全局锁lock mutex}复制代码

通过该hchan的数据构造和makechan函数,数据构造里有几个值得解释的数据:

dataqsiz表示channel的长度,如果为非缓冲行列步队,则值为0。
通过dataqsiz实现环形行列步队。
buf存放真正的数据sendx和recvx指在环形行列步队中数据入channel和出channel的位置sendq存放向channel发送数据的goroutine行列步队recvq存放等待获取channel数据的goroutine行列步队lock为全局锁2.3Anwser

通过追查到的代码,我们可以回答最开始提出的几个问题了。

2.3.1channel为什么是并发安全的?

由于做操作之前,都会先获取全局锁,只有获取成功的才能进行操作,担保了并发安全。

2.3.2同步通道和异步通道有啥差异?

利用的底层数据构造、操作代码都是一样的,只不过dataqsiz的值不一样,一个为0,一个为正数。

2.3.3通道为何会壅塞协程?

当通道已经满了,但协程连续往通道里写入,或者通道里没有数据,但是协程从通道里获取数据时,协程会被壅塞。

实现的事理与Golang并发调度的GMP模型强干系。

写入满通道的流程

当前goroutine(G1)创建自身的一个引用(sudog),放置到hchan的sendq行列步队当前goroutine(G1)会调用gopark函数,将当前协程置为waiting状态;将M和G1绑定关系断开;scheduler会调度其余一个就绪态的goroutine与M建立绑定关系,然后M 会运行其余一个G。

读取空通道的流程

当前goroutine(G2)会创建自身的一个引用(sudog)将代表G2的sudog存入recvq等待行列步队G2会调用gopark函数进入等待状态,让出OS thread,然后G2进入壅塞态2.3.4利用通道导致壅塞的协程是如何解除壅塞的?

对付已经满的通道,当有协程G2做读操作时,会解除G1的壅塞,流程为

G2调用 t:=<-ch 获取一个元素A;从hchan的buf里面取出一个元素;从sendq等待行列步队里面pop一个sudog;将G1要写入的数据复制到buf中A的位置,然后更新buf的sendx和recvx索引值;G2调用goready(G1)将G1置为Runable状态,表示G1可以规复运行;

对付读取空的通道,当有协程G1做写操作时,会解除G2的壅塞,流程为

将待写入的发送给吸收的goroutine G2;G1调用goready(G2) 将G2设置造诣绪状态,等待调度;2.4实现

我们来看一下chan的详细实现

2.4.1读取数据

// chanrecv receives on channel c and writes the received data to ep.// ep may be nil, in which case received data is ignored.// If block == false and no elements are available, returns (false, false).// Otherwise, if c is closed, zeros ep and returns (true, false).// Otherwise, fills in ep with an element and returns (true, true).// A non-nil ep must point to the heap or the caller's stack.func chanrecv(c hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // raceenabled: don't need to check ep, as it is always on the stack // or is new memory allocated by reflect. if debugChan { print("chanrecv: chan=", c, "\n") } if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // Fast path: check for failed non-blocking operation without acquiring the lock. // // After observing that the channel is not ready for receiving, we observe that the // channel is not closed. Each of these observations is a single word-sized read // (first c.sendq.first or c.qcount, and second c.closed). // Because a channel cannot be reopened, the later observation of the channel // being not closed implies that it was also not closed at the moment of the // first observation. We behave as if we observed the channel at that moment // and report that the receive cannot proceed. // // The order of operations is important here: reversing the operations can lead to // incorrect behavior when racing with a close. if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3) // someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed}复制代码

吸收channel的数据的流程如下:

CASE1:前置channel为nil的场景:

如果block为非壅塞,直接return;

如果block为壅塞,就调用gopark()壅塞当前goroutine,并抛出非常。

前置场景,block为非壅塞,且channel为非缓冲行列步队且sender等待行列步队为空 或者 channel为有缓冲行列步队但是行列步队里面元素数量为0,且channel未关闭,这个时候直接return;调用 lock(&c.lock) 锁住channel的全局锁;

CASE2:channel已经被关闭且channel缓冲中没有数据了,这时直接返回success和空值;

CASE3:sender行列步队非空,调用 func recv(c hchan, sg sudog, ep unsafe.Pointer, unlockf func(), skip int)

函数处理:

1.先取channel缓冲行列步队的敌人元素复制给receiver(也便是ep);

2.将sender行列步队的敌人元素里面的数据复制到channel缓冲行列步队刚刚弹出的元素的位置,这样缓冲行列步队就不用移动数据了。

channel是非缓冲channel,直接调用recvDirect函数直接从sender recv元素到ep工具,这样就只用复制一次;

对付sender行列步队非空情形下, 有缓冲的channel的缓冲行列步队一定是满的:

开释channel的全局锁;

调用goready函数标记当前goroutine处于ready,可以运行的状态;

CASE4:sender行列步队为空,缓冲行列步队非空,直接取行列步队元素,移动头索引;

CASE5:sender行列步队为空、缓冲行列步队也没有元素且不壅塞协程,直接return (false,false);

CASE6:sender行列步队为空且channel的缓存行列步队为空,将goroutine加入recv行列步队,并壅塞。

2.4.2写入数据

/ generic single channel send/recv If block is not nil, then the protocol will not sleep but return if it could not complete. sleep can wake up with g.param == nil when a channel involved in the sleep has been closed. it is easiest to loop and re-run the operation; we'll see that it's now closed. /func chansend(c hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } if debugChan { print("chansend: chan=", c, "\n") } if raceenabled { racereadpc(c.raceaddr(), callerpc, funcPC(chansend)) } // Fast path: check for failed non-blocking operation without acquiring the lock. // // After observing that the channel is not closed, we observe that the channel is // not ready for sending. Each of these observations is a single word-sized read // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel). // Because a closed channel cannot transition from 'ready for sending' to // 'not ready for sending', even if the channel is closed between the two observations, // they imply a moment between the two when the channel was both not yet closed // and not ready for sending. We behave as if we observed the channel at that moment, // and report that the send cannot proceed. // // It is okay if the reads are reordered here: if we observe that the channel is not // ready for sending and then observe that it is not closed, that implies that the // channel wasn't closed during the first observation. if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } // Block on the channel. Some receiver will complete our operation for us. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the // stack tracer. KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) return true}复制代码

向channel写入数据紧张流程如下:

CASE1:当channel为空或者未初始化,如果block表示壅塞那么向个中发送数据将会永久壅塞;如果block表示非壅塞就会直接return;CASE2:前置场景,block为非壅塞,且channel没有关闭(已关闭的channel不能写入数据)且(channel为非缓冲行列步队且receiver等待行列步队为空)或则( channel为有缓冲行列步队但是行列步队已满),这个时候直接return;调用 lock(&c.lock) 锁住channel的全局锁;CASE3:不能向已经关闭的channel send数据,会导致panic。
CASE4:如果channel上的recv行列步队非空,则跳过channel的缓存行列步队,直接向发送给吸收的goroutine:调用sendDirect方法,将待写入的发送给吸收的goroutine;开释channel的全局锁;调用goready函数,将吸收的goroutine设置造诣绪状态,等待调度。
CASE5:缓存行列步队未满,则将复制到缓存行列步队上,然后开释全局锁;CASE6:缓存行列步队已满且吸收行列步队recv为空,则将当前的goroutine加入到send行列步队;获取当前goroutine的sudog,然后入channel的send行列步队;将当前goroutine休眠关闭channel

func closechan(c hchan) { if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) racerelease(c.raceaddr()) } c.closed = 1 var glist gList // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) }}复制代码

关闭的紧张流程如下所示:

获取全局锁;设置channel数据构造chan的关闭标志位;获取当前channel上面的读goroutine并链接成链表;获取当前channel上面的写goroutine然后拼接到前面的读链表后面;开释全局锁;唤醒所有的读写goroutine。
总结

理解一下详细实现还是很好的,虽然在利用上不会带来变革,不过理解了内涵后,能够更加灵巧地利用通道,可以更加随意马虎的追查到问题,也能学习到高手的设计思想。

资料Golang-Channel事理解析golang对付 nil通道 close通道你所不知道的神器特性Go措辞make和new关键字的差异及实现事理Go底层引用实现图解Golang的channel底层事理go内置函数makeGolang并发调度的GMP模型末了

大家如果喜好我的文章,可以关注我的"大众年夜众号(程序员麻辣烫)

我的个人博客为:shidawuhen.github.io/

往期文章回顾:

技能

Go通道实现事理Go定时器实现事理HTTPS连接过程限流实现2秒杀系统分布式系统与同等性协议微做事之做事框架和注册中央Beego框架利用浅谈微做事TCP性能优化限流实现1Redis实现分布式锁Golang源码BUG追查事务原子性、同等性、持久性的实现事理CDN要求过程详解常用缓存技巧如何高效对接第三方支付Gin框架简洁版InnoDB锁与事务简析算法总结

读书条记

敏捷革命如何磨炼自己的影象力大略的逻辑学-读后感热风-读后感论语-读后感孙子兵法-读后感

思考

项目流程管理对项目管理的一些意见对产品经理的一些思考关于程序员职业发展的思考关于代码review的思考Markdown编辑器推举-typora
标签:

相关文章

QQ伪装黑客代码大全技术与风险警示

网络安全问题日益凸显。QQ作为一种流行的社交工具,成为了黑客攻击的主要目标之一。本文将针对QQ伪装黑客代码大全进行深入剖析,揭示其...

Web前端 2025-03-02 阅读1 评论0