成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

JDK學習筆記-線程池的實現(xiàn)-創(chuàng)新互聯(lián)

前言

好長時間木有寫java,怕忘光了😄,今天抽空翻翻源碼做些總結(jié)??偟膩碚f實現(xiàn)邏輯還是比較簡單清晰的。

創(chuàng)新互聯(lián)公司專注于德興企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站,成都商城網(wǎng)站開發(fā)。德興網(wǎng)站建設(shè)公司,為德興等地區(qū)提供建站服務(wù)。全流程按需求定制開發(fā),專業(yè)設(shè)計,全程項目跟蹤,創(chuàng)新互聯(lián)公司專業(yè)和態(tài)度為您提供的服務(wù)實現(xiàn) 1. 架構(gòu)圖

ThreadPoolExecutor 中有維護了隊列,和Worker(對應(yīng)一個線程)的池子,提交的任務(wù)會交給Worker執(zhí)行。

2. 線程池屬性

corePoolSize: 核心線程(即開啟了就會常駐的線程)的數(shù)量

workQueue: 提交任務(wù)的隊列(當核心池用完就會先放在隊列里面)

maximumPoolSize: 線程池大的size,如果隊列里放不下,就會接著起線程運行任務(wù)

RejectedExecutionHandler: 超過線程池大的size,隊列也放不下就開始執(zhí)行Rejected邏輯

keepAliveTime: 隊列消費完數(shù)據(jù)后,線程大的keepAlive時間 ,默認allowCoreThreadTimeOut 是false,只會清理maxpool的線程。

控制

ctl: 記錄線程池生命周期的狀態(tài)位和運行的線程個數(shù) ,高三位記錄狀態(tài),低位記錄Worker個數(shù)。通過樂觀鎖的方式控制線程池(查詢線程個數(shù)的方法workerCountOf和workQueue中有些混淆,前者是運行的線程數(shù),后者是提交上來存放任務(wù))

3.線程池狀態(tài)
private static final int RUNNING ? ?= -1<< COUNT_BITS; ?//運行中
?
private static final int SHUTDOWN ? = ?0<< COUNT_BITS; ?//對應(yīng)shutdown()方法,隊列和Worker現(xiàn)有的方法會繼續(xù)運行,清理掉空閑的Worker(是否空閑看能否持有到Woker的鎖),甚至還能添加Worker
?
private static final int STOP ? ? ? = ?1<< COUNT_BITS; ?//對應(yīng)shutdownNow()方法,所有的Worker都會發(fā)interrupt信號(不會保證線程被釋放掉),隊列也會清空
?
private static final int TIDYING ? ?= ?2<< COUNT_BITS; ?//當所有的任務(wù)到停掉,ctl記錄的任務(wù)數(shù)為0
?
private static final int TERMINATED = ?3<< COUNT_BITS; ?//線程池正式停掉。TiDYING狀態(tài)后執(zhí)行terminated()(空方法)變成TERMINATED狀態(tài)

4.提交流程:
 ?int c = ctl.get();
 ? ? ? ?if (workerCountOf(c)< corePoolSize) {
 ? ? ? ? ? //當前Worker數(shù)小于corePoolSize時候,會創(chuàng)建個新的Worker
 ? ? ? ? ? ?if (addWorker(command, true))
 ? ? ? ? ? ? ? ?return;
 ? ? ? ? ? ?c = ctl.get();
 ? ? ?  }
 ? ? ? ?if (isRunning(c) && workQueue.offer(command)) {//corePool的線程已經(jīng)起完了,就會將其提交到隊列中,由Worker消費
 ? ? ? ? ? ?int recheck = ctl.get();
 ? ? ? ? ? ?if (! isRunning(recheck) && remove(command)) 
 ? ? ? ? ? ? ? ?reject(command); //重新檢查狀態(tài),如果池子如果不是運行中,且task未來的及消費,發(fā)起拒絕策略
 ? ? ? ? ? ?else if (workerCountOf(recheck) == 0) // 如果從隊列中刪除未成功,而且Worker數(shù)等于0
 ? ? ? ? ? ? ? ?addWorker(null, false); //加個Worker把這個任務(wù)消化掉
 ? ? ?  }
 ? ? ? ?else if (!addWorker(command, false)) //如果隊列也滿了,會向maximumPool增加新的Worker,去運行線程
 ? ? ? ? ? ?reject(command); //maximumPool也裝不下觸發(fā)拒絕策略

5.創(chuàng)建Worker的邏輯

5.1 Check是否可以添加Worker

根據(jù)上面提交流程我們知道:

判斷是否需要添加Worker由ctl中保存的Worker數(shù)決定是否addWorkder,但是多個線程之間會有競爭,進入addWorker方法時,是會存在池子已經(jīng)滿了的情況。

這個時候會有個樂觀鎖的方式,重試檢查線程池狀態(tài)和Worker數(shù)量,保證其他提交線程的性能不會受影響:

  • 如果池子真的滿了只能通知調(diào)用者,addWorker失敗,進行其他策略

  • 如果池子未滿,就可以創(chuàng)建一個新的Worker

private boolean addWorker(Runnable firstTask, boolean core) {//bool core 表示核心池還是maxium池
 ? ? ? ?retry:
 ? ? ? ?for (;;) {
 ? ? ? ? ?
 ? ? ? ? ? ?int c = ctl.get();
 ? ? ? ? ? ?int rs = runStateOf(c);
?
 ? ? ? ? ? ?// Check if queue empty only if necessary.
 ? ? ? ? ? ?if (rs >= SHUTDOWN &&
 ? ? ? ? ? ? ? ?! (rs == SHUTDOWN &&
 ? ? ? ? ? ? ? ? ? firstTask == null &&
 ? ? ? ? ? ? ? ? ? ! workQueue.isEmpty()))
 ? ? ? ? ? ? ? ?return false;
?
 ? ? ? ? ? ?for (;;) {
 ? ? ? ? ? ? ? ?int wc = workerCountOf(c);
 ? ? ? ? ? ? ? ?if (wc >= CAPACITY ||
 ? ? ? ? ? ? ? ? ? ?wc >= (core ? corePoolSize : maximumPoolSize))
 ? ? ? ? ? ? ? ? ? ?return false;
 ? ? ? ? ? ? ? ?if (compareAndIncrementWorkerCount(c))
 ? ? ? ? ? ? ? ? ? ?break retry;
 ? ? ? ? ? ? ? ?c = ctl.get(); ?// Re-read ctl
 ? ? ? ? ? ? ? ?if (runStateOf(c) != rs)
 ? ? ? ? ? ? ? ? ? ?continue retry;
 ? ? ? ? ? ? ? ?// else CAS failed due to workerCount change; retry inner loop
 ? ? ? ? ?  }
 ? ? ?  }

5.2 開始添加Worker

這里沒啥好說的,創(chuàng)建個Worker并放到Woker的池子中,

  • 這里用到互斥鎖,不過鎖的粒度很小,只會是在加入池子的時候上鎖

  • 入池子前還是會檢查池子的狀態(tài)

 ?boolean workerStarted = false;
 ? ? ? ?boolean workerAdded = false;
 ? ? ? ?Worker w = null;
 ? ? ? ?try {
 ? ? ? ? ? ?w = new Worker(firstTask);
 ? ? ? ? ? ?final Thread t = w.thread;
 ? ? ? ? ? ?if (t != null) {
 ? ? ? ? ? ? ? ?final ReentrantLock mainLock = this.mainLock;
 ? ? ? ? ? ? ? ?mainLock.lock();
 ? ? ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ? ? ?// Recheck while holding lock.
 ? ? ? ? ? ? ? ? ? ?// Back out on ThreadFactory failure or if
 ? ? ? ? ? ? ? ? ? ?// shut down before lock acquired.
 ? ? ? ? ? ? ? ? ? ?int rs = runStateOf(ctl.get());
?
 ? ? ? ? ? ? ? ? ? ?if (rs< SHUTDOWN ||
 ? ? ? ? ? ? ? ? ? ? ?  (rs == SHUTDOWN && firstTask == null)) {
 ? ? ? ? ? ? ? ? ? ? ? ?if (t.isAlive()) // precheck that t is startable
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?throw new IllegalThreadStateException();
 ? ? ? ? ? ? ? ? ? ? ? ?workers.add(w);
 ? ? ? ? ? ? ? ? ? ? ? ?int s = workers.size();
 ? ? ? ? ? ? ? ? ? ? ? ?if (s >largestPoolSize)
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?largestPoolSize = s;
 ? ? ? ? ? ? ? ? ? ? ? ?workerAdded = true;
 ? ? ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ?  } finally {
 ? ? ? ? ? ? ? ? ? ?mainLock.unlock();
 ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ? ?if (workerAdded) {
 ? ? ? ? ? ? ? ? ? ?t.start();
 ? ? ? ? ? ? ? ? ? ?workerStarted = true;
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  }
 ? ? ?  } finally {
 ? ? ? ? ? ?if (! workerStarted)
 ? ? ? ? ? ? ? ?addWorkerFailed(w);
 ? ? ?  }

6. Woker 介紹

Worker 對應(yīng)線程池里的一個工作線程

持續(xù)從隊列里獲取任務(wù)執(zhí)行

 boolean timed = allowCoreThreadTimeOut || wc >corePoolSize;
?
 ? ? ? ? ? ?if ((wc >maximumPoolSize || (timed && timedOut))
 ? ? ? ? ? ? ? ?&& (wc >1 || workQueue.isEmpty())) {
 ? ? ? ? ? ? ? ?if (compareAndDecrementWorkerCount(c))
 ? ? ? ? ? ? ? ? ? ?return null;
 ? ? ? ? ? ? ? ?continue;
 ? ? ? ? ?  }
?
 ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ?Runnable r = timed ?
 ? ? ? ? ? ? ? ? ? ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
 ? ? ? ? ? ? ? ? ? ?workQueue.take();
 ? ? ? ? ? ? ? ?if (r != null)
 ? ? ? ? ? ? ? ? ? ?return r;
 ? ? ? ? ? ? ? ?timedOut = true;
 ? ? ? ? ?  } catch (InterruptedException retry) {
 ? ? ? ? ? ? ? ?timedOut = false;
 ? ? ? ? ?  }

7.Worker生命周期
  • 創(chuàng)建: 在上文有介紹

  • 會根據(jù)線程池的的狀態(tài)自行結(jié)束線程

if ((runStateAtLeast(ctl.get(), STOP) ||
 ? ? ? ? ? ? ? ? ? ? (Thread.interrupted() &&
 ? ? ? ? ? ? ? ? ? ? ?runStateAtLeast(ctl.get(), STOP))) &&
 ? ? ? ? ? ? ? ? ? ?!wt.isInterrupted())
 ? ? ? ? ? ? ? ? ? ?wt.interrupt();
 ? ? ? ? ? ? ? ? ? ?
  • 如果是異常退出,會退出后會重新起一個新的woker(注意這個一般代碼的邏輯異常不會導致Worker異常,會被封裝到submit方法返回的future中,jdk留了空方法可以監(jiān)聽到這種異常)

 ?int c = ctl.get();
 ? ? ? ?if (runStateLessThan(c, STOP)) {
 ? ? ? ? ? ?if (!completedAbruptly) {
 ? ? ? ? ? ? ? ?int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
 ? ? ? ? ? ? ? ?if (min == 0 && ! workQueue.isEmpty())
 ? ? ? ? ? ? ? ? ? ?min = 1;
 ? ? ? ? ? ? ? ?if (workerCountOf(c) >= min)
 ? ? ? ? ? ? ? ? ? ?return; // replacement not needed
 ? ? ? ? ?  }
 ?// ? ? ?  completedAbruptly = true 會被任務(wù)是異常退出
 ? ? ? ? ? ?addWorker(null, false);
 ? ? ?  }
  • maxium池子的空閑Worker會被釋放

// Are workers subject to culling?
 ? ? ? ? ? ?boolean timed = allowCoreThreadTimeOut || wc >corePoolSize;
?
 ? ? ? ? ? ?if ((wc >maximumPoolSize || (timed && timedOut))
 ? ? ? ? ? ? ? ?&& (wc >1 || workQueue.isEmpty())) { //判斷超時
 ? ? ? ? ? ? ? ?if (compareAndDecrementWorkerCount(c))
 ? ? ? ? ? ? ? ? ? ?return null; 
 ? ? ? ? ? ? ? ?continue;
 ? ? ? ? ?  }
?
 ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ?Runnable r = timed ?
 ? ? ? ? ? ? ? ? ? ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
 ? ? ? ? ? ? ? ? ? ?workQueue.take();
 ? ? ? ? ? ? ? ?if (r != null)
 ? ? ? ? ? ? ? ? ? ?return r;
 ? ? ? ? ? ? ? ?timedOut = true; 
 ? ? ? ? ?  } catch (InterruptedException retry) {
 ? ? ? ? ? ? ? ?timedOut = false;
 ? ? ? ? ?  }
  • 被主動釋放(看shutdownNow()方法)

    void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }

使用問題:

Q1: 在創(chuàng)建線程池時,在ThreadFactory接口中指定setUncaughtExceptionHandler之后,如果線程池中的任務(wù)拋異常,是否會被UncaughtExceptionHandler捕獲到

不會,在線程池的實現(xiàn)中,Worker的會一直持有申請到的線程,不會處理提交任務(wù)的異常,異常可以在submit 方法返回的future上獲取到。

Q2:showdownNow()執(zhí)行之后線程就一定都執(zhí)行完嘛

只是發(fā)interrupt 信號,看上文介紹只有TYDING ,TERMINATED狀態(tài),所有任務(wù)才會執(zhí)行完。一定要保持代碼的健壯性,控制業(yè)務(wù)代碼的生命周期。

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

當前題目:JDK學習筆記-線程池的實現(xiàn)-創(chuàng)新互聯(lián)
網(wǎng)頁網(wǎng)址:http://jinyejixie.com/article34/jeese.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制開發(fā)、建站公司用戶體驗、網(wǎng)站導航、網(wǎng)站營銷、微信小程序

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站建設(shè)
土默特左旗| 永修县| 柳林县| 平舆县| 郎溪县| 巨野县| 鲁甸县| 江达县| 凤翔县| 衢州市| 红桥区| 霍城县| 平罗县| 达日县| 连平县| 昆山市| 平陆县| 丰镇市| 大渡口区| 雅安市| 山西省| 察隅县| 吉安县| 五原县| 九江县| 汉源县| 南投县| 怀远县| 济源市| 青川县| 堆龙德庆县| 佛冈县| 北碚区| 军事| 苏尼特右旗| 新津县| 兴义市| 瓮安县| 安岳县| 光泽县| 水城县|