searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Java DelayQueue实现分析

2023-09-21 02:06:53
13
0

DelayQueue是一个无界阻塞队列,队列中的元素比较特殊,必须是实现了Delayed接口的元素。Delayed接口是一个混合接口,它继承了Comparator接口。它也具有PriorityBlockingQueue的特征,元素中优化级最高的元素是延迟时间最长的元素。队列头的元素是呆在队列时间最长的元素,它只有到时期,才能出队。即getDelay获取到的时间小于等于0时,否则返回null元素。
DelayQueue的并发控制同样使用ReentrantLock和它的Condition对象来实现。因为添加元素不阻塞,所以也只有一个Condition对象来实现等待/通知模式。DelayQueue同样不允许使用null元素。

一、DelayQueue的结构

DelayQueue的结构和PriorityBlockingQueue基本一致,它持有一个PriorityQueue的引用
各种方法实现也委托给了PriorityQueue对象来实现。另外还有一个ReentrantLock和它的Condition对象;队列中的元素是Delayed接口类型的元素。
Delayed接口定义:

1
2
3
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

DelayQueue的主要成员变量:

1
2
3
private transient final ReentrantLock lock = new ReentrantLock();
private transient final Condition available = lock.newCondition();
private final PriorityQueue<E> q = new PriorityQueue<E>();

二、DelayQueue的主要方法实现

1、入队操作

入队操作因为是无界队列,所有不会出现阻塞,put/offer都正常添加到队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//取出优先级最高的元素,不移除
E first = q.peek();
q.offer(e);
//如果获取到优先级最高的元素为null,说明队列为空
//或者当前添加的元素优先级比队列中的最高优化级元素低,则通知等待队列中的线程
if (first == null || e.compareTo(first) < 0)
available.signalAll();
return true;
} finally {
lock.unlock();
}
}

2、出队操作
因为出队必须是到期的元素,如果获取不到元素,阻塞版本的take会阻塞等待到delay的时间到期,而超时版本的poll会返回null。具体的实现都大同小异,只看take方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//响应中断
lock.lockInterruptibly();
try {
//自旋
for (;;) {
//获取队列中优化级最高的元素
E first = q.peek();
if (first == null) {
//元素为空,则阻塞,释放锁,进入Condition等待队列
available.await();
} else {
//元素不为空,则获取元素的延迟时间
long delay = first.getDelay(TimeUnit.NANOSECONDS);
//延迟时间>0,继续释放锁,进入有时间的等待
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
//否则,出队,并通知其它出队线程
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}

具体的实现流程如下:
1).获取锁,并加锁;
2).开始自旋,利用peek方法获取队列的头元素,如果获取失败,则释放锁,进入Condition的等待队列;
3).如果能获取到队列头的元素,则判断到期时间;如果还未到期,则继续在剩下的时间中await;
4).如果元素已经到时,则获取成功,poll出队,同时判断队列如果非空,则继续通知阻塞的线程;
5).最后返回获取到的元素;

三、使用DelayQueue

DelayQueue可以用于很多场景,比如缓存过期管理、会话过期管理、连接超时管理等。下面的例子是使用DelayQueue来管理缓存中过期的元素。

1、保存数据的键值对类:

1
2
3
4
5
6
7
8
9
public class Pair<K, V> {
public K first;
public V second;
public Pair(){}
public Pair(K first,V second){
this.first = first;
this.second = second;
}
}

2、实现Delayed接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class DelayItem<T> implements Delayed {

private static long NANO_ORIGIN = System.nanoTime();
final static long now(){
return System.nanoTime() - NANO_ORIGIN;
}
private static final AtomicLong sequencer = new AtomicLong(0);
private long sequenceNumber;
private final long time;
private final T item;

public DelayItem(T sumbmit,long timeout){
this.item = sumbmit;
this.time = now() + timeout;
this.sequenceNumber = sequencer.getAndIncrement();
}

public T getItem(){
return this.item;
}

@Override
public int compareTo(Delayed other) {
if(other == this)
return 0;
if(other instanceof DelayItem){
DelayItem x = (DelayItem)other;
long diff = time - x.time;
if(diff < 0)
return -1;
else if(diff > 0)
return 1;
else if(sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
return d == 0 ? 0 : (d < 0) ? -1 : 1;
}

@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(time - now() , unit.NANOSECONDS);
return d;
}

}

3、缓存实现和测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class Cache<K,V> {
private final ConcurrentMap<K,V> cache = new ConcurrentHashMap<K,V>();


private final DelayQueue<DelayItem<Pair<K,V>>> q = new DelayQueue<DelayItem<Pair<K,V>>>();

private Thread daemonThread;

public Cache(){
Runnable checkTask = new Runnable(){
@Override
public void run(){
checkTimeout();
}
};
daemonThread = new Thread(checkTask);
daemonThread.setDaemon(true);
daemonThread.start();
}

private void checkTimeout(){
for(;;){
try {
DelayItem<Pair<K,V>> item = q.take();
if(item != null){
Pair<K,V> pair = item.getItem();
//删除超时的对象
cache.remove(pair.first,pair.second);
}
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}

public void put(K key,V value,long time,TimeUnit unit){
V oldValue = cache.put(key, value);
if(oldValue != null)
cache.remove(oldValue);
long nanoTime = TimeUnit.NANOSECONDS.convert(time,unit);
q.put(new DelayItem<Pair<K,V>>(new Pair<K,V>(key,value),nanoTime));
}

public V get(K key){
return cache.get(key);
}

public static void main(String[] args) throws InterruptedException {
Cache<Integer,String> cache = new Cache<Integer,String>();
cache.put(1, "aaaa", 3, TimeUnit.SECONDS);
Thread.sleep(2000);
System.out.println(cache.get(1));
Thread.sleep(2000);
System.out.println(cache.get(1));
}

}
0条评论
0 / 1000