一、什麼是RXJAVARXJAVA是一個庫,用來支持我們需求裡需要異步操作的地方它比起handler等異步操作的實現方式來說,顯得更為簡潔把整個操作整合成一條流水線,從上遊到下遊都能夠看的清,我來為大家科普一下關于java架構師必備高可用架構?下面希望有你要的答案,我們一起來看看吧!
java架構師必備高可用架構
一、什麼是RXJAVA
RXJAVA是一個庫,用來支持我們需求裡需要異步操作的地方。它比起handler等異步操作的實現方式來說,顯得更為簡潔。把整個操作整合成一條流水線,從上遊到下遊都能夠看的清。
二、RXJAVA的原理
RXJAVA的實現,是一種擴展式的觀察者模式。
RXJAVA中有四種概念。observable(被觀察者),Observer(觀察者),subscribe(訂閱),事件。Observable和Observer通過subscribe來實現訂閱關系。與傳統的觀察者模式不同,除了onNext事件外,Rxjava還提供了onCompleted和onError。當不再有onNext事件發送時,将以onCompleted事件作為結束。當處理過程中出現異常時,會觸發onError,同時隊列自動終止,不允許再有事件發出。onCompleted和onError在一個序列中有且隻有一個,二者互斥,隻能出現一個。
subscribeOn和observeOn
subscribeOn調用可以将之前的操作加如線程池,從而保證運行于子線程中,observeOn會使後邊的執行運行于主線程,這裡的之前和後邊均是指的代碼結構上的前後。
subscribeOn
經過分析可知道,當subscribeOn調用的時候,會創建一個ObservableSubscribeOn對象返回,與此同時,上一級産生的對象會被保存在當前對象的source變量中,并且,将創建出一個線程池,先看線程池的創建,這裡直接以io線程為例
Schedulers.io(Schedulers.io())
public static Scheduler io() { return RxJavaPlugins.onIoScheduler(IO); }
其中的IO是在Schedulers類加載的時候就創建出來的,從這個結構可以看出,IO就是IoScheduler對象,RxJavaPlugins.initIoScheduler方法接收一個Callable線程,返回callable.call,也就是call方法中返回的就是這個函數的返回值(Callable是另一種開啟線程的方式,這個線程有返回值,當返回值獲取到之前,會阻塞當前線程)
IO = RxJavaPlugins.initIoscheduler(new CallableScheduler() { @Override public Scheduler call() throws Exception { return IoHolder.DEFAULT; } }); static final class IoHolder { static final Scheduler DEFAULT = new IoScheduler(); }
那麼IoScheduler是什麼?當IoScheduler創建的時候
public IoScheduler() { this.pool = new AtomicReferenceCachedworkerPool start(); } @Override public void start() { CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT); if (!pool.compareAndSet(NONE, update)) { update.shutdown(); } }
NONE是IoScheduler中創建的一個線程池,所以IoScheduler其實就是一個封裝好了的線程池對象
static final CachedWorkerPool NONE; static NONE = new CachedWorkerPool(0, null); } CachedWorkerPool(long keepAliveTime, TimeUnit unit) { this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L; this.expiringWorkerQueue = new ConcurrentLinkedQueueThreadWorker this.allWorkers = new CompositeDisposable(); ScheduledExecutorService evictor = null; Future task = null; if (unit != null) { evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS); } evictorService = evictor; evictorTask = task; }
Schedulers.io(Schedulers.io())的調用,執行了兩個動作,第一,保存上一級的對象,第二創建線程池
接下來來看主線程的切換,調用observeOn方法,創建ObservableObserveOn對象,同樣保存上一級産生的對象到source中,這裡指的就是subscribeOn返回的對象ObservableSubscribeOn,并且保存傳入的Scheduler--AndroidSchedulers.mainThread()
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, scheduler is null ObjectHelper.verifyPositive(bufferSize, bufferSize return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize)); }
進入AndroidSchedulers.mainThread(),與上邊同樣的寫法,最後返回HandlerScheduler
public static Scheduler mainThread() { return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); } private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new CallableScheduler() { @Override public Scheduler call() throws Exception { return MainHolder.DEFAULT; } }); private static final class MainHolder { //可以猜測這個HandlerScheduler是一個通過對Handler進行封裝 //運行于主線程的線程,可以看到Looper.getMainLooper()傳入了一個主線程的 //looper對象,事實上也是如此 static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper())); } }
所以,很類似,observeOn(AndroidSchedulers.mainThread())同樣是做了兩件事,保存source和Scheduler,那麼兩種線程是如何進行調度的,其實看到這裡,還沒有進入正題,真正的邏輯其實在subscribe方法上。
subscribe
以subscribe(new ObserverString())為例說明(new Consumer最終源碼也是相同的),調用subscribe方法後會來到Observable的抽象方法subscribeActual中,所以我們要到當前Observable實現類中找這個方法,按照上邊程序調用的順序,此時,調用subscribe方法的對象是observeOn方法産生的ObservableObserveOn,進入這個類,找到subscribeActual方法
@Override protected void subscribeActual(Observer? super T observer) { //這個scheduler是指AndroidSchedulers.mainThread(),也就是HandlerScheduler if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { //創建一個worker Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize)); } }
進入HandlerScheduler找到createWorker方法,這裡創建了一個HandlerWorker對象,看到這裡大概也可以猜測一下,HandlerWorker中的schedule方法将會是一個關鍵,傳入的handler是主線程中的handler,明顯是要通過消息機制發送到主線程執行,問題的關鍵,在于是怎麼發送到主線程執行的,schedule方法的具體執行我們暫且不看,按照程序執行順序繼續往下走
@Override public Worker createWorker() { return new HandlerWorker(handler); } private static final class HandlerWorker extends Worker { private final Handler handler; private volatile boolean disposed; HandlerWorker(Handler handler) { this.handler = handler; } @Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { ...... ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this workers runnables. handler.sendMessageDelayed(message, unit.toMillis(delay)); ...... return scheduled; } ...... }
在創建了worker之後,調用方法subscribe,source很明顯是ObservableObserveOn對象創建的時候所保存的上一級的調用subscribeOn方法産生的ObservableSubscribeOn對象,通過這個對象調用subscribe方法,又會進入到ObservableSubscribeOn的subscribeActual方法。observer指的是我們代碼中傳入的observer(subscribe時new的那個),這裡對observer封裝了一層,以ObserveOnObserver的形式傳入到ObservableSubscribeOn的subscribeActual方法中,向上層傳遞了一級,可以參考08.RxJava運作流程源碼分析中提供的流程圖
//source指ObservableSubscribeOn對象 source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
來到ObservableSubscribeOn的subscribeActual
@Override //參數s指的時對observer封裝了一層之後的ObserveOnObserver(new ObserveOnObserver(new Observer )) public void subscribeActual(final Observer? super T s) { //對ObserveOnObserver對象進行一次封裝 //此時Observer已經被封裝了兩層 //(new SubscribeOnObserver(new ObserveOnObserver(new Observer))) final SubscribeOnObserver parent = new SubscribeOnObserver(s); //調用ObserveOnObserver對象的onSubscribe s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
看看ObserveOnObserver的onSubscribe方法
@Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; //注意這裡這個判斷這次是不會滿足的,也就是這裡的代碼不會走 if (s instanceof QueueDisposable) { @SuppressWarnings(unchecked) QueueDisposable qd = (QueueDisposable) s; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); //同步,如果是要同步執行,就是指如果設置了在主線程執行,那麼 //就執行schedule(),往下看可以發現是使用我我們創建的worker //發送到主線程執行 if (m == QueueDisposable.SYNC) { sourceMode = m; queue = qd; done = true; //actual指的就是我們傳入的最原始的那個observer actual.onSubscribe(this); schedule(); return; } //異步,如果是異步執行,直接在當前線程執行,當前線程也就是子線程 if (m == QueueDisposable.ASYNC) { sourceMode = m; queue = qd; actual.onSubscribe(this); return; } queue = new SpscLinkedArrayQueue(bufferSize); //actual是我們new的那個Observer,所以這裡直接回調了onSubscribe方法 actual.onSubscribe(this); } }
scheduler就是Schedulers.io()得到的就是IoSchedule對象,在上邊分析subscribeOn方法時我們已經知道這個對象是一個線程池,調用scheduleDirect方法就是将SubscribeTask這個Runnable放進了線程池執行,并且是在子線程中
@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
createWorker()是個抽象類,在IoSchedule中找到重寫的方法
@Override public Worker createWorker() { return new EventLoopWorker(pool.get()); }
所以這樣一來也就是說new SubscribeTask(parent))這個Runnable被放入了線程池執行,這時候會調用它的run方法,這樣就又回到了調用上一級産生對象的subscribe方法中去了,不同的是此時subscribe已經是在線程池中執行了(子線程)
@Override public void run() { source.subscribe(parent); }
就這樣一級一級的往上調用,下一個會走到ObservableMap的subscribeActual方法,最後走到ObservableJust的subscribeActual,s.onSubscribe(sd)方法并沒有執行什麼東西,onSubscribe在之前已經被調用了,重點在 sd.run()
@Override protected void subscribeActual(Observer? super T s) { ScalarDisposable sd = new ScalarDisposable(s, value); s.onSubscribe(sd); sd.run(); }
終于在這裡要看到onNext onComplete方法的執行了
@Override public void run() { if (get() == START compareAndSet(START, ON_NEXT)) { //observer 是 new MapObserver(new SubscribeOnObserver(new ObserveOnObserver(new Observer(){......}))) observer.onNext(value); if (get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); } } }
此時一層一層的調用到這裡,observer對象已經是經過層層封裝包裹的observer了(new MapObserver(new SubscribeOnObserver(new ObserveOnObserver(new Observer(){......})))),所以調用observer.onNext會首先執行MapObserver中的onNext,不管用戶調用了幾次map操作符,都會一個一個的通過回調onNext方法執行完成(如果有多個map方法被調用,當執行完一個apply方法後,後邊的actual.onNext就會進入下一個MapObserver中的onNext方法),當執行到最後一個onNext方法的時候,此時這個actual表示的就是SubscribeOnObserver對象了,也就會去執行它裡邊的onNext
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { actual.onNext(null); return; } U v; try { //執行apply方法,也就是map操作符中的回調方法 v = ObjectHelper.requireNonNull(mapper.apply(t), The mapper function returned a null value. } catch (Throwable ex) { fail(ex); return; } actual.onNext(v); }
SubscribeOnObserver中的onNext,這裡的actual指的是ObserveOnObserver,所以又要去執行它的onNext
@Override public void onNext(T t) { actual.onNext(t); }
ObserveOnObserver中的onNext
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } void schedule() { if (getAndIncrement() == 0) { //這個worker是AndroidScheduler.mainThread得到的一個運行于主線程的封裝類 HandlerWorker worker.schedule(this); } }
在分析observeOn方法的時候我們已經知道這個worker是AndroidScheduler.mainThread得到的一個運行于主線程的封裝類 HandlerWorker ,worker.schedule(this)傳入的是一個Runnable,也就是會在主線程中執行這個Runnable,我們找到重寫的run方法。終于找到onNext和onComplete的最終執行的地方了,并且我們知道,這兩個方法是在主線程執行的
@Override public void run() { if (outputFused) { drainFused(); } else { //會執行這個,上邊那個先不管 drainNormal(); } } void drainNormal() { int missed = 1; final SimpleQueue q = queue; final Observer? super T a = actual; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } }
到這裡,RxJava線程調度的實現方式基本上我們已經了解了。
這裡可以插一個題外話,通常我們使用handler發送的消息都是在handleMessage方法中執行,但是這裡我們無論如何找不到這個方法的實現,那麼handler是如何處理消息的?
@Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { ...... ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this workers runnables. handler.sendMessageDelayed(message, unit.toMillis(delay)); ...... return scheduled; }
可以看到這裡Message message = Message.obtain(handler, scheduled),看一下obtain方法的源碼會發現傳入的第二個參數是一個callback,保存到了message的成員變量m.callback中,當handler調用sendMessageDelayed會将消息加入主線程的消息隊列(因為handler就是主線程的handler),我們知道應用啟動就會初始化一個主線程的handler一個looper和messageQueue(對消息機制不理解的可以看另一篇15.源碼閱讀(安卓消息機制)),調用looper.loop開啟一個無限循環不斷的從主線程消息隊列中取消息,我們看看它是如何取的
public static void loop() { for (;;) { Message msg = queue.next(); // might block ...... msg.target.dispatchMessage(msg); } }
無限循環中取到message後會執行發送這個Message的handler中的dispatchMessage方法,這時候會判斷callback也就是我們上邊那個傳入的,如果它不能與null,就執行handleCallback,執行callback的run方法,找到這裡終于找到為什麼沒有handlerMessage仍然可以處理消息了
public void dispatchMessage(Message msg) { if (msg.callback != null) { handleCallback(msg); } else { if (mCallback != null) { if (mCallback.handleMessage(msg)) { return; } } handleMessage(msg); } } private static void handleCallback(Message message) { message.callback.run(); }
傳入的callback是哪個,就是 Message message = Message.obtain(handler, scheduled)中的schedule,schedule是哪個ScheduledRunnable ,也就是說執行的是ScheduledRunnable 的run方法,delegate就是ScheduledRunnable 中傳入的那個runnable,追溯上去,這個runnable就是worker.schedule(this)中的this,所以可以找到重寫的run方法
@Override public void run() { try { delegate.run(); } catch (Throwable t) { RxJavaPlugins.onError(t); } }
專注于技術熱點大數據,人工智能,JAVA、Python、 C 、GO、Javascript等語言最新前言技術,及業務痛點問題分析,請關注【編程我最懂】共同交流學習。
,
更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!