读写锁是通过位运算来快速确定读和写的状态的。假设当前state = s,则写状态等于s & ((1 << 16) - 1),读状态等于s >>> 16。当写状态增加1时,state = s + 1。当读状态加1时,state = s + (1 << 16)。
(以下结果均为二进制表示)
1 << 0 等于 1,(1 << 0) - 1 = 0
1 << 1 等于 10,(1 << 1) - 1 = 01
1 << 2 等于 100,(1 << 2) - 1 = 011
1 << 4 等于 10000,(1 << 4) - 1 = 01111
1 << 8 等于 100000000,(1 << 8) - 1 = 011111111
1 << 16 等于 10000000000000000,(1 << 16) - 1 = 01111111111111111
//所以s & ((1 << 16) - 1)相当于将s的高16位全部抹去,只剩下低16位
//若s = 11111,则s >>> 2 = 00111
//所以s >>> 16,就是无符号补0右移16位
(3)写锁的获取与释放
写锁是一个可重入的排他锁,同一时刻只能被一个线程获取。如果当前线程已获取写锁,则增加写状态:s + 1。如果当前线程在获取写锁时,读锁已被获取或者自己不是已获写锁的线程,则进入等待状态。
读锁是一个可重入的共享锁,它能被多个线程同时获取。在写状态为0时,读锁总会被成功获取,而所做的也只是增加读状态。如果当前线程已获取读锁,则增加读状态:state = s + (1 << 16)。如果当前线程在获取读锁时,写锁已被其他线程获取,则该线程进入等待状态。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { //Inner class providing readlock private final ReentrantReadWriteLock.ReadLock readerLock; //Inner class providing writelock private final ReentrantReadWriteLock.WriteLock writerLock; //Performs all synchronization mechanics final Sync sync; //Creates a new ReentrantReadWriteLock with default (nonfair) ordering properties. public ReentrantReadWriteLock() { this(false); } //Creates a new ReentrantReadWriteLock with the given fairness policy. //@param fair {@code true} if this lock should use a fair ordering policy public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } //获取写锁 public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } //获取读锁 public ReentrantReadWriteLock.ReadLockreadLock(){ return readerLock; } ...}(2)ReentractReadWriteLock的读锁和写锁
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { ... //Performs all synchronization mechanics final Sync sync; //The lock returned by method {@link ReentrantReadWriteLock#readLock}. public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } //Acquires the read lock. //Acquires the read lock if the write lock is not held by another thread and returns immediately. //If the write lock is held by another thread then the current thread becomes disabled //for thread scheduling purposes and lies dormant until the read lock has been acquired. public void lock() { sync.acquireShared(1); } //Acquires the read lock unless the current thread is Thread#interrupt interrupted. public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //Acquires the read lock only if the write lock is not held by another thread at the time of invocation. public boolean tryLock() { return sync.tryReadLock(); } //Acquires the read lock if the write lock is not held by another thread //within the given waiting time and the current thread has not been Thread#interrupt interrupted. public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //Attempts to release this lock. //If the number of readers is now zero then the lock is made available for write lock attempts. public void unlock() { sync.releaseShared(1); } ... } //The lock returned by method {@link ReentrantReadWriteLock#writeLock}. public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } //Acquires the write lock. //Acquires the write lock if neither the read nor write lock are held by another thread and returns immediately, //setting the write lock hold count to one. 
//If the current thread already holds the write lock then the hold count is incremented by one and the method return immediately. //If the lock is held by another thread then the current thread becomes disabled //for thread scheduling purposes and lies dormant until the write lock has been acquired, //at which time the write lock hold count is set to one. public void lock() { sync.acquire(1); } //Acquires the write lock unless the current thread is Thread#interrupt interrupted. public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } //Acquires the write lock only if it is not held by another thread at the time of invocation. public boolean tryLock( ) { return sync.tryWriteLock(); } //Acquires the write lock if it is not held by another thread //within the given waiting time and the current thread has not been Thread#interrupt interrupted. public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } //Attempts to release this lock. //If the current thread is the holder of this lock then the hold count is decremented. //If the hold count is now zero then the lock is released. //If the current thread is not the holder of this lock then IllegalMonitorStateException is thrown. public void unlock() { sync.release(1); } ... } ...}(3)ReentrantReadWriteLock基于AQS的两对模版方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Acquires in exclusive mode, ignoring interrupts. //Implemented by invoking at least once #tryAcquire, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquire until success. //This method can be used to implement method Lock#lock. public final void acquire(int arg) { //tryAcquire()需要子类重写 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } } protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //Releases in exclusive mode. //Implemented by unblocking one or more threads if #tryRelease returns true. //This method can be used to implement method Lock#unlock. public final boolean release(int arg) { //tryRelease()需要子类重写 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) { unparkSuccessor(h); } return true; } return false; } protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } //Acquires in shared mode, ignoring interrupts. //Implemented by first invoking at least once #tryAcquireShared, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquireShared until success. public final void acquireShared(int arg) { //tryAcquireShared()需要子类重写 if (tryAcquireShared(arg) < 0) { doAcquireShared(arg); } } protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } //Releases in shared mode. //Implemented by unblocking one or more threads if #tryReleaseShared returns true. public final boolean releaseShared(int arg) { //tryReleaseShared()需要子类重写 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } ...}
情况一:c != 0 && w == 0
情况二:c != 0 && w != 0 && current未持有锁
情况三:c != 0 && w != 0 && current持有锁
情况四:c == 0
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { ... //Performs all synchronization mechanics final Sync sync; //The lock returned by method {@link ReentrantReadWriteLock#writeLock}. public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } //Acquires the write lock. //Acquires the write lock if neither the read nor write lock are held by another thread and returns immediately, //setting the write lock hold count to one. //If the current thread already holds the write lock then the hold count is incremented by one and the method return immediately. //If the lock is held by another thread then the current thread becomes disabled //for thread scheduling purposes and lies dormant until the write lock has been acquired, //at which time the write lock hold count is set to one. public void lock() { //执行AQS的acquire()模版方法,获取写锁(独占锁) sync.acquire(1); } ... } //Synchronization implementation for ReentrantReadWriteLock. //Subclassed into fair and nonfair versions. abstract static class Sync extends AbstractQueuedSynchronizer { static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //Returns the number of exclusive holds represented in count static int exclusiveCount(int c) { //获取写锁状态:c & ((1 << 16) - 1) //也就是从state变量中查找当前获得写锁的线程数量 return c & EXCLUSIVE_MASK; } //获取写锁(独占锁) protected final boolean tryAcquire(int acquires) { //Walkthrough: //1.If read count nonzero or write count nonzero and owner is a different thread, fail. //2.If count would saturate, fail. (This can only happen if count is already nonzero.) //3. Otherwise, this thread is eligible for lock if it is either a reentrant acquire or queue policy allows it. //If so, update state and set owner. 
Thread current = Thread.currentThread(); int c = getState();//获取锁的状态 int w = exclusiveCount(c);//获取写锁的状态 if (c != 0) { //如果c != 0 && w == 0时,说明有线程持有读锁,所以当前获取写锁的线程会被阻塞,会返回false //如果c != 0 && w != 0 && current没获取锁,说明有其他线程持有写锁(不可能有线程持有读锁),所以当前获取写锁的线程会被阻塞,会返回false if (w == 0 || current != getExclusiveOwnerThread()) { return false; } //判断重入次数不能大于65535 if (w + exclusiveCount(acquires) > MAX_COUNT) { throw new Error("Maximum lock count exceeded"); } //Reentrant acquire setState(c + acquires); return true; } //此时c == 0,说明没有线程持有锁,可以通过CAS操作抢占锁 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) { return false; } setExclusiveOwnerThread(current); return true; } ... }}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //获取写锁(独占锁) //Acquires in exclusive mode, ignoring interrupts. //Implemented by invoking at least once #tryAcquire, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquire until success. //This method can be used to implement method Lock#lock. public final void acquire(int arg) { //tryAcquire()需要子类重写,此时执行ReentrantReadWriteLock的内部类Sync的tryAcquire()方法 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } } ...}(2)WriteLock的释放
在Sync类的tryRelease()方法中,首先通过getState() - releases来递减写锁的次数。由于写锁的重入次数保存在低位,所以直接十进制相减即可。然后通过exclusiveCount()获取写锁的重入次数,如果为0说明锁释放成功。最后通过setState()方法修改state变量的值。由于写锁是独占锁,所以设置state变量的值不需要CAS操作。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { ... //Performs all synchronization mechanics final Sync sync; //The lock returned by method {@link ReentrantReadWriteLock#writeLock}. public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } //Attempts to release this lock. //If the current thread is the holder of this lock then the hold count is decremented. //If the hold count is now zero then the lock is released. //If the current thread is not the holder of this lock then IllegalMonitorStateException is thrown. public void unlock() { //执行AQS的release()方法,释放写锁(独占锁) sync.release(1); } ... } //Synchronization implementation for ReentrantReadWriteLock. //Subclassed into fair and nonfair versions. abstract static class Sync extends AbstractQueuedSynchronizer { static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //Returns the number of exclusive holds represented in count static int exclusiveCount(int c) { //获取写锁状态:c & ((1 << 16) - 1) //也就是从state变量中查找当前获得写锁的线程数量 return c & EXCLUSIVE_MASK; } //Note that tryRelease and tryAcquire can be called by Conditions. //So it is possible that their arguments contain both read and write holds //that are all released during a condition wait and re-established in tryAcquire. protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) { throw new IllegalMonitorStateException(); } int nextc = getState() - releases;//递减写锁的次数 boolean free = exclusiveCount(nextc) == 0;//计算写锁的重入次数 if (free) { setExclusiveOwnerThread(null); } setState(nextc); return free; } ... } ...}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //释放写锁(独占锁) //Releases in exclusive mode. //Implemented by unblocking one or more threads if #tryRelease returns true. 
//This method can be used to implement method Lock#unlock. public final boolean release(int arg) { //tryRelease()需要子类重写,此时执行ReentrantReadWriteLock的内部类Sync的tryRelease()方法 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) { unparkSuccessor(h); } return true; } return false; } ...}
在继承自AQS的Sync类的tryAcquireShared()方法中:首先会判断是否有线程持有写锁 + 持有写锁的线程是否是当前线程。如果有线程持有写锁,但不是当前线程持有写锁,那么会阻塞当前线程。然后判断当前线程获取读锁是否应该阻塞,读锁重入次数是否小于65535,以及通过CAS修改state值来抢占读锁是否成功。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { ... //Performs all synchronization mechanics final Sync sync; //The lock returned by method {@link ReentrantReadWriteLock#readLock}. public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } //Acquires the read lock. //Acquires the read lock if the write lock is not held by another thread and returns immediately. //If the write lock is held by another thread then the current thread becomes disabled //for thread scheduling purposes and lies dormant until the read lock has been acquired. public void lock() { //执行AQS的acquireShared()方法,获取读锁(共享锁) sync.acquireShared(1); } ... } //Synchronization implementation for ReentrantReadWriteLock. //Subclassed into fair and nonfair versions. abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //Returns the number of exclusive holds represented in count static int exclusiveCount(int c) { //获取写锁状态:c & ((1 << 16) - 1) //也就是从state变量中查找当前获得写锁的线程数量 return c & EXCLUSIVE_MASK; } //Returns the number of shared holds represented in count static int sharedCount(int c) { //获取读锁状态:c >>> 16 //也就是从state变量中查找当前获得读锁的线程数量 return c >>> SHARED_SHIFT; } //A counter for per-thread read hold counts. //Maintained as a ThreadLocal; cached in cachedHoldCounter static final class HoldCounter { int count = 0; //Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); } //ThreadLocal subclass. Easiest to explicitly define for sake of deserialization mechanics. static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } //The number of reentrant read locks held by current thread. 
//Initialized only in constructor and readObject. //Removed whenever a thread's read hold count drops to 0. private transient ThreadLocalHoldCounter readHolds; //The hold count of the last thread to successfully acquire readLock. //This saves ThreadLocal lookup in the common case where the next thread to release is the last one to acquire. //This is non-volatile since it is just used as a heuristic, and would be great for threads to cache. private transient HoldCounter cachedHoldCounter; //firstReader is the first thread to have acquired the read lock. //firstReaderHoldCount is firstReader's hold count. private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState());// ensures visibility of readHolds } //获取读锁(共享锁) protected final int tryAcquireShared(int unused) { //Walkthrough: //1.If write lock held by another thread, fail. //2.Otherwise, this thread is eligible for lock wrt state, //so ask if it should block because of queue policy. //If not, try to grant by CASing state and updating count. //Note that step does not check for reentrant acquires, //which is postponed to full version to avoid having to check hold count in the more typical non-reentrant case. //3.If step 2 fails either because thread apparently not eligible or CAS fails or count saturated, //chain to version with full retry loop. 
Thread current = Thread.currentThread(); int c = getState(); //如果写锁的持有线程不是当前线程,则直接阻塞 //从而说明,如果一个线程先获取了写锁,那么是可以重入获取读锁的 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) { return -1; } int r = sharedCount(c);//获取读锁的状态 //首先判断当前线程获取读锁是否应该阻塞,然后判断读锁重入次数是否小于65535,最后通过CAS修改state值抢占读锁 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //使用ThreadLocal记录每个线程重入读锁的次数 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { cachedHoldCounter = rh = readHolds.get(); } else if (rh.count == 0) { readHolds.set(rh); } rh.count++; } return 1; } //如果当前线程获取读锁失败,则调用fullTryAcquireShared()方法通过自旋尝试获取锁 return fullTryAcquireShared(current); } ... } ...}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //获取读锁(共享锁) //Acquires in shared mode, ignoring interrupts. //Implemented by first invoking at least once #tryAcquireShared, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquireShared until success. public final void acquireShared(int arg) { //tryAcquireShared()需要子类重写,此时执行ReentrantReadWriteLock的内部类Sync的tryAcquireShared()方法 if (tryAcquireShared(arg) < 0) { //调用AQS的doAcquireShared()方法入队等待队列和阻塞当前线程 doAcquireShared(arg); } } //Acquires in shared uninterruptible mode. 
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { //注意:如果等待获取读锁的线程被唤醒,那么会继续循环把其后连续的所有等待获取读锁的线程都唤醒 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) { selfInterrupt(); } failed = false; return; } } //执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL //执行parkAndCheckInterrupt()方法挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { interrupted = true; } } } finally { if (failed) { cancelAcquire(node); } } } ...}(2)ReadLock的释放
在Sync类的tryReleaseShared()方法中:首先会结合ThreadLocal处理当前线程重入读锁的次数,然后再通过自旋 + CAS设置state值来实现释放读锁。只有当tryReleaseShared()返回true(即释放后state变为0,读锁和写锁都已完全释放)时,AQS的releaseShared()方法才会接着执行doReleaseShared()方法唤醒阻塞的线程。
tryRelease()和tryReleaseShared()的区别:读锁是共享锁,由多个线程持有,所以释放读锁需要通过自旋 + CAS完成。写锁是独占锁,由单个线程持有,所以释放写锁时不需要CAS操作。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { ... //Performs all synchronization mechanics final Sync sync; //The lock returned by method {@link ReentrantReadWriteLock#readLock}. public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } //Attempts to release this lock. //If the number of readers is now zero then the lock is made available for write lock attempts. public void unlock() { //执行AQS的releaseShared()方法,释放读锁(共享锁) sync.releaseShared(1); } ... } //Synchronization implementation for ReentrantReadWriteLock. //Subclassed into fair and nonfair versions. abstract static class Sync extends AbstractQueuedSynchronizer { ... //释放读锁(共享锁) protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //首先结合ThreadLocal处理当前线程重入读锁的次数 if (firstReader == current) { if (firstReaderHoldCount == 1) { firstReader = null; } else { firstReaderHoldCount--; } } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); } int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) { throw unmatchedUnlockException(); } } --rh.count; } //然后通过自旋 + CAS设置state值来实现释放读锁 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) { //Releasing the read lock has no effect on readers, //but it may allow waiting writers to proceed if both read and write locks are now free. return nextc == 0; } } } ... } ...}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //释放读锁(共享锁) //Releases in shared mode. //Implemented by unblocking one or more threads if #tryReleaseShared returns true. 
public final boolean releaseShared(int arg) { //tryReleaseShared()需要子类重写,此时执行ReentrantReadWriteLock的内部类Sync的tryReleaseShared()方法 if (tryReleaseShared(arg)) { //执行AQS的doReleaseShared()方法唤醒阻塞的线程 doReleaseShared(); return true; } return false; } //Release action for shared mode -- signals successor and ensures propagation. //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal. private void doReleaseShared() { //Ensure that a release propagates, even if there are other in-progress acquires/releases. //This proceeds in the usual way of trying to unparkSuccessor of head if it needs signal. //But if it does not, status is set to PROPAGATE to ensure that upon release, propagation continues. //Additionally, we must loop in case a new node is added while we are doing this. //Also, unlike other uses of unparkSuccessor, we need to know if CAS to reset status fails, if so rechecking. for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) { //loop to recheck cases continue; } //执行AQS的unparkSuccessor()方法 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) { //loop on failed CAS continue; } } //loop if head changed if (h == head) { break; } } } //Wakes up node's successor, if one exists. private void unparkSuccessor(Node node) { //If status is negative (i.e., possibly needing signal) try to clear in anticipation of signalling. //It is OK if this fails or if status is changed by waiting thread. //Node结点的状态watiStatus可以分为如下几种: //默认(0)、CANCELED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3) //默认情况下,watiStatus应该是0,或者是空 //获得头结点的状态 int ws = node.waitStatus; //需要设置头结点的状态为0 if (ws < 0) { compareAndSetWaitStatus(node, ws, 0); } //Thread to unpark is held in successor, which is normally just the next node. 
//But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor. //获取头结点的后继结点 Node s = node.next; //如果头结点的后继结点为null或其状态为CANCELED if (s == null || s.waitStatus > 0) { s = null; //那么就从尾结点开始扫描,找到距离头结点最近的 + 状态不是取消的结点 for (Node t = tail; t != null && t != node; t = t.prev) { if (t.waitStatus <= 0) { s = t; } } } if (s != null) { //唤醒传入的头结点的后继结点对应的线程 LockSupport.unpark(s.thread); } } ...}(3)fullTryAcquireShared()方法
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { ... //Synchronization implementation for ReentrantReadWriteLock. //Subclassed into fair and nonfair versions. abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //Returns the number of exclusive holds represented in count static int exclusiveCount(int c) { //获取写锁状态:c & ((1 << 16) - 1) //也就是从state变量中查找当前获得写锁的线程数量 return c & EXCLUSIVE_MASK; } //Returns the number of shared holds represented in count static int sharedCount(int c) { //获取读锁状态:c >>> 16 //也就是从state变量中查找当前获得读锁的线程数量 return c >>> SHARED_SHIFT; } //A counter for per-thread read hold counts. //Maintained as a ThreadLocal; cached in cachedHoldCounter static final class HoldCounter { int count = 0; //Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); } //ThreadLocal subclass. Easiest to explicitly define for sake of deserialization mechanics. static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } //The number of reentrant read locks held by current thread. //Initialized only in constructor and readObject. //Removed whenever a thread's read hold count drops to 0. private transient ThreadLocalHoldCounter readHolds; //The hold count of the last thread to successfully acquire readLock. //This saves ThreadLocal lookup in the common case where the next thread to release is the last one to acquire. //This is non-volatile since it is just used as a heuristic, and would be great for threads to cache. private transient HoldCounter cachedHoldCounter; //firstReader is the first thread to have acquired the read lock. //firstReaderHoldCount is firstReader's hold count. 
private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState());// ensures visibility of readHolds } ... //Full version of acquire for reads, //that handles CAS misses and reentrant reads not dealt with in tryAcquireShared. final int fullTryAcquireShared(Thread current) { //This code is in part redundant with that in tryAcquireShared //but is simpler overall by not complicating tryAcquireShared with interactions //between retries and lazily reading hold counts. HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { //如果当前有其他线程获取到写锁,那么返回-1,当前线程会挂起阻塞并进入等待队列 if (getExclusiveOwnerThread() != current) { return -1; } //else we hold the exclusive lock; blocking here would cause deadlock. } else if (readerShouldBlock()) { //如果readerShouldBlock()返回true,表示当前线程获取读锁需要被阻塞 //结合ThreadLocal处理当前线程重入读锁的次数 //Make sure we're not acquiring read lock reentrantly if (firstReader == current) { //assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) { readHolds.remove(); } } } //如果不是重入 if (rh.count == 0) { return -1; } } } if (sharedCount(c) == MAX_COUNT) { throw new Error("Maximum lock count exceeded"); } if (compareAndSetState(c, c + SHARED_UNIT)) { //结合ThreadLocal处理当前线程重入读锁的次数 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) { rh = cachedHoldCounter; } if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); } else if (rh.count == 0) { readHolds.set(rh); } rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } ... } ...}
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { ... //Fair version of Sync static final class FairSync extends Sync { final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } } //Synchronization implementation for ReentrantReadWriteLock. //Subclassed into fair and nonfair versions. abstract static class Sync extends AbstractQueuedSynchronizer { ... //Returns true if the current thread, when trying to acquire the read lock, //and otherwise eligible to do so, should block because of policy for overtaking other waiting threads. abstract boolean readerShouldBlock(); //Returns true if the current thread, when trying to acquire the write lock, //and otherwise eligible to do so, should block because of policy for overtaking other waiting threads. abstract boolean writerShouldBlock(); ... } ...}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Queries whether any threads have been waiting to acquire longer than the current thread. //判断当前队列中是否有线程排队 public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //所以!hasQueuedPredecessors() 等价于: //h == t || (h.next != null && h.next.thread == Thread.currentThread()) return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } ...}(2)非公平锁的实现代码
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { ... static final class NonfairSync extends Sync { //写线程调用 final boolean writerShouldBlock() { return false; // writers can always barge } //读线程调用 final boolean readerShouldBlock() { //As a heuristic to avoid indefinite writer starvation, //block if the thread that momentarily appears to be head of queue, if one exists, is a waiting writer. //This is only a probabilistic effect since a new reader will not block //if there is a waiting writer behind other enabled readers that have not yet drained from the queue. return apparentlyFirstQueuedIsExclusive(); } } ...}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //如果当前等待队列头结点的后继结点是写锁结点,那么该方法就返回true,表示当前来获取读锁的读线程需要排队; //如果当前等待队列头结点的后继结点是读锁结点,那么该方法就返回false,表示当前来获取读锁的读线程可以抢占锁; final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next)!= null && !s.isShared() && s.thread != null; } ...}
//Data that processData() writes while holding the write lock and reads while holding the read lock.
Object data;

//Lock-downgrade example: if the data has not been updated yet, switch from the read lock to the
//write lock, update the data, re-acquire the read lock BEFORE releasing the write lock, and then
//keep reading under the read lock only.
//NOTE(review): readLock, writeLock, update and use() are not declared in this snippet.
public void processData() {
    //Acquire the read lock first, because the data is read below (e.g. when update is already true)
    readLock.lock();
    if (!update) {//A modification turns out to be necessary
        readLock.unlock();//The read lock must be released first; only then can the write lock be acquired
        writeLock.lock();//Lock downgrading starts here, once the write lock is held
        try {
            //Re-check the flag: no lock was held between readLock.unlock() and writeLock.lock()
            if (!update) {
                //Prepare to modify the data: write data
                data = ...;
                update = true;
            }
            readLock.lock();//While still holding the write lock, also acquire the read lock
        } finally {
            writeLock.unlock();//Release the write lock; the write lock is now downgraded to a read lock
        }
    }
    try {
        //Use the data; reading data may take a long time
        use(data);
    } finally {
        readLock.unlock();//Release the read lock
    }
}
//Excerpt of the Condition interface: five waiting methods (await variants) and two wake-up methods (signal/signalAll).
public interface Condition {
    //Causes the current thread to wait until it is signalled or Thread#interrupt interrupted.
    void await() throws InterruptedException;

    //Causes the current thread to wait until it is signalled.
    void awaitUninterruptibly();

    //Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    //Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    //Causes the current thread to wait until it is signalled or interrupted, or the specified deadline elapses.
    boolean awaitUntil(Date deadline) throws InterruptedException;

    //Wakes up one waiting thread.
    void signal();

    //Wakes up all waiting threads.
    void signalAll();
}
(2)Condition说明
线程1 -> 获取锁 -> 释放锁 + await()阻塞等待 ->
线程2 -> 获取锁 -> signal()唤醒线程1 + 释放锁 ->
线程1 -> 被唤醒 + 尝试获取锁 -> 释放锁
/**
 * Demonstrates the await/signal handshake of a {@link Condition} created from a {@link ReentrantLock}.
 *
 * <p>The first thread takes the lock and calls {@code condition.await()}. Three seconds later the
 * second thread takes the same lock, calls {@code condition.signal()} and unlocks; the first thread
 * then continues after {@code await()} and finally unlocks.
 */
public class ConditionDemo {
    static final ReentrantLock lock = new ReentrantLock();
    static final Condition condition = lock.newCondition();

    /**
     * Starts the waiting thread, sleeps three seconds, then starts the signalling thread.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws Exception {
        new Thread() {
            @Override
            public void run() {
                lock.lock();
                try {
                    System.out.println("第一个线程加锁");
                    System.out.println("第一个线程释放锁以及阻塞等待");
                    condition.await();
                    System.out.println("第一个线程重新获取锁");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Unlock in finally so the lock is released on every exit path.
                    System.out.println("第一个线程释放锁");
                    lock.unlock();
                }
            }
        }.start();

        // Give the first thread time to reach await() before the second thread signals.
        Thread.sleep(3000);

        new Thread() {
            @Override
            public void run() {
                lock.lock();
                try {
                    System.out.println("第二个线程加锁");
                    System.out.println("第二个线程唤醒第一个线程");
                    condition.signal();
                } finally {
                    lock.unlock();
                }
                System.out.println("第二个线程释放锁");
            }
        }.start();
    }
}
public class ReentrantLock implements Lock, java.io.Serializable { ... //Synchronizer providing all implementation mechanics private final Sync sync; //Returns a {@link Condition} instance for use with this Lock instance. public Condition newCondition() { //执行ReentrantLock内部类Sync的newCondition()方法 return sync.newCondition(); } //Base of synchronization control for this lock. //Subclassed into fair and nonfair versions below. //Uses AQS state to represent the number of holds on the lock. abstract static class Sync extends AbstractQueuedSynchronizer { ... final ConditionObject newCondition() { return new ConditionObject(); } ... } ...} public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... public class ConditionObject implements Condition, java.io.Serializable { //First node of condition queue. private transient Node firstWaiter; //Last node of condition queue. private transient Node lastWaiter; //Creates a new ConditionObject instance. public ConditionObject() { } ... } ...}(2)ConditionObject的Condition队列
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    public class ConditionObject implements Condition, java.io.Serializable {
        // First node of condition queue.
        private transient Node firstWaiter;
        // Last node of condition queue.
        private transient Node lastWaiter;
        ...

        // Implements interruptible condition wait.
        // If current thread is interrupted, throw InterruptedException.
        public final void await() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            // 1. Call ConditionObject.addConditionWaiter() to wrap the current thread in a Node
            //    and append it to the condition queue.
            Node node = addConditionWaiter();
            // 2. Call AQS fullyRelease() to release the lock, remembering the released state.
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                // 3. Block the current thread.
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
                    break;
                }
            }
            // 4. After being woken by signal(), call AQS acquireQueued() to try to
            //    re-acquire the lock with the saved state.
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                interruptMode = REINTERRUPT;
            }
            if (node.nextWaiter != null) {// clean up if cancelled
                unlinkCancelledWaiters();
            }
            if (interruptMode != 0) {
                reportInterruptAfterWait(interruptMode);
            }
        }

        // Wraps the current thread in a Node with status CONDITION and appends it
        // to the tail of the condition queue.
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null) {
                // Queue was empty: the new node is also the first waiter.
                firstWaiter = node;
            } else {
                t.nextWaiter = node;
            }
            lastWaiter = node;
            return node;
        }
        ...
    }

    // Releases the lock.
    // Invokes release with current state value; returns saved state.
    // Cancels node and throws exception on failure.
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            // On any failure path, mark the node as cancelled.
            if (failed) {
                node.waitStatus = Node.CANCELLED;
            }
        }
    }

    // Releases in exclusive mode.
    // Implemented by unblocking one or more threads if #tryRelease returns true.
    // This method can be used to implement method Lock#unlock.
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0) {
                // Wake the successor of the sync queue's head node.
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }

    // Acquires in exclusive uninterruptible mode for thread already in queue.
    // Used by condition wait methods as well as acquire.
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // shouldParkAfterFailedAcquire() sets the status of node's predecessor to SIGNAL;
                // parkAndCheckInterrupt() then suspends the current thread.
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                    interrupted = true;
                }
            }
        } finally {
            if (failed) {
                cancelAcquire(node);
            }
        }
    }
    ...
}
(4)ConditionObject的通知方法signal()
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    public class ConditionObject implements Condition, java.io.Serializable {
        // First node of condition queue.
        private transient Node firstWaiter;
        // Last node of condition queue.
        private transient Node lastWaiter;
        ...

        // Moves the longest-waiting thread, if one exists,
        // from the wait queue for this condition to the wait queue for the owning lock.
        public final void signal() {
            // Use isHeldExclusively() to check that the current thread holds the lock.
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            Node first = firstWaiter;
            if (first != null) {
                doSignal(first);
            }
        }

        // Removes and transfers nodes until hit non-cancelled one or null.
        // Split out from signal in part to encourage compilers to inline the case of no waiters.
        private void doSignal(Node first) {
            do {
                // Advance firstWaiter to the next node; if there is none the queue is now empty.
                if ((firstWaiter = first.nextWaiter) == null) {
                    lastWaiter = null;
                }
                // Detach the old first node from the condition queue.
                first.nextWaiter = null;
            } while (!transferForSignal(first) && (first = firstWaiter) != null);
        }

        // Transfers a node from a condition queue onto sync queue.
        // Returns true if successful.
        final boolean transferForSignal(Node node) {
            // addConditionWaiter() created the node with status CONDITION.
            // If this CAS fails, the node's status has already been changed to CANCELLED,
            // so the caller must move on to the next node in the condition queue.
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                return false;
            }
            // Move the node onto the sync queue; enq() returns the previous tail,
            // which is now this node's predecessor.
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
                // Wake the thread of this node.
                LockSupport.unpark(node.thread);
            }
            return true;
        }

        // Implements interruptible condition wait.
        // If current thread is interrupted, throw InterruptedException.
        public final void await() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            // 1. Call ConditionObject.addConditionWaiter() to wrap the current thread in a Node
            //    and append it to the condition queue.
            Node node = addConditionWaiter();
            // 2. Call AQS fullyRelease() to release the lock.
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // Initially the node is not on the sync queue, so isOnSyncQueue() returns false
            // and the thread blocks. Later, when another thread calls signal(), this node's
            // thread is woken, finds the node already on the sync queue, and
            // isOnSyncQueue() returns true.
            while (!isOnSyncQueue(node)) {
                // 3. Block the current thread.
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
                    break;
                }
            }
            // 4. After being woken by signal(), call AQS acquireQueued() to try to
            //    re-acquire the lock.
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                interruptMode = REINTERRUPT;
            }
            if (node.nextWaiter != null) {// clean up if cancelled
                unlinkCancelledWaiters();
            }
            if (interruptMode != 0) {
                reportInterruptAfterWait(interruptMode);
            }
        }
        ...
    }

    // Inserts node into queue, initializing if necessary. See picture above.
    // Returns the node that was the tail before insertion.
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node())) {
                    tail = head;
                }
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    ...
}