前言
本文主要是結合源碼去線程池執行任務的過程,基於JDK 11,整個過程基本與JDK 8相同。
個人水平有限,文中若有表達有誤的,歡迎大夥留言指出,謝謝了!
一、線程池簡介
1.1 使用線程池的優點
1)通過復用已創建的線程,降低資源的消耗(線程的創建/銷毀是要消耗資源的)、提高響應速度;
2)管理線程的個數,線程的個數在初始化線程池的時候指定;
3)統一管理線程,比如停止,stop()方法;
1.2 線程池執行任務過程
線程池執行任務的過程如下圖所示,主要分為以下4步,其中參數的含義會在後面詳細講解:
1)判斷工作的線程是否小於核心線程數據(workerCountOf(c) < corePoolSize),若小於則會新建一個線程去執行任務,這一步僅僅的是根據線程個數決定;
2)若核心線程池滿了,就會判斷線程池的狀態,若是running狀態,則嘗試加入任務隊列,若加入成功后還會做一些事情,後面詳細說;
3)若任務隊列滿了,則加入失敗,此時會判斷整個線程池線程是否滿,若沒有則創建非核心線程執行任務;
4)若線程池滿了,則根據拒絕測試處理無法執行的任務;
整體過程如下圖:
二、ThreadPoolExecutor類解析
2.1 ThreadPoolExecutor的構造函數
ThreadPoolExecutor類一共提供了4個構造函數,涉及5~7個參數,下面就5個必備參數的構造函數進行說明:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
1)corePoolSize :初始化核心線程池中線程個數的大小;
2)maxmumPoolSize:線程池中線程大小;
3)keepAliveTime:非核心線程的超時時長;
非核心線程空閑時常大於該值就會被終止。
4)unit :keepAliveTime的單位,類型可以參見TimeUnit類;
5)BlockingQueue workQueue:阻塞隊列,維護等待執行的任務;
2.2 私有類Worker
在ThreadPoolExecutor類中有兩個集合類型比較重要,一個是用於放置等待任務的workQueue,其類型是阻塞對列;一個是用於用於存放工作線程的works,其是Set類型,其中存放的類型是Worker。
進一步簡化線程池執行過程,可以理解為works中的工作線程不停的去阻塞對列中取任務,執行結束,線程重新加入大works中。
為此,有必要簡單了解一下Work類型的組成。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** Thread this worker is running in. Null if factory fails. */ //工作線程,由線程的工廠類初始化 final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; //不可重入的鎖 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } ....... }
Worker類繼承於隊列同步器(AbstractQueueSynchronizer),隊列同步器是採取鎖或其他同步組件的基礎框架,其主要結構是自旋獲取鎖的同步隊列和等待喚醒的等待隊列,其方法因此可以分為兩類:對state改變的方法 和 入、出隊列的方法,即獲取獲取鎖的資格的變化(可能描述的不準確)。關於隊列同步器後續博客會詳細分析,此處不展開討論。
Work類中通過CAS設置狀態失敗后直接返回false,而不是判斷當前線程是否已獲取鎖來實現不可重入的鎖,源碼註釋中解釋這樣做的原因是因為避免work tash重新獲取到控制線程池全局的方法,如setCorePoolSize。
2.3 拒絕策略類
ThreadPoolExecutor的拒絕策略類是以私有類的方式實現的,有四種策略:
1)AbortPolicy:丟棄任務並拋出RejectedExecutionException異常(默認拒絕處理策略)。
2)DiscardPolicy:拋棄新來的任務,但是不拋出異常。
3)DiscardOldestPolicy:拋棄等待隊列頭部(最舊的)的任務,然後重新嘗試執行程序(失敗則會重複此過程)。
4)CallerRunsPolicy:由調用線程處理該任務。
其代碼相對簡單,可以參考源碼。
三、任務執行過程分析
3.1 execute(Runnable)方法
execute(Runnable)方法的整體過程如上文1.2所述,其實現方式如下:
public void execute(Runnable command) { //執行的任務為空,直接拋出異常 if (command == null) throw new NullPointerException(); //ctl是ThreadPoolExecutor中很關鍵的一個AtomicInteger,主線程池的控制狀態 int c = ctl.get(); //1、判斷是否小於核心線程池的大小,若是則直接嘗試新建一個work線程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //2、大於核心線程池的大小或新建work失敗(如創建thread失敗),會先判斷線程池是否是running狀態,若是則加入阻塞對列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //重新驗證線程池是否為running,若否,則嘗試從對列中刪除,成功后執行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); //若線程池的狀態為shutdown則,嘗試去執行完阻塞對列中的任務 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3、新建非核心線程去執行任務,若失敗,則採取拒絕策略 else if (!addWorker(command, false)) reject(command); }
3.2 addWorker(Runnable,boole)方法
execute(Runnable)方法中,新建(非)核心線程執行任務主要是通過addWorker方法實現的,其執行過程如下:
private boolean addWorker(Runnable firstTask, boolean core) { //此處反覆檢查線程池的狀態以及工作線程是否超過給定的值 retry: for (int c = ctl.get();;) { // Check if queue empty only if necessary. if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { //核心和非核心線程的區別 if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); //通過工廠方法初始化,可能失敗,即可能為null final Thread t = w.thread; if (t != null) { //獲取全局鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); //線程池處於running狀態 //或shutdown狀態但無需要執行的task,個人理解為用於去阻塞隊列中取任務執行 if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //執行任務,這裡會執行thread的firstTask獲取阻塞對列中取任務 t.start(); workerStarted = true; } } } finally { if (! workerStarted) //開始失敗,則會從workers中刪除新建的work,work數量減1,嘗試關閉線程池,這些過程會獲取全局鎖 addWorkerFailed(w); } return workerStarted; }
3.3 runWorker(this) 方法
在3.2 中當新建的worker線程加入在workers中成功后,就會啟動對應任務,其調用的是Worker類中的run()方法,即調用runWorker(this)方法,其過程如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //while()循環中,前者是新建線程執行firstTask,對應線程個數小於核心線程和阻塞隊列滿的情況, //getTask()則是從阻塞對列中取任務執行 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //僅線程池狀態為stop時,線程響應中斷,這裏也就解釋了調用shutdown時,正在工作的線程會繼續工作 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); try { //執行任務 task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; //完成的個數+1 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //處理後續工作 processWorkerExit(w, completedAbruptly); } }
3.4 processWorkerExit(Worker,boole)方法
當任務執行結果后,在滿足一定條件下會新增一個worker線程,代碼如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; //對工作線程的增減需要加全局鎖 workers.remove(w); } finally { mainLock.unlock(); } //嘗試終止線程池 tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { //線程不是中斷,會維持最小的個數 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } //執行完任務后,線程重新加入workers中 addWorker(null, false); } }
至此,線程池執行任務的過程分析結束,其他方法的實現過程可以參考源碼。
Ref:
[1]http://concurrent.redspider.group/article/03/12.html
[2]《Java併發編程的藝術》
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能
※台北網頁設計公司這麼多該如何選擇?
※智慧手機時代的來臨,RWD網頁設計為架站首選
※評比南投搬家公司費用收費行情懶人包大公開
※回頭車貨運收費標準