首页 » SEO优化 » php壅塞wait技巧_运用go syncwaitgroup重构做事

php壅塞wait技巧_运用go syncwaitgroup重构做事

访客 2024-12-08 0

扫一扫用手机浏览

文章目录 [+]

进行预估做事打算预估价格之前,会有十几个地方的数据进行获取。
我这里列举一些常见的。
比如

用户信息,我们看这个用户的类型,是否企业客户,是新用户还是老用户,这个用户有没有优惠券路径方案信息,我们获取一下详细方案的路线信息,这个一样平常是三方返回比如百度或者高德订单信息,订单的来源和类型,是否知足一些自己定制的条件(恶劣景象 动态调价等)附近司机信息,有些司机会有分外优惠代码的实现

我们这里利用大略的仿照代码进行实现,紧张分享一下实现的方法。

php壅塞wait技巧_运用go syncwaitgroup重构做事

涉及到多协程和数据信息的通报

php壅塞wait技巧_运用go syncwaitgroup重构做事
(图片来自网络侵删)
1. 首先想到的是利用channel

func es() { // 并发任务数 tasksNum := 10 ch := make(chan struct{}, tasksNum) for i := 0; i < tasksNum; i++ { go func() { defer func() { ch <- struct{}{} }() // working <-time.After(time.Second) }() } // 等待 10 个 goroutine 完成任务 for i := 0; i < tasksNum; i++ { <-ch } // do next // ...}

这个方法也可以办理问题,但是并发的任务数量会发送变革的,还要自己来掩护这个数量。
以是就反对了后面直策应用 sync.WaitGroup

2. 利用 channel + sync.waitgroup 网络数据和并发实行

func es(t testing.T) {tasksNum := 10dataCh := make(chan interface{})resp := make([]interface{}, 0, tasksNum)stopCh := make(chan struct{}, 1)// 启动读 goroutinego func() {for data := range dataCh {resp = append(resp, data)}stopCh <- struct{}{}}()// 担保获取到所有数据后,通过 channel 通报到读协程手中var wg sync.WaitGroupfor i := 0; i < tasksNum; i++ {wg.Add(1)go func(ch chan<- interface{}) {defer wg.Done()ch <- time.Now().UnixNano()}(dataCh)}// 确保所有取数据的协程都完成了事情,才关闭 chwg.Wait()close(dataCh)// 确保读协程处理完成 网络数据<-stopCh}

这里便是大概的项目实现的逻辑。
须要把稳的是还须要进行context超时掌握的判断,以是会在这个根本上做些超时和重试操作,这个不是本文的重点,就不添加上去了。

waitgroup的源码实现

type WaitGroup struct { // 防止值拷贝标记 noCopy noCopy // 64 个 bit 组成的状态值,高 32 位标识了当前须要等待多少个 goroutine 实行了 WaitGroup.Add,还没实行 WaitGroup.Done;低 32 位表示了当前多少 goroutine 实行了 WaitGroup.Wait 操作陷入壅塞中了 state1 uint64 state2 uint32}关于state2大略解释一下

在 64 位架构下,WaitGroup 构造体中的 state1 字段是一个 uint64 类型,高32位表示计数器,低32位表示等待者数量。
由于 64 位操作须要 64 位对齐,以是不须要额外处理。

而在 32 位架构下,由于只能担保 64 位字段的 32 位对齐,不能担保 64 位对齐,因此须要进行分外处理。

在 WaitGroup 的 state() 方法中,会检讨 state1 是否对齐,如果没有对齐,则须要将计数器和等待者数量的位置互换,即将原来低32位中的值存储到高32位,在高32位中存储计数器的值,并且在其他方法中也须要做相应的调度。

通过这种办法,可以确保在 32 位架构下,WaitGroup 构造体的操作能够正常进行,达到同步掌握的目的。

Add 和 wait 的源码实现部分

// 等待组计数器加 1func (wg WaitGroup) Add(delta int) { // 获取等待组的状态标识值,statep 指向 state1 的地址,semap 是用于壅塞挂起 goroutine 行列步队的标记值 statep, semap := wg.state() // ... // state1 高 32 位加 1,标识实行任务数量加 1 state := atomic.AddUint64(statep, uint64(delta)<<32) // 取的是 state 高 32 位的值,代表有多少个 goroutine 在实行任务 v := int32(state >> 32) // w 取的是 state 低 32 位的值,代表有多少个 goroutine 实行了 WaitGroup.Wait 在壅塞等待 w := uint32(state) // ... // 不能涌现负值的实行任务计数器 if v < 0 { panic("sync: negative WaitGroup counter") } // 倘若存在 goroutine 在壅塞等待 WaitGroup.Wait,但是在实行 WaitGroup.Add 前,实行任务计数器的值为 0 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 倘若当前没有 goroutine 在 Wait,或者任务实行计数器仍大于 0,则直接返回 if v > 0 || w == 0 { return } // 在实行过 WaitGroup.Wait 操作的情形下,WaitGroup.Add 操作不应该并发实行,否则可能导致 panic if statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 将 state1 计数器置为 0,然后依次唤醒实行过 Wait 的 waiters statep = 0 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) }}

唤醒 goroutine 利用的方法是 runtime_Semrelease 方法,底层会实行 goready 操作,属于 goroutine 的被动调度模式

wg.Done实行的便是wg.Add(-1), 这里会有唤醒协程的操作。

WaitGroup.Done 常日在子 goroutine 内部实行,因此是可以并发调用的,但是利用的规则该当要担保,实行完本次 Done 操作后,并发计数器的数值仍旧是大于即是 0 的,这样并发实行不会有问题.

// Done decrements the WaitGroup counter by one.func (wg WaitGroup) Done() { wg.Add(-1)}

// Wait blocks until the WaitGroup counter is zero.func (wg WaitGroup) Wait() { // 获取 WaitGroup 状态字段的地址 statep, semap := wg.state() // ... for { state := atomic.LoadUint64(statep) v := int32(state >> 32) w := uint32(state) // 倘若当前须要等待完成任务的计数器值为 0,则无需 wait 直接返回 if v == 0 { // ... return } // wait 壅塞等待 waitGroup 的计数器加一,然后陷入壅塞 if atomic.CompareAndSwapUint64(statep, state, state+1)atomic.CompareAndSwapUint64(statep, state, state+1) { // ... runtime_Semacquire(semap) // 从壅塞中回答,倘若前一轮 wait 操作还没结束,waitGroup 又被利用了,则会 panic if statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } // ... return } }}

这个便是 关于 sync.waitgroup的实现分享了。

相关文章