tft每日頭條

 > 生活

 > 效果器phase是什麼意思

效果器phase是什麼意思

生活 更新时间:2024-12-02 15:34:34

我們在前面的文章學習CountDownLatch和CyclicBarrier的時候了解到,CountDownLatch計數器隻能使用一次,不能重複利用,CyclicBarrier雖然可以重複利用但是parties在初始化時指定後就不能再修改了,所以用法上不是很靈活。

Phaser是JDK1.7引入的多線程同步工具,其功能類似于CountDownLatch和CyclicBarrier的合集,它提供的方法更靈活和豐富,但是使用難度也比較大。

Phaser用作CountDownLatch

CountDownLatch能完成的功能,Phaser也能實現,其實現方式如下:

publicstaticvoidmain(String[]args){ //初始化不指定parties數量,parties默認為0 finalPhaserphaser=newPhaser(); //主線程調用register,數量 1 phaser.register(); IntStream.range(0,5).forEach(i-> newThread(()->{ //線程調用register方法,數量 1 phaser.register(); try{ //模拟業務執行時間 TimeUnit.SECONDS.sleep(newRandom().nextInt(2)); System.out.println(Thread.currentThread().getName() "完成任務"); //線程完成任務調用arrive方法,不阻塞 phaser.arrive(); }catch(InterruptedExceptione){ e.printStackTrace(); } },"T-" i).start() ); //主線程arrive,但是它要等待第一個階段結束,其前提是所有的線程都要arrive phaser.arriveAndAwaitAdvance(); System.out.println("注冊的任務數量:" phaser.getRegisteredParties()); System.out.println(Thread.currentThread().getName() "被喚醒"); }

初始化parties數量為0,啟動5個線程執行業務,每個線程調用register使得parties數量加1,并在業務執行結束之後調用arrive方法。

main線程調用arriveAndAwaitAdvance方法一直阻塞直到所有的線程都arrive。

運行結果為:

效果器phase是什麼意思(階段器Phaser)1

main線程直到所有任務執行結束才會被喚醒,實現了CountDownLatch的效果。

Phaser用作CyclicBarrier

CyclicBarrier可以使子任務阻塞直到所有線程到達屏障,在Phaser中隻需要把子任務到達的方法arrive修改為arriveAndAwaitAdvance即可實現CyclicBarrier的效果。

publicstaticvoidmain(String[]args){ //初始化不指定數量 finalPhaserphaser=newPhaser(); phaser.register(); IntStream.range(0,5).forEach(i-> newThread(()->{ //線程調用register使得數量加1 phaser.register(); try{ TimeUnit.SECONDS.sleep(newRandom().nextInt(5)); System.out.println(Thread.currentThread().getName() "完成任務"); //線程完成任務調用arriveAndAwaitAdvance阻塞直到所有線程都到達 phaser.arriveAndAwaitAdvance(); System.out.println(Thread.currentThread().getName() "繼續執行任務"); }catch(InterruptedExceptione){ e.printStackTrace(); } },"T-" i).start() ); //主線程arrive,但是它要等待第一個階段結束,其前提是所有的線程都要arrive phaser.arriveAndAwaitAdvance(); System.out.println("注冊的任務數量:" phaser.getRegisteredParties()); System.out.println(Thread.currentThread().getName() "被喚醒"); }

初始化parties數量為0,啟動5個線程執行業務,每個線程調用register使得parties數量加1,并在業務執行結束之後調用arriveAndAwaitAdvance,此方法會使線程阻塞直到所有的線程都到達之後才繼續運行。

運行結果為:

效果器phase是什麼意思(階段器Phaser)2

通過結果我們看到main線程和所有T線程在所有線程到達之後繼續并發執行,實現了CyclicBarrier的效果。

Phaser(階段)編号

在Phaser中有一個比較重要的概念:Phaser(階段)編号。

一個Phaser中所有相關聯的parties都arrive(到達後),Phaser就會從下一個階段開始,除非Phaser已經被終止或者銷毀。

Phaser為每一個Phaser都提供了一個編号,初始化為0,所有任務分片到達之後加1。

Phaser階段可以理解為每次重用Phaser都會為其分配一個階段編号,每次全部任務到達都順序加1。

效果器phase是什麼意思(階段器Phaser)3

我們通過一段代碼查看不同情況下Phaser的編号。

publicstaticvoidmain(String[]args){ finalPhaserphaser=newPhaser(3); System.out.println("Phaser階段編号為:" phaser.getPhase()); phaser.arrive(); phaser.arrive(); phaser.arrive(); System.out.println("Phaser階段編号為:" phaser.getPhase()); //繼續重用phaser phaser.bulkRegister(1); //現在parties數量為4 phaser.arrive(); phaser.arrive(); phaser.arrive(); System.out.println("Phaser階段編号為:" phaser.getPhase()); phaser.arrive(); System.out.println("Phaser階段編号為:" phaser.getPhase()); }

getPhase()方法返回Phaser的當前Phaser編号,最大的Phaser編号為Integer.MAX_VALUE,超過最大值将重新從0開始。當Phaser被終止時,Phaser編号将返回負值。

Phaser中大部分方法都會返回Phaser編号,理解Phaser編号是理解Phaser方法的基礎。

Phaser方法總結

Phaser構造器

parties為分片任務數量,可以構造時指定,也可以通過調用register方法對其加1。

parent為Phaser的層級,如果指定了parent,那麼子Phaser的Phaser編号會以父Phaser為準,且父Phaser的分片任務等于所有子Phaser的分片任務和父Phaser的分片任務總和。

因parent會使線程管理比較複雜,一般很少使用,所以我們開發中最常用的是以下構造中的前2個。

Phaser():不指定parties數量,默認為0;并且不指定parent,parent默認為null。

Phaser(int parties):指定parties數量,不指定parent。

Phaser(Phaser parent):指定parent,不指定parties數量。

Phaser(Phaser parent, int parties):同時指定parent和parties數量。

register()方法

publicintregister(){ returndoRegister(1); }

register方法的作用是為Phaser增加一個未到達的parties,并且返回Phaser的編号。使用該方法時,有的時候會陷入阻塞,比如前一個Phaser執行onAdvance方法耗時比較長,那麼如果此時想要一個新的parties通過register方法加入就會陷入阻塞。

publicstaticvoidmain(String[]args){ finalPhaserphaser=newPhaser(1){ @Override protectedbooleanonAdvance(intphase,intregisteredParties){ try{ TimeUnit.SECONDS.sleep(30); }catch(InterruptedExceptione){ e.printStackTrace(); } returnsuper.onAdvance(phase,registeredParties); } }; newThread(()->{ phaser.arrive(); }).start(); TimeUnit.SECONDS.sleep(2); longtime=System.currentTimeMillis(); phaser.register(); System.out.println("當前階段編号為:" phaser.getPhase()); System.out.println("耗時:" (System.currentTimeMillis()-time) "ms"); }

上述代碼中重寫了onAdvance方法,初始化parties為1,啟動一個線程調用arrive方法分片任務到達,此時onAdvance得以被執行。

main線程調用register會被阻塞,因為此時onAdvance正在做收尾工作,它在執行完邏輯之後才會接納新的分片進來,因此onAdvance盡量不要寫太複雜的代碼。

onAdvance在每一個Phaser階段中所有的任務分片達到之後都會被執行,它還決定了Phaser是否終止:當onAdvance方法返回true表明該Phaser将被終止,接下來将不能再使用。onAdvance可以實現CyclicBarrier中Runnable的作用。

bulkRegister(int parties)方法

publicintbulkRegister(intparties){ if(parties<0) thrownewIllegalArgumentException(); if(parties==0) returngetPhase(); returndoRegister(parties); }

該方法的作用與register()一樣,通過源碼我們可以看到他們調用的都是doRegister方法。不同點是register隻允許注冊一個分片到Phaser,但是bulkRegister方法可以允許0個或者多個分片到達Phaser。

arrive()和arriveAndAwaitAdvance()方法

arrive方法是到達Phaser的下一個階段,不會等待其他分片,它返回的階段編号為當前Phaser的編号。

arriveAndAwaitAdvance方法是到達Phaser的下一個階段,需要阻塞等待其他線程的分片都到達。而且此方法返回的是下一個Phaser的編号。

publicstaticvoidmain(String[]args){ finalPhaserphaser=newPhaser(2); System.out.println("當前Phaser的編号:" phaser.getPhase()); phaser.arrive(); System.out.println("第一次arrive後的Phaser的編号:" phaser.getPhase()); phaser.arrive(); System.out.println("第二次arrive後的Phaser的編号:" phaser.getPhase()); }

getPhase編号初始化為0,第一次調用arrive方法,返回當前編号為0,因我們定義了兩個切片,此時隻有一個分片任務到達。第二次調用arrive方法,返回當前編号為1,因為此時所有的分片任務都已到達,Phaser已經進入下一個階段了。

arriveAndDeregister()方法

該方法可以到達下一個Phaser階段之外,還會将當前Phaser的parties數量減少一個。

publicstaticvoidmain(String[]args){ finalPhaserphaser=newPhaser(2); System.out.println("當前Phaser的編号:" phaser.getPhase() "&&當前的注冊的parties:" phaser.getRegisteredParties()); phaser.arriveAndDeregister(); System.out.println("當前Phaser的編号:" phaser.getPhase() "&&當前的注冊的parties:" phaser.getRegisteredParties()); phaser.arriveAndDeregister(); System.out.println("當前Phaser的編号:" phaser.getPhase() "&&當前的注冊的parties:" phaser.getRegisteredParties()); }

初始化定義parties數量為2,每次調用arriveAndDeregister打印出的parties數量都會減1,最後一次輸出的Phaser為負數,表示當前的Phaser已經被銷毀。

awaitAdvance(int phase)方法

awaitAdvance有3種不同的表現:awaitAdvance(int phase)、awaitAdvanceInterruptibly(int phase)、awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)。

這三個方法更多的是關注阻塞而不關注arrive,等待某個Phaser關聯的所有分片是否已經到達某個指定的階段。使用這3個方法不會影響Phaser内部分片的arrive和unarrive變化的。

awaitAdvance(int phase)方法的主要作用是等待與Phaser相關聯的分片都到達某個指定的Phaser階段編号,如果有某個分片任務未到達,那麼此方法就會進入阻塞,且不能打斷。如果入參Phaser與當前Phaser的階段編号不一緻,該方法會立即返回,如果Phaser已經被銷毀,該方法的返回值為負數。

publicstaticvoidmain(String[]args){ finalPhaserphaser=newPhaser(1); newThread(()->{ System.out.println("當前Phaser的階段編号為:" phaser.getPhase()); intphaserNum=phaser.awaitAdvance(phaser.getPhase()); System.out.println("phaserNum為:" phaserNum); }).start(); TimeUnit.SECONDS.sleep(30); phaser.arriveAndAwaitAdvance(); System.out.println("當前Phaser的階段編号為:" phaser.getPhase()); }

thread線程調用awaitAdvance方法一直阻塞直到main線程調用arriveAndAwaitAdvance方法分片任務到達後,thread線程退出阻塞,且awaitAdvance方法返回的Phaser編号為下一個階段的Phaser編号。

awaitAdvanceInterruptibly(int phase)與awaitAdvance(int phase)方法作用相同,隻是其可以被打斷。

awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)在可以被打斷的基礎上增加了設置超時等待的功能。

Phaser的介紹到這裡就結束了,對其方法的理解還需在使用中加深印象。

JUC包中還有一個并發工具類Exchanger,可以實現線程間的數據交換,感興趣的小夥伴自行查找資料了解喔。

本周勾勾會繼續學習并發編程知識,陸續推出并發容器和線程池的文章,期待和你一起進步!!!

,

更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!

查看全部

相关生活资讯推荐

热门生活资讯推荐

网友关注

Copyright 2023-2024 - www.tftnews.com All Rights Reserved