Phaser由JDK1.7提出,是一個複雜強大的同步輔助類,是對同步工具類CountDownLatch和CyclicBarrier的綜合升級,能夠支持分階段實現等待的業務場景。
我們可以回憶下CountDownLatch講的是先指定N個線程,在N個線程幹完活之前,其它線程都需要等待(導遊等待旅遊團所有人上車才能開車),而CyclicBarrier講的是先指定N個線程。等N個線程到齊了大家同時幹活(多個驢友相約去旅遊,先到的需要等待後來的),而Phaser是兩者的結合,可以理解為先指定N個線程,等N個線程到齊後開始幹第一階段的活,等第一階段所有的線程都幹完活了,接着N個線程開始幹第二階段的活,直到所有的階段完成工作,程序結束,當然需要注意的是每個階段可以根據業務需要新增或者删除一些線程,并不是開始指定多少個線程每個階段就必須有多少個線程。
入門體驗看了概念可能不容易理解,從一個小demo入手體驗下
publicclassPhaserDemo1{
//指定随機種子
privatestaticRandomrandom=newRandom(System.currentTimeMillis());
publicstaticvoidmain(String[]args){
Phaserphaser=newPhaser();
//将線程注冊到phaser
phaser.register();
for(inti=0;i<5;i ){
Tasktask=newTask(phaser);
task.start();
}
phaser.arriveAndAwaitAdvance();
System.out.println("alltaskexecuteclose");
}
staticclassTaskextendsThread{
Phaserphaser;
publicTask(Phaserphaser){
this.phaser=phaser;
this.phaser.register();
}
@Override
publicvoidrun(){
try{
System.out.println(Thread.currentThread().getName() "開始執行");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println(Thread.currentThread().getName() "執行完畢");
//類似CountDownLatch中的await
phaser.arriveAndAwaitAdvance();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
}
不知道有沒有這樣的疑惑,phaser.register是向phaser去注冊這個線程,那麼為什麼主線程也需要注冊呢?
其實很簡單主線程需要等待所有子線程執行完畢才能繼續往下面執行所以必須要phaser.arriveAndAwaitAdvance();阻塞等待,而這個語句是意思當前線程已經到達屏障,在此等待一段時間等條件滿足後需要向下一個屏障繼續執行,如果沒有主線程的phaser.register,直接調用phaser.arriveAndAwaitAdvance,在源碼中提到可能會有異常,所以必須在主程序中注冊phaser.register();
/*<p>Itisausageerrorforanunregisteredpartytoinvokethis
*method.However,thiserrormayresultinan{@code
*IllegalStateException}onlyuponsomesubsequentoperationon
*thisphaser,ifever.
*/
譯:
未注冊方調用此函數是一個使用錯誤方法。但是,這個錯誤可能會導緻
{@codeIllegalStateException}僅在一些後續操作這個相位器,如果有的話。
從體驗的示例中其實沒看出其優勢在哪裡,上訴場景完全可以采用CountDownLatch,所以現在換一種場景來說明Phaser的優勢。
假設某校舉行期末考試,有三門考試語文、數學、英語,每門課允許學生提前交卷,隻有當所有學生完成考試後才能舉行下一次的考試,這就是典型的分階段任務處理,示例圖如下。
将上訴場景語義化如下
publicclassPhaserExam{
publicstaticRandomrandom=newRandom(System.currentTimeMillis());
publicstaticvoidmain(String[]args){
//一次初始化2個相當于兩次register
Phaserphaser=newPhaser(2);
for(inti=0;i<2;i ){
Examexam=newExam(phaser,random.nextLong());
exam.start();
}
}
staticclassExamextendsThread{
Phaserphaser;
Longid;
publicExam(Phaserphaser,Longid){
this.phaser=phaser;
this.id=id;
}
@Override
publicvoidrun(){
try{
System.out.println(Thread.currentThread().getName() "===開始語文考試");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println(Thread.currentThread().getName() "===結束語文考試");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() "===開始數學考試");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println(Thread.currentThread().getName() "===結束數學考試");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() "===開始英語考試");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println(Thread.currentThread().getName() "===結束英語考試");
phaser.arriveAndAwaitAdvance();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
}
代碼執行結果如下,可以看到三個階段都是等待所有線程執行完畢後才往下執行,相當于多個栅欄。
到這裡請注意,通過Phaser類的構造方法構建的party數,也就是線程數需要和循環的次數對應,不然可能影響後續階段器的正常運行。
兩個重要狀态在Phaser内有2個重要狀态,分别是phase和party,乍一看很難理解,他們的定義如下。
phase就是階段,如上面提到的語文、數學、英語考試這每個考試對應一個階段,不過phase是從0開始的,當所有任務執行完畢,準備進入下一個階段時phase就會加一。
party對應注冊到Phaser線程數,party初始值有兩種形式
Phaser常用API總結如下所示
//獲取Phaser階段數,默認0
publicfinalintgetPhase();
//向Phaser注冊一個線程
publicintregister();
//向Phaser注冊多個線程
publicintbulkRegister(intparties);
//獲取已經注冊的線程數,也就是重要狀态party的值
publicintgetRegisteredParties();
//到達并且等待其它線程到達
publicintarriveAndAwaitAdvance();
//到達後注銷不等待其它線程,繼續往下執行
publicintarriveAndDeregister();
//已到達線程數
publicintgetArrivedParties();
//未到達線程數
publicintgetUnarrivedParties();
//Phaser是否結束隻有當party的數量是0或者調用方法forceTermination時才會結束
publicbooleanisTerminated();
//結束Phaser
publicvoidforceTermination();
代碼演示如下
publicclassPhaserApiTest{
publicstaticvoidmain(String[]args)throwsInterruptedException{
Phaserphaser=newPhaser(5);
System.out.println("當前階段" phaser.getPhase());
System.out.println("注冊線程數===" phaser.getRegisteredParties());
//向phaser注冊一個線程
phaser.register();
System.out.println("注冊線程數===" phaser.getRegisteredParties());
//向phaser注冊多個線程,批量注冊
phaser.bulkRegister(4);
System.out.println("注冊線程數===" phaser.getRegisteredParties());
newThread(()->{
//到達且等待
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() "===執行1");
}).start();
newThread(()->{
//到達不等待,從phaser中注銷一個線程
phaser.arriveAndDeregister();
System.out.println(Thread.currentThread().getName() "===執行2");
}).start();
TimeUnit.SECONDS.sleep(3);
System.out.println("已到達線程數===" phaser.getArrivedParties());
System.out.println("未到達線程數===" phaser.getUnarrivedParties());
System.out.println("Phaser是否結束" phaser.isTerminated());
phaser.forceTermination();
System.out.println("Phaser是否結束" phaser.isTerminated());
}
}
執行結果如下所示
arriveAndAwaitAdvance是Phaser中一個重要實現阻塞的API,其實arriveAndAwaitAdvance是由arrive方法和awaitAdvance方法合并而來,兩個方法的作用分别為
測試代碼如下
publicclassPhaserTestArrive{
publicstaticRandomrandom=newRandom(System.currentTimeMillis());
publicstaticvoidmain(String[]args){
Phaserphaser=newPhaser(5);
for(inti=0;i<5;i ){
newTask(i,phaser).start();
}
phaser.register();
//主線程需要調用arrive的原因是主線程注冊的第六個線程還未到達,需要手動到達,才能調用awaitAdvance阻塞屏障
phaser.arrive();
//因為Phaser線程數為6,所以即使5個線程已經到達,但是還差主線程的一個,目前階段數就是0
phaser.awaitAdvance(0);
System.out.println("alltaskisend");
}
staticclassTaskextendsThread{
Phaserphaser;
publicTask(intnum,Phaserphaser){
super("Thread--" String.valueOf(num));
this.phaser=phaser;
}
@Override
publicvoidrun(){
try{
System.out.println(Thread.currentThread().getName() "===task1isstart");
TimeUnit.SECONDS.sleep(random.nextInt(3));
System.out.println(Thread.currentThread().getName() "===task1isend");
//到達且不等待
phaser.arrive();
System.out.println(Thread.currentThread().getName() "===task2isstart");
TimeUnit.SECONDS.sleep(random.nextInt(3));
System.out.println(Thread.currentThread().getName() "===task2isend");
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
}
我們需要特别注意的就是Phaser所有API中隻有awaitAdvanceInterruptibly是響應中斷的,其餘全部不會響應中斷所以不需要對其進行異常處理,演示如下
publicstaticvoidmain(String[]args){
Phaserphaser=newPhaser(3);
ThreadT1=newThread(()->{
try{
phaser.awaitAdvanceInterruptibly(phaser.getPhase());
}catch(InterruptedExceptione){
System.out.println("中斷異常");
e.printStackTrace();
}
//phaser.arriveAndAwaitAdvance();
});
T1.start();
T1.interrupt();
phaser.arriveAndAwaitAdvance();
}
更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!