4、ConcurrentLinkedQueue非阻塞無界鏈表隊列
ConcurrentLinkedQueue是一個線程安全的隊列,基于鏈表結(jié)構(gòu)實現(xiàn),是一個無界隊列,理論上來說隊列的長度可以無限擴大。與其他隊列相同,ConcurrentLinkedQueue也采用的是先進先出(FIFO)入隊規(guī)則,對元素進行排序。當(dāng)我們向隊列中添加元素時,新插入的元素會插入到隊列的尾部;而當(dāng)我們獲取一個元素時,它會從隊列的頭部中取出。因為ConcurrentLinkedQueue是鏈表結(jié)構(gòu),所以當(dāng)入隊時,插入的元素依次向后延伸,形成鏈表;而出隊時,則從鏈表的第一個元素開始獲取,依次遞增;
值得注意的是,在使用ConcurrentLinkedQueue時,如果涉及到隊列是否為空的判斷,切記不可使用size()==0的做法,因為在size()方法中,是通過遍歷整個鏈表來實現(xiàn)的,在隊列元素很多的時候,size()方法十分消耗性能和時間,只是單純的判斷隊列為空使用isEmpty()即可。
public class ConcurrentLinkedQueueTest {
public static int threadCount = 10;
public static ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
static class Offer implements Runnable {
public void run() {
//不建議使用 queue.size()==0,影響效率。可以使用!queue.isEmpty()
if (queue.size() == 0) {
String ele = new Random().nextInt(Integer.MAX_VALUE) + "";
queue.offer(ele);
System.out.println("入隊元素為" + ele);
}
}
}
static class Poll implements Runnable {
public void run() {
if (!queue.isEmpty()) {
String ele = queue.poll();
System.out.println("出隊元素為" + ele);
}
}
}
public static void main(String[] agrs) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int x = 0; x < threadCount; x++) {
executorService.submit(new Offer());
executorService.submit(new Poll());
}
executorService.shutdown();
}
}
一種輸出:
入隊元素為313732926
出隊元素為313732926
入隊元素為812655435
出隊元素為812655435
入隊元素為1893079357
出隊元素為1893079357
入隊元素為1137820958
出隊元素為1137820958
入隊元素為1965962048
出隊元素為1965962048
出隊元素為685567162
入隊元素為685567162
出隊元素為1441081163
入隊元素為1441081163
出隊元素為1627184732
入隊元素為1627184732
ConcurrentLinkedQuere類圖
如圖ConcurrentLinkedQueue中有兩個volatile類型的Node節(jié)點分別用來存在列表的首尾節(jié)點,其中head節(jié)點存放鏈表第一個item為null的節(jié)點,tail則并不是總指向最后一個節(jié)點。Node節(jié)點內(nèi)部則維護一個變量item用來存放節(jié)點的值,next用來存放下一個節(jié)點,從而鏈接為一個單向無界列表。
public ConcurrentLinkedQueue(){
head=tail=new Node<E>(null);
}
如上代碼初始化時候會構(gòu)建一個 item 為 NULL 的空節(jié)點作為鏈表的首尾節(jié)點。
Offer 操作offer 操作是在鏈表末尾添加一個元素,下面看看實現(xiàn)原理。
public boolean offer(E e) {
//e 為 null 則拋出空指針異常
checkNotNull(e);
//構(gòu)造 Node 節(jié)點構(gòu)造函數(shù)內(nèi)部調(diào)用 unsafe.putObject,后面統(tǒng)一講
final Node<E> newNode = new Node<E>(e);
//從尾節(jié)點插入
for (Node<E> t = tail, p = t; ; ) {
Node<E> q = p.next;
//如果 q=null 說明 p 是尾節(jié)點則插入
if (q == null) {
//cas 插入(1)
if (p.casNext(null, newNode)) {
//cas 成功說明新增節(jié)點已經(jīng)被放入鏈表,然后設(shè)置當(dāng)前尾節(jié)點(包含 head,1,3,5.。。個節(jié)點為尾節(jié)點)
if (p != t)// hop two nodes at a time
casTail(t, newNode); // Failure is OK. return true;
}
// Lost CAS race to another thread; re-read next
} else if (p == q)//(2)
//多線程操作時候,由于 poll 時候會把老的 head 變?yōu)樽砸茫缓?head 的 next 變?yōu)樾?head,所以這里需要
//重新找新的 head,因為新的 head 后面的節(jié)點才是激活的節(jié)點
p = (t != (t = tail)) ? t : head;
else
// 尋找尾節(jié)點(3)
p = (p != t && t != (t = tail)) ? t : q;
}
}
從構(gòu)造函數(shù)知道一開始有個item為null的哨兵節(jié)點,并且head和tail都是指向這個節(jié)點。
如圖首先查找尾節(jié)點,q==null,p就是尾節(jié)點,所以執(zhí)行p.casNext通過cas設(shè)置p的next為新增節(jié)點,這時候p==t所以不重新設(shè)置尾節(jié)點為當(dāng)前新節(jié)點。由于多線程可以調(diào)用offer方法,所以可能兩個線程同時執(zhí)行到了(1)進行cas,那么只有一個會成功(假如線程1成功了),成功后的鏈表為:
失敗的線程會循環(huán)一次這時候指針為:
這時候會執(zhí)行(3)所以 p=q,然后在循環(huán)后指針位置為:
所以沒有其他線程干擾的情況下會執(zhí)行(1)執(zhí)行 cas 把新增節(jié)點插入到尾部,沒有干擾的情況下線程 2 cas 會成功,然后去更新尾節(jié)點 tail,由于 p!=t 所以更新。這時候鏈表和指針為:
假如線程 2cas 時候線程 3 也在執(zhí)行,那么線程 3 會失敗,循環(huán)一次后,線程 3 的節(jié)點狀態(tài)為:
這時候 p!=t ;并且 t 的原始值為 told,t 的新值為 tnew ,所以 told!=tnew,所以 p=tnew=tail
然后在循環(huán)一下后節(jié)點狀態(tài):
q==null 所以執(zhí)行(1)。
現(xiàn)在就差 p==q 這個分支還沒有走,這個要在執(zhí)行 poll 操作后才會出現(xiàn)這個情況。poll 后會存在下面的狀態(tài)
這個時候添加元素時候指針分布為:
所以會執(zhí)行(2)分支 結(jié)果 p=head,然后循環(huán),循環(huán)后指針分布:
所以執(zhí)行(1),然后 p!=t 所以設(shè)置 tail 節(jié)點。現(xiàn)在分布圖:
自引用的節(jié)點會被垃圾回收掉。
● add 操作
add操作是在鏈表末尾添加一個元素,下面看看實現(xiàn)原理。
其實內(nèi)部調(diào)用的還是 offer
public boolean add(E e) {
return offer(e);
}
● poll 操作
poll 操作是在鏈表頭部獲取并且移除一個元素,下面看看實現(xiàn)原理。
public E poll() {
restartFromHead:
// 死 循 環(huán)
for (; ; ) {
//死循環(huán)
for (Node<E> h = head, p = h, q;
; ) {
//保存當(dāng)前節(jié)點值
E item = p.item;
//當(dāng)前節(jié)點有值則 cas 變?yōu)?null(1)
if (item != null && p.casItem(item, null)){
//cas 成功標(biāo)志當(dāng)前節(jié)點以及從鏈表中移除
if (p != h) // 類似 tail 間隔 2 設(shè)置一次頭節(jié)點(2)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
//當(dāng)前隊列為空則返回 null(3)
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
//自引用了,則重新找新的隊列頭節(jié)點(4)
else if (p == q)
continue restartFromHead;
else//(5)
p = q;
}
}
}
final void updateHead(Node<E> h,Node<E> p){
if(h!=p&&casHead(h,p))
h.lazySetNext(h);
}
● 當(dāng)隊列為空時候:
可知執(zhí)行(3)這時候有兩種情況,第一沒有其他線程添加元素時候(3)結(jié)果為 true 然后因為 h!=p 為 false 所以直接返回 null。第二在執(zhí)行 q=p.next 前,其他線程已經(jīng)添加了一個元素到隊列,這時候(3)返回 false,然后執(zhí)行(5)p=q,然后循環(huán)后節(jié)點分布:
這時候執(zhí)行(1)分支,進行 cas 把當(dāng)前節(jié)點值值為 null,同時只有一個線程會成功,cas 成功 標(biāo)示該節(jié)點從隊列中移除了,然后 p!=h,調(diào)用 updateHead 方法,參數(shù)為 h,p;h!=p 所以把 p 變?yōu)楫?dāng)前鏈表 head 節(jié)點,然后 h 節(jié)點的 next 指向自己。現(xiàn)在狀態(tài)為:
cas 失敗 后 會再次循環(huán),這時候分布圖為:
這時候執(zhí)行(3)返回 null.
現(xiàn)在還有個分支(4)沒有執(zhí)行過,那么什么時候會執(zhí)行那?
這時候執(zhí)行(1)分支,進行 cas 把當(dāng)前節(jié)點值值為 null,同時只有一個線程 A 會成功,cas 成功 標(biāo)示該節(jié)點從隊列中移除了,然后 p!=h,調(diào)用 updateHead 方法,假如執(zhí)行 updateHead 前另外一個線程 B 開始 poll 這時候它 p 指向為原來的 head 節(jié)點,然后當(dāng)前線程 A 執(zhí)行 updateHead 這時候 B 線程鏈表狀態(tài)為:
所以會執(zhí)行(4)重新跳到外層循環(huán),獲取當(dāng)前 head,現(xiàn)在狀態(tài)為:
● peek 操作
peek 操作是獲取鏈表頭部一個元素(只讀取不移除),下面看看實現(xiàn)原理。
代碼與 poll 類似,只是少了 castItem.并且 peek 操作會改變 head 指向,offer 后 head 指向哨兵節(jié)點,第一次 peek 后 head 會指向第一個真的節(jié)點元素。
public E peek() {
restartFromHead:
for (; ; ) {
for (Node<E> h = head, p = h, q; ; ) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
} else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
● size 操作
獲取當(dāng)前隊列元素個數(shù),在并發(fā)環(huán)境下不是很有用,因為使用 CAS 沒有加鎖所以從調(diào)用 size 函數(shù)到返回結(jié)果期間有可能增刪元素,導(dǎo)致統(tǒng)計的元素個數(shù)不精確。
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// 最大返回 Integer.MAX_VALUE
if (++count == Integer.MAX_VALUE) break;
return count;
}
//獲取第一個隊列元素(哨兵元素不算),沒有則為 null
Node<E> first() {
restartFromHead:
for (; ; ) {
for (Node<E> h = head, p = h, q; ; ) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
} else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
//獲取當(dāng)前節(jié)點的 next 元素,如果是自引入節(jié)點則返回真正頭節(jié)點
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}
● remove 操作
如果隊列里面存在該元素則刪除該元素,如果存在多個則刪除第一個,并返回 true,否則返回 false
ublic boolean remove(Object o){
//查找元素為空,直接返回 false
if(o==null)return false;
Node<E> pred=null;
for(Node<E> p=first();p!=null;p=succ(p)){
E item=p.item;
//相等則使用 cas 值 null,同時一個線程成功,失敗的線程循環(huán)查找隊列中其他元素是否有匹配的。
if(item!=null&&o.equals(item)&&p.casItem(item,null)){
//獲取 next 元素
Node<E> next=succ(p);
//如果有前驅(qū)節(jié)點,并且 next 不為空則鏈接前驅(qū)節(jié)點到 next,
if(pred!=null&&next!=null)
pred.casNext(p,next);
return true;
}
pred=p;
}
return false;
}
● contains 操作
判斷隊列里面是否含有指定對象,由于是遍歷整個隊列,所以類似 size 不是那么精確,有可能調(diào)用該方法時候元素還在隊列里面,但是遍歷過程中才把該元素刪除了,那么就會返回 false.
public boolean contains(Object o) {
if (o == null) return false;
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null && o.equals(item)) return true;
}
return false;
}
關(guān)于ConcurrentLinkedQuere的offer方法有意思的問題:
offer 中有個 判斷 t != (t = tail)假如 t=node1;tail=node2;并且 node1!=node2 那么這個判斷是 true 還是 false 那,答案是 true,這個判斷是看當(dāng)前 t 是不是和 tail 相等,相等則返回 true 否者為 false,但是無論結(jié)果是啥執(zhí)行后 t 的值都是 tail。
下面從字節(jié)碼來分析下為啥。
舉一個例子:
public static void main(String[] args){
int t=2;
int tail=3;
System.out.println(t!=(t=tail));
}
結(jié)果為:true
字節(jié)碼文件:
C:\Users\Simple\Desktop\TeacherCode\Crm_Test\build\classes\com\bjpowernode\crm\util>javap-c Test001
警告:二進制文件Test001包含com.bjpowernode.crm.util.Test001 Compiled from"Test001.java"
public class com.bjpowernode.crm.util.Test001{
public class com.bjpowernode.crm.util.Test001();
Code:
0:aload_0
1:invokespecial #8 // Method java/lang/Object."<init>":()V
4:return
public static void main(java.lang.String[]){
Code:
0:iconst_2
1:istore_1
2:iconst_3
3:istore_2
4:getstatic #16 // Field java/lang/System.out:Ljava/io/PrintStream;
7:iload_1
8:iload_2
9:dup
10:istore_1
11:if_icmpeq 18
14:iconst_1
15:goto 19
18:iconst_0
19:invokevirtual #22 // Method java/io/PrintStream.println:(Z)V
22:return
}
我們從上面main方法的字節(jié)碼文件中分析
一開始棧為空:
棧
第0行指令作用是把值2入棧棧頂元素為2
棧
第1行指令作用是將棧頂int類型值保存到局部變量t中
棧
第2行指令作用是把值3入棧棧頂元素為3
棧
第3行指令作用是將棧頂int類型值保存到局部變量tail中。
棧
第4調(diào)用打印命令
第7行指令作用是把變量t中的值入棧
棧
第8行指令作用是把變量tail中的值入棧
棧
現(xiàn)在棧里面的元素為3、2,并且3位于棧頂
第9行指令作用是當(dāng)前棧頂元素入棧,所以現(xiàn)在棧內(nèi)容3,3,2
棧
第10行指令作用是把棧頂元素存放到t,現(xiàn)在棧內(nèi)容3,2
棧
第11行指令作用是判斷棧頂兩個元素值,相等則跳轉(zhuǎn)18。由于現(xiàn)在棧頂嚴(yán)肅為3,2不相等所以返回true.
第14行指令作用是把1入棧
然后回頭分析下!=是雙目運算符,應(yīng)該是首先把左邊的操作數(shù)入棧,然后在去計算了右側(cè)操作數(shù)。
● ConcurrentLinkedQuere總結(jié)
ConcurrentLinkedQueue使用CAS非阻塞算法實現(xiàn)使用CAS解決了當(dāng)前節(jié)點與next節(jié)點之間的安全鏈接和對當(dāng)前節(jié)點值的賦值。由于使用CAS沒有使用鎖,所以獲取size的時候有可能進行offer,poll或者remove操作,導(dǎo)致獲取的元素個數(shù)不精確,所以在并發(fā)情況下size函數(shù)不是很有用。另外第一次peek或者first時候會把head指向第一個真正的隊列元素。
下面總結(jié)下如何實現(xiàn)線程安全的,可知入隊出隊函數(shù)都是操作volatile變量:head,tail。所以要保證隊列線程安全只需要保證對這兩個Node操作的可見性和原子性,由于volatile本身保證可見性,所以只需要看下多線程下如果保證對著兩個變量操作的原子性。
對于offer操作是在tail后面添加元素,也就是調(diào)用tail.casNext方法,而這個方法是使用的CAS操作,只有一個線程會成功,然后失敗的線程會循環(huán)一下,重新獲取tail,然后執(zhí)行casNext方法。對于poll也是這樣的。