一、介绍
我们都知道,Java编程经常会使用到synchronized关键字来实现对多线程环境下的同步操作。类似地,Java还提供了Lock接口来实现锁的相关操作。与synchronized隐式获取释放锁不同,Lock接口可以让开发人员显式地进行加锁和解锁操作,其更加灵活,可以实现更多场景下的锁操作。
队列同步器AbstractQueuedSynchronizer是Java定义的用来构建锁或者其他自定义同步组件的框架,Lock的实现都是基于这个队列同步器的。它定义了一个整型的同步状态state变量,同步器可以根据state的值来判断线程是否处于同步状态。另外,还有一个FIFO的同步队列,当线程未获取到同步状态时,会被封装为节点放入到队列当中。为了实现类似synchronized中的wait/notify机制,同步器加入了Condition接口, 通过Condition的await()和signal()来实现等待/通知机制。有了队列同步器,锁的使用者可以不用关心锁的具体实现细节,直接使用lock()和unlock()方法即可;同样的,锁的实现者也不用在意锁的底层原语实现,他们只需要定义好如何获取/释放同步状态等操作,提供给锁的使用者使用。ReentrantLock、ReentrantReadWriteLock等都是基于AbstractQueuedSynchronizer来实现的。
二、相关方法
为了实现用户自定义锁的功能,队列同步器可重写的方法有:
方法名称 | 描述 |
protected boolean tryAcquire(int arg) | 独占式获取同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态 |
protected boolean tryAcquireShared(int arg) | 共享式获取同步状态 |
protected boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
protected boolean isHeldExclusively() | 判断同步状态是否被当前线程独占 |
另外,为了让开发人员更好地实现自定义锁,同步器还提供了以下的模板方法:
方法名称 | 描述 |
void acquire(int arg) | 独占式获取同步状态,如果获取失败将会进入同步队列等待;该方法需调用重写的tryAcquire(int arg) |
void acquireInterruptibly(int arg) | 与acquire(int arg)相同,但是该方法会响应中断,如果发生中断,则会抛出InterruptedException |
boolean tryAcquireNanos(int arg, long nanos) | 超时独占式获取同步状态 |
void acquireShared(int arg) | 共享式获取同步状态,同一时刻可以有多个线程获取到同步状态 |
void acquireSharedInterruptibly(int arg) | 与acquireShared(int arg)相同,增加中断功能 |
boolean tryAcquireSharedNanos(int arg, long nanos) | 超时共享式获取同步状态 |
boolean release(int arg) | 独占式释放同步状态 |
boolean releaseShared(int arg) | 共享式释放同步状态 |
Collection<Thread> getQueuedThreads() | 获取在同步状态上的线程集合 |
三、自定义锁实现
基于队列同步器实现自定义锁的主要方法为,定义一个同步器对象,根据需要重写同步器方法,这里面需要加入自定义的同步状态逻辑,一般主要重写tryAcquire和tryRelease;然后将该同步器对象内嵌到自定义锁中,一般是通过内部类的形式;最后定义锁的lock()和unlock()即可。
下面是实现一个自定义锁的简单例子。
自定义TwoLock,用于表示同一时刻允许最多两个线程获得同步状态,其他线程进入等待状态。
class TwoLock {
// 构造同步器对象
private final Sync sync = new Sync(2);
// 以内部类方式嵌入同步器
private static final class Sync extends AbstractQueuedSynchronizer{
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must more than 0");
}
setState(count);
}
protected int tryAcquireShared(int arg) {
// 自旋获取同步状态
for (;;) {
int curCount = getState();
int newCount = curCount - arg;
// 通过CAS方式设置state
if (newCount < 0 || compareAndSetState(curCount, newCount)) {
return newCount;
}
}
}
protected boolean tryReleaseShared(int arg) {
for (;;) {
int curCount = getState();
int newCount = curCount + arg;
if (compareAndSetState(curCount, newCount)) {
return true;
}
}
}
}
public void lock() {
sync.tryAcquireShared(1);
}
public void unlock() {
sync.tryReleaseShared(1);
}
}