好長時間木有寫java,怕忘光了😄,今天抽空翻翻源碼做些總結(jié)??偟膩碚f實現(xiàn)邏輯還是比較簡單清晰的。
創(chuàng)新互聯(lián)公司專注于德興企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站,成都商城網(wǎng)站開發(fā)。德興網(wǎng)站建設(shè)公司,為德興等地區(qū)提供建站服務(wù)。全流程按需求定制開發(fā),專業(yè)設(shè)計,全程項目跟蹤,創(chuàng)新互聯(lián)公司專業(yè)和態(tài)度為您提供的服務(wù)實現(xiàn) 1. 架構(gòu)圖ThreadPoolExecutor 中有維護了隊列,和Worker(對應(yīng)一個線程)的池子,提交的任務(wù)會交給Worker執(zhí)行。
2. 線程池屬性corePoolSize
: 核心線程(即開啟了就會常駐的線程)的數(shù)量
workQueue
: 提交任務(wù)的隊列(當核心池用完就會先放在隊列里面)
maximumPoolSize
: 線程池大的size,如果隊列里放不下,就會接著起線程運行任務(wù)
RejectedExecutionHandler
: 超過線程池大的size,隊列也放不下就開始執(zhí)行Rejected邏輯
keepAliveTime
: 隊列消費完數(shù)據(jù)后,線程大的keepAlive時間 ,默認allowCoreThreadTimeOut 是false,只會清理maxpool的線程。
控制
ctl
: 記錄線程池生命周期的狀態(tài)位和運行的線程個數(shù) ,高三位記錄狀態(tài),低位記錄Worker個數(shù)。通過樂觀鎖的方式控制線程池(查詢線程個數(shù)的方法workerCountOf和workQueue中有些混淆,前者是運行的線程數(shù),后者是提交上來存放任務(wù))
private static final int RUNNING ? ?= -1<< COUNT_BITS; ?//運行中 ? private static final int SHUTDOWN ? = ?0<< COUNT_BITS; ?//對應(yīng)shutdown()方法,隊列和Worker現(xiàn)有的方法會繼續(xù)運行,清理掉空閑的Worker(是否空閑看能否持有到Woker的鎖),甚至還能添加Worker ? private static final int STOP ? ? ? = ?1<< COUNT_BITS; ?//對應(yīng)shutdownNow()方法,所有的Worker都會發(fā)interrupt信號(不會保證線程被釋放掉),隊列也會清空 ? private static final int TIDYING ? ?= ?2<< COUNT_BITS; ?//當所有的任務(wù)到停掉,ctl記錄的任務(wù)數(shù)為0 ? private static final int TERMINATED = ?3<< COUNT_BITS; ?//線程池正式停掉。TiDYING狀態(tài)后執(zhí)行terminated()(空方法)變成TERMINATED狀態(tài)4.提交流程:
?int c = ctl.get(); ? ? ? ?if (workerCountOf(c)< corePoolSize) { ? ? ? ? ? //當前Worker數(shù)小于corePoolSize時候,會創(chuàng)建個新的Worker ? ? ? ? ? ?if (addWorker(command, true)) ? ? ? ? ? ? ? ?return; ? ? ? ? ? ?c = ctl.get(); ? ? ? } ? ? ? ?if (isRunning(c) && workQueue.offer(command)) {//corePool的線程已經(jīng)起完了,就會將其提交到隊列中,由Worker消費 ? ? ? ? ? ?int recheck = ctl.get(); ? ? ? ? ? ?if (! isRunning(recheck) && remove(command)) ? ? ? ? ? ? ? ?reject(command); //重新檢查狀態(tài),如果池子如果不是運行中,且task未來的及消費,發(fā)起拒絕策略 ? ? ? ? ? ?else if (workerCountOf(recheck) == 0) // 如果從隊列中刪除未成功,而且Worker數(shù)等于0 ? ? ? ? ? ? ? ?addWorker(null, false); //加個Worker把這個任務(wù)消化掉 ? ? ? } ? ? ? ?else if (!addWorker(command, false)) //如果隊列也滿了,會向maximumPool增加新的Worker,去運行線程 ? ? ? ? ? ?reject(command); //maximumPool也裝不下觸發(fā)拒絕策略5.創(chuàng)建Worker的邏輯
5.1 Check是否可以添加Worker
根據(jù)上面提交流程
我們知道:
判斷是否需要添加Worker由ctl中保存的Worker數(shù)決定是否addWorkder,但是多個線程之間會有競爭,進入addWorker方法時,是會存在池子已經(jīng)滿了的情況。
這個時候會有個樂觀鎖的方式,重試檢查線程池狀態(tài)和Worker數(shù)量,保證其他提交線程的性能不會受影響:
如果池子真的滿了只能通知調(diào)用者,addWorker失敗,進行其他策略
如果池子未滿,就可以創(chuàng)建一個新的Worker
private boolean addWorker(Runnable firstTask, boolean core) {//bool core 表示核心池還是maxium池 ? ? ? ?retry: ? ? ? ?for (;;) { ? ? ? ? ? ? ? ? ? ? ?int c = ctl.get(); ? ? ? ? ? ?int rs = runStateOf(c); ? ? ? ? ? ? ?// Check if queue empty only if necessary. ? ? ? ? ? ?if (rs >= SHUTDOWN && ? ? ? ? ? ? ? ?! (rs == SHUTDOWN && ? ? ? ? ? ? ? ? ? firstTask == null && ? ? ? ? ? ? ? ? ? ! workQueue.isEmpty())) ? ? ? ? ? ? ? ?return false; ? ? ? ? ? ? ?for (;;) { ? ? ? ? ? ? ? ?int wc = workerCountOf(c); ? ? ? ? ? ? ? ?if (wc >= CAPACITY || ? ? ? ? ? ? ? ? ? ?wc >= (core ? corePoolSize : maximumPoolSize)) ? ? ? ? ? ? ? ? ? ?return false; ? ? ? ? ? ? ? ?if (compareAndIncrementWorkerCount(c)) ? ? ? ? ? ? ? ? ? ?break retry; ? ? ? ? ? ? ? ?c = ctl.get(); ?// Re-read ctl ? ? ? ? ? ? ? ?if (runStateOf(c) != rs) ? ? ? ? ? ? ? ? ? ?continue retry; ? ? ? ? ? ? ? ?// else CAS failed due to workerCount change; retry inner loop ? ? ? ? ? } ? ? ? }
5.2 開始添加Worker
這里沒啥好說的,創(chuàng)建個Worker并放到Woker的池子中,
這里用到互斥鎖,不過鎖的粒度很小,只會是在加入池子的時候上鎖
入池子前還是會檢查池子的狀態(tài)
?boolean workerStarted = false; ? ? ? ?boolean workerAdded = false; ? ? ? ?Worker w = null; ? ? ? ?try { ? ? ? ? ? ?w = new Worker(firstTask); ? ? ? ? ? ?final Thread t = w.thread; ? ? ? ? ? ?if (t != null) { ? ? ? ? ? ? ? ?final ReentrantLock mainLock = this.mainLock; ? ? ? ? ? ? ? ?mainLock.lock(); ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?// Recheck while holding lock. ? ? ? ? ? ? ? ? ? ?// Back out on ThreadFactory failure or if ? ? ? ? ? ? ? ? ? ?// shut down before lock acquired. ? ? ? ? ? ? ? ? ? ?int rs = runStateOf(ctl.get()); ? ? ? ? ? ? ? ? ? ? ?if (rs< SHUTDOWN || ? ? ? ? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask == null)) { ? ? ? ? ? ? ? ? ? ? ? ?if (t.isAlive()) // precheck that t is startable ? ? ? ? ? ? ? ? ? ? ? ? ? ?throw new IllegalThreadStateException(); ? ? ? ? ? ? ? ? ? ? ? ?workers.add(w); ? ? ? ? ? ? ? ? ? ? ? ?int s = workers.size(); ? ? ? ? ? ? ? ? ? ? ? ?if (s >largestPoolSize) ? ? ? ? ? ? ? ? ? ? ? ? ? ?largestPoolSize = s; ? ? ? ? ? ? ? ? ? ? ? ?workerAdded = true; ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? } finally { ? ? ? ? ? ? ? ? ? ?mainLock.unlock(); ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ?if (workerAdded) { ? ? ? ? ? ? ? ? ? ?t.start(); ? ? ? ? ? ? ? ? ? ?workerStarted = true; ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? } finally { ? ? ? ? ? ?if (! workerStarted) ? ? ? ? ? ? ? ?addWorkerFailed(w); ? ? ? }6. Woker 介紹
Worker 對應(yīng)線程池里的一個工作線程
持續(xù)從隊列里獲取任務(wù)執(zhí)行
boolean timed = allowCoreThreadTimeOut || wc >corePoolSize; ? ? ? ? ? ? ?if ((wc >maximumPoolSize || (timed && timedOut)) ? ? ? ? ? ? ? ?&& (wc >1 || workQueue.isEmpty())) { ? ? ? ? ? ? ? ?if (compareAndDecrementWorkerCount(c)) ? ? ? ? ? ? ? ? ? ?return null; ? ? ? ? ? ? ? ?continue; ? ? ? ? ? } ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?Runnable r = timed ? ? ? ? ? ? ? ? ? ? ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : ? ? ? ? ? ? ? ? ? ?workQueue.take(); ? ? ? ? ? ? ? ?if (r != null) ? ? ? ? ? ? ? ? ? ?return r; ? ? ? ? ? ? ? ?timedOut = true; ? ? ? ? ? } catch (InterruptedException retry) { ? ? ? ? ? ? ? ?timedOut = false; ? ? ? ? ? }7.Worker生命周期
創(chuàng)建: 在上文有介紹
會根據(jù)線程池的的狀態(tài)自行結(jié)束線程
if ((runStateAtLeast(ctl.get(), STOP) || ? ? ? ? ? ? ? ? ? ? (Thread.interrupted() && ? ? ? ? ? ? ? ? ? ? ?runStateAtLeast(ctl.get(), STOP))) && ? ? ? ? ? ? ? ? ? ?!wt.isInterrupted()) ? ? ? ? ? ? ? ? ? ?wt.interrupt(); ? ? ? ? ? ? ? ? ? ?
如果是異常退出,會退出后會重新起一個新的woker(注意這個一般代碼的邏輯異常不會導致Worker異常,會被封裝到submit方法返回的future中,jdk留了空方法可以監(jiān)聽到這種異常)
?int c = ctl.get(); ? ? ? ?if (runStateLessThan(c, STOP)) { ? ? ? ? ? ?if (!completedAbruptly) { ? ? ? ? ? ? ? ?int min = allowCoreThreadTimeOut ? 0 : corePoolSize; ? ? ? ? ? ? ? ?if (min == 0 && ! workQueue.isEmpty()) ? ? ? ? ? ? ? ? ? ?min = 1; ? ? ? ? ? ? ? ?if (workerCountOf(c) >= min) ? ? ? ? ? ? ? ? ? ?return; // replacement not needed ? ? ? ? ? } ?// ? ? ? completedAbruptly = true 會被任務(wù)是異常退出 ? ? ? ? ? ?addWorker(null, false); ? ? ? }
maxium池子的空閑Worker會被釋放
// Are workers subject to culling? ? ? ? ? ? ?boolean timed = allowCoreThreadTimeOut || wc >corePoolSize; ? ? ? ? ? ? ?if ((wc >maximumPoolSize || (timed && timedOut)) ? ? ? ? ? ? ? ?&& (wc >1 || workQueue.isEmpty())) { //判斷超時 ? ? ? ? ? ? ? ?if (compareAndDecrementWorkerCount(c)) ? ? ? ? ? ? ? ? ? ?return null; ? ? ? ? ? ? ? ?continue; ? ? ? ? ? } ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?Runnable r = timed ? ? ? ? ? ? ? ? ? ? ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : ? ? ? ? ? ? ? ? ? ?workQueue.take(); ? ? ? ? ? ? ? ?if (r != null) ? ? ? ? ? ? ? ? ? ?return r; ? ? ? ? ? ? ? ?timedOut = true; ? ? ? ? ? } catch (InterruptedException retry) { ? ? ? ? ? ? ? ?timedOut = false; ? ? ? ? ? }
被主動釋放(看shutdownNow()方法)
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
Q1: 在創(chuàng)建線程池時,在ThreadFactory接口中指定setUncaughtExceptionHandler
之后,如果線程池中的任務(wù)拋異常,是否會被UncaughtExceptionHandler捕獲到
不會,在線程池的實現(xiàn)中,Worker的會一直持有申請到的線程,不會處理提交任務(wù)的異常,異常可以在submit 方法返回的future上獲取到。
Q2:showdownNow()執(zhí)行之后線程就一定都執(zhí)行完嘛
只是發(fā)interrupt 信號,看上文介紹只有TYDING ,TERMINATED狀態(tài),所有任務(wù)才會執(zhí)行完。一定要保持代碼的健壯性,控制業(yè)務(wù)代碼的生命周期。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧
當前題目:JDK學習筆記-線程池的實現(xiàn)-創(chuàng)新互聯(lián)
網(wǎng)頁網(wǎng)址:http://jinyejixie.com/article34/jeese.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制開發(fā)、建站公司、用戶體驗、網(wǎng)站導航、網(wǎng)站營銷、微信小程序
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容