我們在前面的文章學習CountDownLatch和CyclicBarrier的時候了解到,CountDownLatch計數器隻能使用一次,不能重複利用,CyclicBarrier雖然可以重複利用但是parties在初始化時指定後就不能再修改了,所以用法上不是很靈活。
Phaser是JDK1.7引入的多線程同步工具,其功能類似于CountDownLatch和CyclicBarrier的合集,它提供的方法更靈活和豐富,但是使用難度也比較大。
Phaser用作CountDownLatchCountDownLatch能完成的功能,Phaser也能實現,其實現方式如下:
publicstaticvoidmain(String[]args){
//初始化不指定parties數量,parties默認為0
finalPhaserphaser=newPhaser();
//主線程調用register,數量 1
phaser.register();
IntStream.range(0,5).forEach(i->
newThread(()->{
//線程調用register方法,數量 1
phaser.register();
try{
//模拟業務執行時間
TimeUnit.SECONDS.sleep(newRandom().nextInt(2));
System.out.println(Thread.currentThread().getName() "完成任務");
//線程完成任務調用arrive方法,不阻塞
phaser.arrive();
}catch(InterruptedExceptione){
e.printStackTrace();
}
},"T-" i).start()
);
//主線程arrive,但是它要等待第一個階段結束,其前提是所有的線程都要arrive
phaser.arriveAndAwaitAdvance();
System.out.println("注冊的任務數量:" phaser.getRegisteredParties());
System.out.println(Thread.currentThread().getName() "被喚醒");
}
初始化parties數量為0,啟動5個線程執行業務,每個線程調用register使得parties數量加1,并在業務執行結束之後調用arrive方法。
main線程調用arriveAndAwaitAdvance方法一直阻塞直到所有的線程都arrive。
運行結果為:
main線程直到所有任務執行結束才會被喚醒,實現了CountDownLatch的效果。
Phaser用作CyclicBarrierCyclicBarrier可以使子任務阻塞直到所有線程到達屏障,在Phaser中隻需要把子任務到達的方法arrive修改為arriveAndAwaitAdvance即可實現CyclicBarrier的效果。
publicstaticvoidmain(String[]args){
//初始化不指定數量
finalPhaserphaser=newPhaser();
phaser.register();
IntStream.range(0,5).forEach(i->
newThread(()->{
//線程調用register使得數量加1
phaser.register();
try{
TimeUnit.SECONDS.sleep(newRandom().nextInt(5));
System.out.println(Thread.currentThread().getName() "完成任務");
//線程完成任務調用arriveAndAwaitAdvance阻塞直到所有線程都到達
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() "繼續執行任務");
}catch(InterruptedExceptione){
e.printStackTrace();
}
},"T-" i).start()
);
//主線程arrive,但是它要等待第一個階段結束,其前提是所有的線程都要arrive
phaser.arriveAndAwaitAdvance();
System.out.println("注冊的任務數量:" phaser.getRegisteredParties());
System.out.println(Thread.currentThread().getName() "被喚醒");
}
初始化parties數量為0,啟動5個線程執行業務,每個線程調用register使得parties數量加1,并在業務執行結束之後調用arriveAndAwaitAdvance,此方法會使線程阻塞直到所有的線程都到達之後才繼續運行。
運行結果為:
通過結果我們看到main線程和所有T線程在所有線程到達之後繼續并發執行,實現了CyclicBarrier的效果。
Phaser(階段)編号在Phaser中有一個比較重要的概念:Phaser(階段)編号。
一個Phaser中所有相關聯的parties都arrive(到達後),Phaser就會從下一個階段開始,除非Phaser已經被終止或者銷毀。
Phaser為每一個Phaser都提供了一個編号,初始化為0,所有任務分片到達之後加1。
Phaser階段可以理解為每次重用Phaser都會為其分配一個階段編号,每次全部任務到達都順序加1。
我們通過一段代碼查看不同情況下Phaser的編号。
publicstaticvoidmain(String[]args){
finalPhaserphaser=newPhaser(3);
System.out.println("Phaser階段編号為:" phaser.getPhase());
phaser.arrive();
phaser.arrive();
phaser.arrive();
System.out.println("Phaser階段編号為:" phaser.getPhase());
//繼續重用phaser
phaser.bulkRegister(1);
//現在parties數量為4
phaser.arrive();
phaser.arrive();
phaser.arrive();
System.out.println("Phaser階段編号為:" phaser.getPhase());
phaser.arrive();
System.out.println("Phaser階段編号為:" phaser.getPhase());
}
getPhase()方法返回Phaser的當前Phaser編号,最大的Phaser編号為Integer.MAX_VALUE,超過最大值将重新從0開始。當Phaser被終止時,Phaser編号将返回負值。
Phaser中大部分方法都會返回Phaser編号,理解Phaser編号是理解Phaser方法的基礎。
Phaser方法總結Phaser構造器
parties為分片任務數量,可以構造時指定,也可以通過調用register方法對其加1。
parent為Phaser的層級,如果指定了parent,那麼子Phaser的Phaser編号會以父Phaser為準,且父Phaser的分片任務等于所有子Phaser的分片任務和父Phaser的分片任務總和。
因parent會使線程管理比較複雜,一般很少使用,所以我們開發中最常用的是以下構造中的前2個。
Phaser():不指定parties數量,默認為0;并且不指定parent,parent默認為null。
Phaser(int parties):指定parties數量,不指定parent。
Phaser(Phaser parent):指定parent,不指定parties數量。
Phaser(Phaser parent, int parties):同時指定parent和parties數量。
register()方法
publicintregister(){
returndoRegister(1);
}
register方法的作用是為Phaser增加一個未到達的parties,并且返回Phaser的編号。使用該方法時,有的時候會陷入阻塞,比如前一個Phaser執行onAdvance方法耗時比較長,那麼如果此時想要一個新的parties通過register方法加入就會陷入阻塞。
publicstaticvoidmain(String[]args){
finalPhaserphaser=newPhaser(1){
@Override
protectedbooleanonAdvance(intphase,intregisteredParties){
try{
TimeUnit.SECONDS.sleep(30);
}catch(InterruptedExceptione){
e.printStackTrace();
}
returnsuper.onAdvance(phase,registeredParties);
}
};
newThread(()->{
phaser.arrive();
}).start();
TimeUnit.SECONDS.sleep(2);
longtime=System.currentTimeMillis();
phaser.register();
System.out.println("當前階段編号為:" phaser.getPhase());
System.out.println("耗時:" (System.currentTimeMillis()-time) "ms");
}
上述代碼中重寫了onAdvance方法,初始化parties為1,啟動一個線程調用arrive方法分片任務到達,此時onAdvance得以被執行。
main線程調用register會被阻塞,因為此時onAdvance正在做收尾工作,它在執行完邏輯之後才會接納新的分片進來,因此onAdvance盡量不要寫太複雜的代碼。
onAdvance在每一個Phaser階段中所有的任務分片達到之後都會被執行,它還決定了Phaser是否終止:當onAdvance方法返回true表明該Phaser将被終止,接下來将不能再使用。onAdvance可以實現CyclicBarrier中Runnable的作用。
bulkRegister(int parties)方法
publicintbulkRegister(intparties){
if(parties<0)
thrownewIllegalArgumentException();
if(parties==0)
returngetPhase();
returndoRegister(parties);
}
該方法的作用與register()一樣,通過源碼我們可以看到他們調用的都是doRegister方法。不同點是register隻允許注冊一個分片到Phaser,但是bulkRegister方法可以允許0個或者多個分片到達Phaser。
arrive()和arriveAndAwaitAdvance()方法
arrive方法是到達Phaser的下一個階段,不會等待其他分片,它返回的階段編号為當前Phaser的編号。
arriveAndAwaitAdvance方法是到達Phaser的下一個階段,需要阻塞等待其他線程的分片都到達。而且此方法返回的是下一個Phaser的編号。
publicstaticvoidmain(String[]args){
finalPhaserphaser=newPhaser(2);
System.out.println("當前Phaser的編号:" phaser.getPhase());
phaser.arrive();
System.out.println("第一次arrive後的Phaser的編号:" phaser.getPhase());
phaser.arrive();
System.out.println("第二次arrive後的Phaser的編号:" phaser.getPhase());
}
getPhase編号初始化為0,第一次調用arrive方法,返回當前編号為0,因我們定義了兩個切片,此時隻有一個分片任務到達。第二次調用arrive方法,返回當前編号為1,因為此時所有的分片任務都已到達,Phaser已經進入下一個階段了。
arriveAndDeregister()方法
該方法可以到達下一個Phaser階段之外,還會将當前Phaser的parties數量減少一個。
publicstaticvoidmain(String[]args){
finalPhaserphaser=newPhaser(2);
System.out.println("當前Phaser的編号:" phaser.getPhase() "&&當前的注冊的parties:" phaser.getRegisteredParties());
phaser.arriveAndDeregister();
System.out.println("當前Phaser的編号:" phaser.getPhase() "&&當前的注冊的parties:" phaser.getRegisteredParties());
phaser.arriveAndDeregister();
System.out.println("當前Phaser的編号:" phaser.getPhase() "&&當前的注冊的parties:" phaser.getRegisteredParties());
}
初始化定義parties數量為2,每次調用arriveAndDeregister打印出的parties數量都會減1,最後一次輸出的Phaser為負數,表示當前的Phaser已經被銷毀。
awaitAdvance(int phase)方法
awaitAdvance有3種不同的表現:awaitAdvance(int phase)、awaitAdvanceInterruptibly(int phase)、awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)。
這三個方法更多的是關注阻塞而不關注arrive,等待某個Phaser關聯的所有分片是否已經到達某個指定的階段。使用這3個方法不會影響Phaser内部分片的arrive和unarrive變化的。
awaitAdvance(int phase)方法的主要作用是等待與Phaser相關聯的分片都到達某個指定的Phaser階段編号,如果有某個分片任務未到達,那麼此方法就會進入阻塞,且不能打斷。如果入參Phaser與當前Phaser的階段編号不一緻,該方法會立即返回,如果Phaser已經被銷毀,該方法的返回值為負數。
publicstaticvoidmain(String[]args){
finalPhaserphaser=newPhaser(1);
newThread(()->{
System.out.println("當前Phaser的階段編号為:" phaser.getPhase());
intphaserNum=phaser.awaitAdvance(phaser.getPhase());
System.out.println("phaserNum為:" phaserNum);
}).start();
TimeUnit.SECONDS.sleep(30);
phaser.arriveAndAwaitAdvance();
System.out.println("當前Phaser的階段編号為:" phaser.getPhase());
}
thread線程調用awaitAdvance方法一直阻塞直到main線程調用arriveAndAwaitAdvance方法分片任務到達後,thread線程退出阻塞,且awaitAdvance方法返回的Phaser編号為下一個階段的Phaser編号。
awaitAdvanceInterruptibly(int phase)與awaitAdvance(int phase)方法作用相同,隻是其可以被打斷。
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)在可以被打斷的基礎上增加了設置超時等待的功能。
Phaser的介紹到這裡就結束了,對其方法的理解還需在使用中加深印象。
JUC包中還有一個并發工具類Exchanger,可以實現線程間的數據交換,感興趣的小夥伴自行查找資料了解喔。
本周勾勾會繼續學習并發編程知識,陸續推出并發容器和線程池的文章,期待和你一起進步!!!
,更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!