// Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file.
package sync
packagefunccas(val *int32, old, newint32)bool
// Mutex is the lock type operated on by the Lock and Unlock methods below.
// key is the counter those methods adjust through xadd (+1 in Lock, -1 in
// Unlock); sema is the value they pass to sys.semacquire / sys.semrelease.
// NOTE(review): `export type` is pre-Go 1 syntax, kept as quoted.
export type Mutex struct { key int32; sema int32; }
funcxadd(val *int32, delta int32) (newint32) { for { v := *val; if cas(val, v, v+delta) { return v+delta; } } panic("unreached") }
func(m *Mutex) Lock() { if xadd(&m.key, 1) == 1 { // changed from 0 to 1; we hold lock return; } sys.semacquire(&m.sema); }
func(m *Mutex) Unlock() { if xadd(&m.key, -1) == 0 { // changed from 1 to 0; no contention return; } sys.semrelease(&m.sema); }
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
	state int32
	sema  uint32
}
// Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file.
// Package sync provides basic synchronization primitives such as mutual // exclusion locks. Other than the Once and WaitGroup types, most are intended // for use by low-level library routines. Higher-level synchronization is // better done via channels and communication. // // Values containing the types defined in this package should not be copied. package sync
import"sync/atomic"
// A Mutex is a mutual exclusion lock.
// Mutexes can be created as part of other structures;
// the zero value for a Mutex is an unlocked mutex.
type Mutex struct {
	// state is updated atomically by Lock and Unlock; it combines the
	// mutexLocked and mutexWoken flags with a waiter count stored above
	// mutexWaiterShift.
	state int32
	// sema is the semaphore handed to runtime_Semacquire and
	// runtime_Semrelease.
	sema uint32
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock()
	Unlock()
}
// Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func(m *Mutex) Lock() { // Fast path: grab unlocked mutex. // 直接加锁成功,然后返回 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return }
// 唤醒标志,这部分表示锁已经被占用 awoke := false for { old := m.state // 表示当前无锁可用,对应最后一位 new := old | mutexLocked if old&mutexLocked != 0 { // 表示现在进入到等待状态,waiter + 1,通过 mutexWaiterShift 偏移实现,对应 state 前 29 位 new = old + 1<<mutexWaiterShift } if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. // 如果此轮被唤醒(是被 unlock 唤醒),那么就重置,为了避免惊群现象 new &^= mutexWoken } // 尝试获取锁 if atomic.CompareAndSwapInt32(&m.state, old, new) { // get it ! if old&mutexLocked == 0 { break } // 等待被唤醒 runtime_Semacquire(&m.sema) awoke = true } } }
// Unlock unlocks m. // It is a run-time error if m is not locked on entry to Unlock. // // A locked Mutex is not associated with a particular goroutine. // It is allowed for one goroutine to lock a Mutex and then // arrange for another goroutine to unlock it. func(m *Mutex) Unlock() { // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked == 0 { panic("sync: unlock of unlocked mutex") }
old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { return } // Grab the right to wake someone. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema) return } old = m.state } }
not only one
由于只请求一次,无论出于什么原因没有拿到锁,都只能等到下一次信号量到来才能重新被唤醒。因此我们希望能够多次尝试获取锁,简单来说就是允许自旋;但是第一次优化里已经提到,自旋会消耗大量 CPU 资源,因此需要对它进行限制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
//判断是否可以自旋,同时满足以下4个条件才能自旋: //1、自旋次数小于4次 //2、cpu核数大于1 //3、GOMAXPROCS>1 //4、running P > 1 并且 P队列为空 funcsync_runtime_canSpin(i int)bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 { returnfalse } if p := getg().m.p; p.runqhead != p.runqtail { returnfalse } returntrue }
// Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file.
// Package sync provides basic synchronization primitives such as mutual // exclusion locks. Other than the Once and WaitGroup types, most are intended // for use by low-level library routines. Higher-level synchronization is // better done via channels and communication. // // Values containing the types defined in this package should not be copied. package sync
import ( "internal/race" "sync/atomic" "unsafe" )
// throw is declared without a body; per the original note its implementation
// is provided by the runtime. Lock and Unlock call it when they detect an
// inconsistent or misused mutex state.
func throw(string) // provided by runtime
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
	// state is updated atomically by Lock and Unlock; it combines the
	// mutexLocked, mutexWoken and mutexStarving flags with a waiter count
	// stored above mutexWaiterShift.
	state int32
	// sema is the semaphore handed to runtime_SemacquireMutex and
	// runtime_Semrelease.
	sema uint32
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock()
	Unlock()
}
// Mutex fairness. // // Mutex can be in 2 modes of operations: normal and starvation. // In normal mode waiters are queued in FIFO order, but a woken up waiter // does not own the mutex and competes with new arriving goroutines over // the ownership. New arriving goroutines have an advantage -- they are // already running on CPU and there can be lots of them, so a woken up // waiter has good chances of losing. In such case it is queued at front // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, // it switches mutex to the starvation mode. // // In starvation mode ownership of the mutex is directly handed off from // the unlocking goroutine to the waiter at the front of the queue. // New arriving goroutines don't try to acquire the mutex even if it appears // to be unlocked, and don't try to spin. Instead they queue themselves at // the tail of the wait queue. // // If a waiter receives ownership of the mutex and sees that either // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, // it switches mutex back to normal operation mode. // // Normal mode has considerably better performance as a goroutine can acquire // a mutex several times in a row even if there are blocked waiters. // Starvation mode is important to prevent pathological cases of tail latency. starvationThresholdNs = 1e6 )
// Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func(m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return }
var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { // Don't spin in starvation mode, ownership is handed off to waiters // so we won't be able to acquire the mutex anyway. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old // Don't try to acquire starving mutex, new arriving goroutines must queue. if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // The current goroutine switches mutex to starvation mode. // But if the mutex is currently unlocked, don't do the switch. // Unlock expects that starving mutex has waiters, which will not // be true in this case. if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. ifnew&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break// locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. 
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } }
if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
// Unlock unlocks m. // It is a run-time error if m is not locked on entry to Unlock. // // A locked Mutex is not associated with a particular goroutine. // It is allowed for one goroutine to lock a Mutex and then // arrange for another goroutine to unlock it. func(m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) }
// Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } ifnew&mutexStarving == 0 { old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false) return } old = m.state } } else { // Starving mode: handoff mutex ownership to the next waiter. // Note: mutexLocked is not set, the waiter will set it after wakeup. // But mutex is still considered locked if mutexStarving is set, // so new coming goroutines won't acquire it. runtime_Semrelease(&m.sema, true) } }
// A RWMutex is a reader/writer mutual exclusion lock. // The lock can be held by an arbitrary number of readers or a single writer. // The zero value for a RWMutex is an unlocked mutex. // // A RWMutex must not be copied after first use. // // If any goroutine calls [RWMutex.Lock] while the lock is already held by // one or more readers, concurrent calls to [RWMutex.RLock] will block until // the writer has acquired (and released) the lock, to ensure that // the lock eventually becomes available to the writer. // Note that this prohibits recursive read-locking. // A [RWMutex.RLock] cannot be upgraded into a [RWMutex.Lock], // nor can a [RWMutex.Lock] be downgraded into a [RWMutex.RLock]. // // In the terminology of [the Go memory model], // the n'th call to [RWMutex.Unlock] “synchronizes before” the m'th call to Lock // for any n < m, just as for [Mutex]. // For any call to RLock, there exists an n such that // the n'th call to Unlock “synchronizes before” that call to RLock, // and the corresponding call to [RWMutex.RUnlock] “synchronizes before” // the n+1'th call to Lock. // // [the Go memory model]: https://go.dev/ref/mem type RWMutex struct { w Mutex // held if there are pending writers writerSem uint32// semaphore for writers to wait for completing readers readerSem uint32// semaphore for readers to wait for completing writers readerCount atomic.Int32 // number of pending readers readerWait atomic.Int32 // number of departing readers }
// Happens-before relationships are indicated to the race detector via:
//   - Unlock  -> Lock:  readerSem
//   - Unlock  -> RLock: readerSem
//   - RUnlock -> Lock:  writerSem
//
// The methods below temporarily disable handling of race synchronization
// events in order to provide the more precise model above to the race
// detector.
//
// For example, atomic.AddInt32 in RLock should not appear to provide
// acquire-release semantics, which would incorrectly synchronize racing
// readers, thus potentially missing races.
// RLock locks rw for reading. // // It should not be used for recursive read locking; a blocked Lock // call excludes new readers from acquiring the lock. See the // documentation on the [RWMutex] type. func(rw *RWMutex) RLock() { if race.Enabled { race.Read(unsafe.Pointer(&rw.w)) race.Disable() } // 对应了 writer 的 lock if rw.readerCount.Add(1) < 0 { // A writer is pending, wait for it. runtime_SemacquireRWMutexR(&rw.readerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) } }
// Lock locks rw for writing. // If the lock is already locked for reading or writing, // Lock blocks until the lock is available. func(rw *RWMutex) Lock() { if race.Enabled { race.Read(unsafe.Pointer(&rw.w)) race.Disable() } // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && rw.readerWait.Add(r) != 0 { runtime_SemacquireRWMutex(&rw.writerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) race.Acquire(unsafe.Pointer(&rw.writerSem)) } }
// RUnlock undoes a single [RWMutex.RLock] call; // it does not affect other simultaneous readers. // It is a run-time error if rw is not locked for reading // on entry to RUnlock. func(rw *RWMutex) RUnlock() { if race.Enabled { race.Read(unsafe.Pointer(&rw.w)) race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) race.Disable() } if r := rw.readerCount.Add(-1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } if race.Enabled { race.Enable() } }
func(rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() fatal("sync: RUnlock of unlocked RWMutex") } // A writer is pending. if rw.readerWait.Add(-1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false, 1) } }
// Unlock unlocks rw for writing. It is a run-time error if rw is // not locked for writing on entry to Unlock. // // As with Mutexes, a locked [RWMutex] is not associated with a particular // goroutine. One goroutine may [RWMutex.RLock] ([RWMutex.Lock]) a RWMutex and then // arrange for another goroutine to [RWMutex.RUnlock] ([RWMutex.Unlock]) it. func(rw *RWMutex) Unlock() { if race.Enabled { race.Read(unsafe.Pointer(&rw.w)) race.Release(unsafe.Pointer(&rw.readerSem)) race.Disable() }
// Announce to readers there is no active writer. r := rw.readerCount.Add(rwmutexMaxReaders) if r >= rwmutexMaxReaders { race.Enable() fatal("sync: Unlock of unlocked RWMutex") } // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() if race.Enabled { race.Enable() } }