傳輸隊(duì)列擴(kuò)展阻塞隊(duì)列。
生產(chǎn)者使用 TransferQueue 的 transfer(E element)方法將元素傳遞給消費(fèi)者。
當(dāng)生產(chǎn)者調(diào)用傳遞(E元素)方法時(shí),它等待直到消費(fèi)者獲取其元素。 tryTransfer()方法提供了該方法的非阻塞和超時(shí)版本。
getWaitingConsumerCount()方法返回等待消費(fèi)者的數(shù)量。
如果有一個(gè)等待消費(fèi)者, hasWaitingConsumer()方法返回true; 否則,返回false。 LinkedTransferQueue 是 TransferQueue 接口的實(shí)現(xiàn)類。 它提供了一個(gè)無(wú)界的 TransferQueue 。
以下代碼顯示如何使用 TransferQueue 。
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
class TQProducer extends Thread {
private String name;
private TransferQueue<Integer> tQueue;
private AtomicInteger sequence;
public TQProducer(String name, TransferQueue<Integer> tQueue,
AtomicInteger sequence) {
this.name = name;
this.tQueue = tQueue;
this.sequence = sequence;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(4000);
int nextNum = this.sequence.incrementAndGet();
if (nextNum % 2 == 0) {
System.out.format("%s: Enqueuing: %d%n", name, nextNum);
tQueue.put(nextNum); // Enqueue
} else {
System.out.format("%s: Handing off: %d%n", name, nextNum);
System.out.format("%s: has a waiting consumer: %b%n", name,
tQueue.hasWaitingConsumer());
tQueue.transfer(nextNum); // A hand off
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class TQConsumer extends Thread {
private final String name;
private final TransferQueue<Integer> tQueue;
public TQConsumer(String name, TransferQueue<Integer> tQueue) {
this.name = name;
this.tQueue = tQueue;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(3000);
int item = tQueue.take();
System.out.format("%s removed: %d%n", name, item);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
final TransferQueue<Integer> tQueue = new LinkedTransferQueue<>();
final AtomicInteger sequence = new AtomicInteger();
for (int i = 0; i < 5; i++) {
try {
tQueue.put(sequence.incrementAndGet());
System.out.println("Initial queue: " + tQueue);
new TQProducer("Producer-1", tQueue, sequence).start();
new TQConsumer("Consumer-1", tQueue).start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
上面的代碼生成以下結(jié)果。