介绍
双队列是一种高效的内存数据结构,在多线程编程中,能保证生产者线程的写入和消费者的读出尽量做到最低的影响,避免了共享队列的锁开销。本文将介绍一种双队列的设计,并给出实现代码,然后会举例使用的场景。该双队列在项目中使用,性能也得到了验证。
设计
接下来具体介绍双队列的设计,并且会粘贴少量方法代码,帮助介绍。
本文中讲述的双队列,本质上是两个数组保存写入的Object,一个数组负责写入,另一个被消费者读出,两个数组都对应一个重入锁。数组内写入的数据会被计数。
public class DoubleCachedQueue<T> extends AbstractQueue<T> implements BlockingQueue<T>, java.io.Serializable { private static final long serialVersionUID = 1L; private static int default_line_limit = 1000; private static long max_cache_size = 67108864L; private int lineLimit; private long cacheSize; private T[] itemsA; private T[] itemsB; private ReentrantLock readLock, writeLock; private Condition notFull; private Condition awake; /** * writeArray : in reader's eyes, reader get data from data source and write * data to this line array. readArray : in writer's eyes, writer put data to * data destination from this line array. * * Because of this is doubleQueue mechanism, the two line will exchange when * time is suitable. * */ private T[] writeArray, readArray; private volatile int writeCount, readCount; private int writeArrayTP, readArrayHP; private volatile boolean closed = false; private int spillSize = 0; private long lineRx = 0; private long lineTx = 0;
队列实现了阻塞队列的接口,所以在向队列offer数据的时候是阻塞的,同样,取出操作poll也会阻塞。两个数组会在适当的时候进行queueSwitch操作。queueSwitch的条件就是当读者把queue读空了之后,且写入的queue此时不为空的时候,两个queue就会进行交换。在交换的时候,写入queue会被上锁,此时生产者不能让队列里写入数据。一般情况下,queue互换其实就是两个数组的引用互换,将相应的计数器也重置,写队列的计数器此时就清零了,因为queue交换是因为读队列已经被读空。
private long queueSwitch(long timeout, boolean isInfinite) throws InterruptedException { System.out.println("queue switch"); writeLock.lock(); try { if (writeCount <= 0) { if (closed) { return -2; } try { if (isInfinite && timeout <= 0) { awake.await(); return -1; } else { return awake.awaitNanos(timeout); } } catch (InterruptedException ie) { awake.signal(); throw ie; } } else { T[] tmpArray = readArray; readArray = writeArray; writeArray = tmpArray; readCount = writeCount; readArrayHP = 0; writeCount = 0; writeArrayTP = 0; notFull.signal(); // logger.debug("Queue switch successfully!"); return -1; } } finally { writeLock.unlock(); } }
上面queue交换的时候,可以看到当要被交换的写队列也已经为空的时候,会做一次检查。如果此时queue已经被显示地关闭了,那么poll操作就会返回空,读者此时应该检查queue是否已经被closed了,若已经closed了,那么读者已经把queue里的数据读完了。这里的显示close是我们给双队列加的一个状态,close这件事的作用是为了让读者知道:生产者已经停止往queue里写新数据了,但是queue里其实可能还有未取完的数据(在写queue里,此时还差一次queue switch),你往queue poll取数据的时候,如果取到空了,那么应该做一次check,如果queue已经关闭了,那么读者就知道本次读的任务完全结束了。反过来,close状态其实不影响写,生产者如果还想写的话,其实也是可以的,但是我不推荐这么做。
public void close() { writeLock.lock(); try { closed = true; //System.out.println(this); awake.signalAll(); } finally { writeLock.unlock(); } }
如果没有这个close标志位的话,可能就需要消费者放入一个EOF让读者知道。这在只有一个生产者和一个消费者的情况下是可行的,但是如果是一个多对一,一对多,甚至多对多的情况呢?一对一的情况是最简单的,也是双队列被创造出来最合适的场景。因为双队列完全分离了一个生产者和一个消费者的锁争抢情况,各自只要获得自己的读/写队列的锁就可以了。在本文阐述的双队列中,唯一产生一些开销的就是queue swtich的情况,如果queue频繁交换的话,还是会产生一些性能开销的。

一对多
上面已经大致介绍了双队列的读写。在实际项目中,一对多的场景需要注意的地方有两:
- 单个生产者需要在结束的时候关闭queue
- 多个消费者需要知道任务结束(知道其他线程已经完成任务)

消费者之间或者外部有一方需要知道各个消费者线程的存活情况,这样才能知道本次任务完成。比如如果外面有一个上帝的话,可以加一个CountDownLatch计数,每个消费者完成后就countDown一次,外部调用await()直到大家都已经退出,那么整个任务结束。如果没有上帝,线程之间互相知道对方情况的话,我的做法是让生产者放入一个EOF,当某线程取到EOF的时候,他知道自己是第一个遇到尽头的人,他会置一个布尔,而其他线程在取到空的时候会检查该布尔值,这样就能知道是否已经有小伙伴已经拿到EOF了,那么这时候就可以countDown了,而拿到EOF的线程进程countDown后就await(),最后退出。
下面是我自己针对这种场景,使用双队列的方式,其中的fromQueue是一个ConcurrentLinkedQueue,大家可以忽略,toQueue是双队列,可以注意一下用法。特别是往里面写的时候,需要while循环重试直到写入成功。
@Override
public void run() {
long start = System.currentTimeMillis();
log.debug(Thread.currentThread() + " Unpacker started at " + start);
Random r = new Random(start);
Bundle bundle = null;
boolean shoudShutdown = false;
try {
while(!shoudShutdown) {
bundle = (Bundle) fromQueue.poll();
if (bundle == null) {
if (seeEOF.get()) {
// 当取到空,并且其他线程已经取到EOF,那么本线程将Latch减1,并退出循环
latch.countDown();
shoudShutdown = true;
} else {
// 如果EOF还没被取到,本线程小睡一会后继续取
try {
sleep(r.nextInt(10));
} catch (InterruptedException e) {
log.error("Interrupted when taking a nap", e);
}
}
} else if (!bundle.isEof()) {
// bundle非空且非EOF,则往双队列写入一个Bundle
byte[] lineBytes = BundleUtil.getDecompressedData(bundle);
// 放入双队列时,若offer失败则重试
while (!toQueue.offer(new UnCompressedBundle(bundle.getId(), ByteUtil.bytes2Lines(lineBytes, lineDelim), bundle.getIndex(), bundle.getJobId()))) {
log.info("Unpacker put failed, will retry");
}
log.info("After enqueue, queue size is " + toQueue.size());
} else {
// Unpacker获得到了EOF
seeEOF.set(true);
// 自己将Lacth减1,并等待其他线程退出
latch.countDown();
try {
latch.await();
} catch (InterruptedException e) {
log.error("Interrupted when waiting the latch ");
}
// 其他线程已经退出,本线程放入EOF
while (!toQueue.offer(new UnCompressedBundle(-1L, new Line[0], -1L, -1L))) {
log.info("Unpacker put EOF failed, will retry");
}
// 关闭Queue
toQueue.close();
// 退出循环
shoudShutdown = true;
}
}
log.debug(Thread.currentThread() + " Unpacker finished in " + (System.currentTimeMillis()-start) + " ms");
} catch (Exception e) {
log.error("Exception when unpacker is running ", e);
// 将latch减1,表示自己异常退出,且不再工作
// latch.countDown();
log.debug(Thread.currentThread() + " Unpacker occured exception and stopped. ");
} finally {
}
}
多对一
多个生产者的情况下,写入队列无可避免发送锁争抢,但是能保证消费者的稳定读出过程。没有什么特殊处理的地方,这里就不啰嗦了。
总结分析
本文介绍了一种经典双队列的设计和实现,也给出了一些代码演示。文章末尾我会贴出整个双队列的代码实现,需要的同学也可以留言,我把.java发给你。如果使用的时候有发现问题,不吝赐教,这个双队列的实现也还不是很完美。使用的时候也存在需要注意的地方。
其实双队列的目的还是在于让写和读互相没有影响,而且更加照顾了写的速度。因为一般写的速度可能会比较快,而读的人读出之后还会做一些额外的处理,所以写的这一方借助双队列,可以持续写的过程,而且如果读的一方慢的话,可以多起几个消费者线程,就像"一对多"场景里阐述的那样来使用双队列。
下面是整个实现。各位可以仔细看看,发现问题一定记得通知我 :)
import java.util.AbstractQueue; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import lombok.ToString; import lombok.extern.log4j.Log4j; /** * Represents a region with two swap spaces, one for storing data which from * data source, the other one for storing data which will be transferred to data * destination. * <br> * A classical DoubleCachedQueue, In beginning, space A and space B both * empty, then loading task begin to load data to space A, when A is almost * full, let the data from data source being loaded to space B, then dumping * task begin to dump data from space A to data source. When space A is empty, * switch the two spaces for load and dump task. Repeat the above operation. * */ @Log4j @ToString public class DoubleCachedQueue<T> extends AbstractQueue<T> implements BlockingQueue<T>, java.io.Serializable { private static final long serialVersionUID = 1L; private static int default_line_limit = 1000; private static long max_cache_size = 67108864L; private int lineLimit; private long cacheSize; private T[] itemsA; private T[] itemsB; private ReentrantLock readLock, writeLock; private Condition notFull; private Condition awake; /** * writeArray : in reader's eyes, reader get data from data source and write * data to this line array. readArray : in writer's eyes, writer put data to * data destination from this line array. * * Because of this is doubleQueue mechanism, the two line will exchange when * time is suitable. * */ private T[] writeArray, readArray; private volatile int writeCount, readCount; private int writeArrayTP, readArrayHP; private volatile boolean closed = false; private int spillSize = 0; private long lineRx = 0; private long lineTx = 0; /** * Get info of line number in {@link DoubleCachedQueue} space. * * @return Information of line number. * */ public String info() { return String.format("Write Array: %s/%s; Read Array: %s/%s", writeCount, writeArray.length, readCount, readArray.length); } /** * Use the two parameters to construct a {@link DoubleCachedQueue} which hold the * swap areas. * * @param lineLimit * Limit of the line number the {@link DoubleCachedQueue} can hold. * * @param byteLimit * Limit of the bytes the {@link DoubleCachedQueue} can hold. * */ public DoubleCachedQueue(int lineLimit) { if (lineLimit <= 0) { this.lineLimit = default_line_limit; }else{ this.lineLimit = lineLimit; } itemsA = (T[])new Object[lineLimit]; itemsB = (T[])new Object[lineLimit]; readLock = new ReentrantLock(); writeLock = new ReentrantLock(); notFull = writeLock.newCondition(); awake = writeLock.newCondition(); readArray = itemsA; writeArray = itemsB; spillSize = lineLimit * 8 / 10; } public DoubleCachedQueue(long cacheSize){ if (cacheSize <= 0) { throw new IllegalArgumentException( "Queue initial capacity can't less than 0!"); } this.cacheSize = cacheSize > max_cache_size ? max_cache_size : cacheSize; readLock = new ReentrantLock(); writeLock = new ReentrantLock(); notFull = writeLock.newCondition(); awake = writeLock.newCondition(); readArray = itemsA; writeArray = itemsB; spillSize = lineLimit * 8 / 10; } /** * Get line number of the {@link DoubleCachedQueue} * * @return lineLimit Limit of the line number the {@link DoubleCachedQueue} can * hold. * */ public int getLineLimit() { return lineLimit; } /** * Set line number of the {@link DoubleCachedQueue}. * * @param capacity * Limit of the line number the {@link DoubleCachedQueue} can hold. * */ public void setLineLimit(int capacity) { this.lineLimit = capacity; } /** * Insert one line of record to a apace which buffers the swap data. * * @param line * The inserted line. * */ private void insert(T line) { writeArray[writeArrayTP] = line; ++writeArrayTP; ++writeCount; ++lineRx; } /** * Insert a line array(appointed the limit of array size) of data to a apace * which buffers the swap data. * * @param lines * Inserted line array. * * @param size * Limit of inserted size of the line array. * */ private void insert(T[] lines, int size) { if(size > 0){ System.arraycopy(lines, 0, writeArray, writeArrayTP, size); writeArrayTP = writeArrayTP + size; writeCount = writeCount + size; lineRx = lineRx + size; } // for (int i = 0; i < size; ++i) { // writeArray[writeArrayTP] = lines[i]; // ++writeArrayTP; // ++writeCount; // ++lineRx; // if(lines[i] != null && lines[i].getLine() != null){ // byteRx += lines[i].getLine().length(); // } // } } /** * Extract one line of record from the space which contains current data. * * @return line A line of data. * */ private T extract() { T e = readArray[readArrayHP]; readArray[readArrayHP] = null; ++readArrayHP; --readCount; ++lineTx; return e; } /** * Extract a line array of data from the space which contains current data. * * @param ea * @return Extracted line number of data. * */ private int extract(T[] ea) { int readsize = Math.min(ea.length, readCount); if(readsize > 0){ readCount = readCount - readsize; lineTx = lineTx + readsize; System.arraycopy(readArray, readArrayHP, ea, 0, readsize); readArrayHP = readArrayHP + readsize; } // for (int i = 0; i < readsize; ++i) { // ea[i] = readArray[readArrayHP]; // readArray[readArrayHP] = null; // ++readArrayHP; // --readCount; // ++lineTx; // } return readsize; } /** * switch condition: read queue is empty && write queue is not empty. * Notice:This function can only be invoked after readLock is grabbed,or may * cause dead lock. * * @param timeout * * @param isInfinite * whether need to wait forever until some other thread awake it. * * @return * * @throws InterruptedException * */ private long queueSwitch(long timeout, boolean isInfinite) throws InterruptedException { System.out.println("queue switch"); writeLock.lock(); try { if (writeCount <= 0) { if (closed) { return -2; } try { if (isInfinite && timeout <= 0) { awake.await(); return -1; } else { return awake.awaitNanos(timeout); } } catch (InterruptedException ie) { awake.signal(); throw ie; } } else { T[] tmpArray = readArray; readArray = writeArray; writeArray = tmpArray; readCount = writeCount; readArrayHP = 0; writeCount = 0; writeArrayTP = 0; notFull.signal(); // logger.debug("Queue switch successfully!"); return -1; } } finally { writeLock.unlock(); } } /** * If exists write space, it will return true, and write one line to the * space. otherwise, it will try to do that in a appointed time,when time is * out if still failed, return false. * * @param line * a Line. * * @param timeout * appointed limit time * * @param unit * time unit * * @return True if success,False if failed. * */ public boolean offer(T line, long timeout, TimeUnit unit) throws InterruptedException { if (line == null) { throw new NullPointerException(); } long nanoTime = unit.toNanos(timeout); writeLock.lockInterruptibly(); if(itemsA == null || itemsB == null){ initArray(line); } try { for (;;) { if (writeCount < writeArray.length) { insert(line); if (writeCount == 1) { awake.signal(); } return true; } // Time out if (nanoTime <= 0) { return false; } // keep waiting try { nanoTime = notFull.awaitNanos(nanoTime); } catch (InterruptedException ie) { notFull.signal(); throw ie; } } } finally { writeLock.unlock(); } } private void initArray(T line) { long recordLength = computeSize(line); long size = cacheSize/recordLength; if(size <= 0){ size = default_line_limit; } lineLimit = (int) size; itemsA = (T[])new Object[(int) size]; itemsB = (T[])new Object[(int) size]; readArray = itemsA; writeArray = itemsB; } public long computeSize(T line){ return 1; } /** * If exists write space, it will return true, and write a line array to the * space.<br> * otherwise, it will try to do that in a appointed time,when time out if * still failed, return false. * * @param lines * line array contains lines of data * * @param size * Line number needs to write to the space. * * @param timeout * appointed limit time * * @param unit * time unit * * @return status of this operation, true or false. * * @throws InterruptedException * if being interrupted during the try limit time. * */ public boolean offer(T[] lines, int size, long timeout, TimeUnit unit) throws InterruptedException { if (lines == null || lines.length == 0) { throw new NullPointerException(); } long nanoTime = unit.toNanos(timeout); writeLock.lockInterruptibly(); if(itemsA == null || itemsB == null){ initArray(lines[0]); } try { for (;;) { if (writeCount + size <= writeArray.length) { insert(lines, size); if (writeCount >= spillSize) { awake.signalAll(); } return true; } // Time out if (nanoTime <= 0) { return false; } // keep waiting try { nanoTime = notFull.awaitNanos(nanoTime); } catch (InterruptedException ie) { notFull.signal(); throw ie; } } } finally { writeLock.unlock(); } } /** * Close the synchronized lock and one inner state. * */ public void close() { writeLock.lock(); try { closed = true; //System.out.println(this); awake.signalAll(); } finally { writeLock.unlock(); } } public boolean isClosed() { return closed; } /** * * * @param timeout * appointed limit time * * @param unit * time unit */ public T poll(long timeout, TimeUnit unit) throws InterruptedException { long nanoTime = unit.toNanos(timeout); readLock.lockInterruptibly(); try { for (;;) { if (readCount > 0) { return extract(); } if (nanoTime <= 0) { return null; } nanoTime = queueSwitch(nanoTime, true); } } finally { readLock.unlock(); } } /** * * @param ea * line buffer * * * @param timeout * a appointed limit time * * @param unit * a time unit * * @return line number of data.if less or equal than 0, means fail. * * @throws InterruptedException * if being interrupted during the try limit time. */ public int poll(T[] ea, long timeout, TimeUnit unit) throws InterruptedException { long nanoTime = unit.toNanos(timeout); readLock.lockInterruptibly(); try { for (;;) { if (readCount > 0) { return extract(ea); } if (nanoTime == -2) { return -1; } if (nanoTime <= 0) { return 0; } nanoTime = queueSwitch(nanoTime, false); } } finally { readLock.unlock(); } } public Iterator<T> iterator() { return null; } /** * Get size of {@link Storage} in bytes. * * @return Storage size. * * */ @Override public int size() { return (writeCount + readCount); } @Override public int drainTo(Collection<? super T> c) { return 0; } @Override public int drainTo(Collection<? super T> c, int maxElements) { return 0; } /** * If exists write space, it will return true, and write one line to the * space.<br> * otherwise, it will try to do that in a appointed time(20 * milliseconds),when time out if still failed, return false. * * @param line * a Line. * * @see DoubleCachedQueue#offer(Line, long, TimeUnit) * */ @Override public boolean offer(T line) { try { return offer(line, 20, TimeUnit.MILLISECONDS); } catch (InterruptedException e1) { log.debug(e1.getMessage(), e1); } return false; } @Override public void put(T e) throws InterruptedException { } @Override public int remainingCapacity() { return 0; } @Override public T take() throws InterruptedException { return null; } @Override public T peek() { return null; } @Override public T poll() { try { return poll(1*1000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.debug(e.getMessage(), e); } return null; } }
(全文完)