線程池就是提前創(chuàng)建若干個線程,如果有任務(wù)需要處理,線程池里的線程就會處理任務(wù),處理完之后線程并不會被銷毀,而是等待下一個任務(wù)。由于創(chuàng)建和銷毀線程都是消耗系統(tǒng)資源的,所以當(dāng)你想要頻繁的創(chuàng)建和銷毀線程的時候就可以考慮使用線程池來提升系統(tǒng)的性能。
java中經(jīng)常需要用到多線程來處理一些業(yè)務(wù),我們非常不建議單純使用繼承Thread或者實現(xiàn)Runnable接口的方式來創(chuàng)建線程,那樣勢必有創(chuàng)建及銷毀線程耗費資源、線程上下文切換問題。同時創(chuàng)建過多的線程也可能引發(fā)資源耗盡的風(fēng)險,這個時候引入線程池比較合理,方便線程任務(wù)的管理。java中涉及到線程池的相關(guān)類均在jdk1.5開始的java.util.concurrent包中,涉及到的幾個核心類及接口包括:Executor、Executors、ExecutorService、ThreadPoolExecutor、FutureTask、Callable、Runnable等。
線程池提供了一種限制和管理資源(包括執(zhí)行一個任務(wù))。 每個線程池還維護(hù)一些基本統(tǒng)計信息,例如已完成任務(wù)的數(shù)量。
線程池的好處如下:
1.降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
2.可有效的控制最大并發(fā)線程數(shù),提高系統(tǒng)資源的使用率,同時避免過多資源競爭,避免堵塞。
3.提供定時執(zhí)行、定期執(zhí)行、單線程、并發(fā)數(shù)控制等功能。
1)線程提交到線程池
2)判斷核心線程池是否已經(jīng)達(dá)到設(shè)定的數(shù)量,如果沒有達(dá)到,則直接創(chuàng)建線程執(zhí)行任務(wù)
3)如果達(dá)到了,則放在隊列中,等待執(zhí)行
4)如果隊列已經(jīng)滿了,則判斷線程的數(shù)量是否已經(jīng)達(dá)到設(shè)定的最大值,如果達(dá)到了,則直接執(zhí)行拒絕策略
5)如果沒有達(dá)到,則創(chuàng)建線程執(zhí)行任務(wù)。
RUNNING :能接受新提交的任務(wù),并且也能處理阻塞隊列中的任務(wù);
SHUTDOWN:關(guān)閉狀態(tài),不再接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊列中已保存的任務(wù)。在線程池處于 RUNNING 狀態(tài)時,調(diào)用 shutdown()方法會使線程池進(jìn)入到該狀態(tài)。(finalize() 方法在執(zhí)行過程中也會調(diào)用shutdown()方法進(jìn)入該狀態(tài));
STOP:不能接受新任務(wù),也不處理隊列中的任務(wù),會中斷正在處理任務(wù)的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時,調(diào)用 shutdownNow() 方法會使線程池進(jìn)入到該狀態(tài);
TIDYING:如果所有的任務(wù)都已終止了,workerCount (有效線程數(shù)) 為0,線程池進(jìn)入該狀態(tài)后會調(diào)用 terminated() 方法進(jìn)入TERMINATED 狀態(tài)。
TERMINATED:在terminated() 方法執(zhí)行完后進(jìn)入該狀態(tài),默認(rèn)terminated()方法中什么也沒有做。
Executor就是一個線程池框架,Executor 位于java.util.concurrent.Executors ,提供了用于創(chuàng)建工作線程的線程池的工廠方法。它包含一組用于有效管理工作線程的組件。Executor API 通過 Executors 將任務(wù)的執(zhí)行與要執(zhí)行的實際任務(wù)解耦。
Executor 接口對象能執(zhí)行我們的線程任務(wù);
Executors 工具類的不同方法按照我們的需求創(chuàng)建了不同的線程池,來滿足業(yè)務(wù)的需求。
ExecutorService 接口繼承了Executor接口并進(jìn)行了擴(kuò)展,提供了更多的方法,我們能夠獲得任務(wù)執(zhí)行的狀態(tài)并且可以獲取任務(wù)的返回值。
1.newSingleThreadExecutor
創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
2.newFixedThreadPoo
創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。
3.newCachedThreadPool
創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
4.newScheduledThreadPool
創(chuàng)建一個定長線程池,支持定時及周期性任務(wù)執(zhí)行。
1)newFixedThreadPool和newSingleThreadExecutor: 主要問題是堆積的請求處理隊列可能會耗費非常大的內(nèi)存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool: 主要問題是線程數(shù)最大數(shù)是Integer.MAX_VALUE,可能會創(chuàng)建數(shù)量非常多的線程,甚至OOM。
3)線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓程序員更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風(fēng)險。
ThreadPoolExecutor是線程池的核心實現(xiàn)類,在JDK1.5引入,位于java.util.concurrent包。
通過下面的demo來了解ThreadPoolExecutor創(chuàng)建線程的過程。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 測試ThreadPoolExecutor對線程的執(zhí)行順序
**/
public class ThreadPoolSerialTest {
public static void main(String[] args) {
//核心線程數(shù)
int corePoolSize = 3;
//最大線程數(shù)
int maximumPoolSize = 6;
//超過 corePoolSize 線程數(shù)量的線程最大空閑時間
long keepAliveTime = 2;
//以秒為時間單位
TimeUnit unit = TimeUnit.SECONDS;
//創(chuàng)建工作隊列,用于存放提交的等待執(zhí)行任務(wù)
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);
ThreadPoolExecutor threadPoolExecutor = null;
try {
//創(chuàng)建線程池
threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
new ThreadPoolExecutor.AbortPolicy());
//循環(huán)提交任務(wù)
for (int i = 0; i < 8; i++) {
//提交任務(wù)的索引
final int index = (i + 1);
threadPoolExecutor.submit(() -> {
//線程打印輸出
System.out.println("大家好,我是線程:" + index);
try {
//模擬線程執(zhí)行時間,10s
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//每個任務(wù)提交后休眠500ms再提交下一個任務(wù),用于保證提交順序
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
當(dāng)一個新任務(wù)被提交時:
1. 當(dāng)前活躍線程數(shù)<corePoolSize,則創(chuàng)建一個新線程執(zhí)行新任務(wù);
2. 當(dāng)前活躍線程數(shù)>corePoolSize,且隊列(workQueue)未滿時,則將新任務(wù)放入隊列中;
3. 當(dāng)前活躍線程數(shù)>corePoolSize,且隊列(workQueue)已滿,且當(dāng)前活躍線程數(shù)<maximumPoolSize,則繼續(xù)創(chuàng)建一個新線程執(zhí)行新任務(wù);
4. 當(dāng)前活躍線程數(shù)>corePoolSize,且隊列(workQueue)已滿,且當(dāng)前活躍線程數(shù)=maximumPoolSize,則執(zhí)行拒絕策略(handler);
當(dāng)任務(wù)執(zhí)行完成后:
1. 超出corePoolSize的空閑線程,在等待新任務(wù)時,如果超出了keepAliveTime,則線程會被銷毀;
2. 如果allowCoreThreadTimeOut被設(shè)置為true,那么corePoolSize以內(nèi)的空閑線程,如果超出了keepAliveTime,則同樣會被銷毀。
corePoolSize就是線程池中的核心線程數(shù)量,這幾個核心線程在沒有用的時候,也不會被回收
maximumPoolSize就是線程池中可以容納的最大線程的數(shù)量
keepAliveTime就是線程池中除了核心線程之外的其他的最長可以保留的時間,因為在線程池中,除了核心線程即使在無任務(wù)的情況下也不能被清 除,其余的都是有存活時間的,意思就是非核心線程可以保留的最長的空閑時間
util就是計算這個時間的一個單位。
workQueue就是等待隊列,任務(wù)可以儲存在任務(wù)隊列中等待被執(zhí)行,執(zhí)行的是FIFIO原則(先進(jìn)先出)。
threadFactory就是創(chuàng)建線程的線程工廠。
handler是一種拒絕策略,我們可以在任務(wù)滿了之后,拒絕執(zhí)行某些任務(wù)。
當(dāng)線程充滿了ThreadPool的有界隊列時,飽和策略開始起作用。飽和策略可以理解為隊列飽和后,處理后續(xù)無法入隊的任務(wù)的策略。ThreadPoolExecutor可以通過調(diào)用setRejectedExecutionHandler來修改飽和策略。
當(dāng)請求任務(wù)不斷的過來,而系統(tǒng)此時又處理不過來的時候,我們需要采取的策略是拒絕服務(wù)。RejectedExecutionHandler接口提供了拒絕任務(wù)處理的自定義方法的機(jī)會。在ThreadPoolExecutor中已經(jīng)包含四種處理策略。
AbortPolicy策略:該策略會直接拋出異常,阻止系統(tǒng)正常工作。
CallerRunsPolicy 策略:只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中,運行當(dāng)前的被丟棄的任務(wù)。
DiscardOleddestPolicy策略:該策略將丟棄最老的一個請求,也就是即將被執(zhí)行的任務(wù),并嘗試再次提交當(dāng)前任務(wù)。
DiscardPolicy策略:該策略默默的丟棄無法處理的任務(wù),不予任何處理。
除了JDK默認(rèn)提供的四種拒絕策略,我們可以根據(jù)自己的業(yè)務(wù)需求去自定義拒絕策略,自定義的方式很簡單,直接實現(xiàn)RejectedExecutionHandler接口即可。
1.execute()方法用于提交不需要返回值的任務(wù),所以無法判斷任務(wù)是否被線程池執(zhí)行成功與否;
2.submit()方法用于提交需要返回值的任務(wù)。線程池會返回一個 Future 類型的對象,通過這個 Future 對象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過 Future 的 get()方法來獲取返回值,get()方法會阻塞當(dāng)前線程直到任務(wù)完成,而使用 get(long timeout,TimeUnit unit)方法則會阻塞當(dāng)前線程一段時間后立即返回,這時候有可能任務(wù)沒有執(zhí)行完。
線程組ThreadGroup對象中的stop,resume,suspend會導(dǎo)致安全問題,主要是死鎖問題,已經(jīng)被官方廢棄,多以價值已經(jīng)大不如以前。
線程組ThreadGroup不是線程安全的,在使用過程中不能及時獲取安全的信息。
shutdownNow():立即關(guān)閉線程池(暴力),正在執(zhí)行中的及隊列中的任務(wù)會被中斷,同時該方法會返回被中斷的隊列中的任務(wù)列表;
shutdown():平滑關(guān)閉線程池,正在執(zhí)行中的及隊列中的任務(wù)能執(zhí)行完成,后續(xù)進(jìn)來的任務(wù)會被執(zhí)行拒絕策略;
isTerminated():當(dāng)正在執(zhí)行的任務(wù)及對列中的任務(wù)全部都執(zhí)行(清空)完就會返回true;
線程池將線程和任務(wù)進(jìn)行解耦,線程是線程,任務(wù)是任務(wù),擺脫了之前通過 Thread 創(chuàng)建線程時的一個線程必須對應(yīng)一個任務(wù)的限制。在線程池中,同一個線程可以從阻塞隊列中不斷獲取新任務(wù)來執(zhí)行,其核心原理在于線程池對 Thread 進(jìn)行了封裝,并不是每次執(zhí)行任務(wù)都會調(diào)用 Thread.start() 來創(chuàng)建新線程,而是讓每個線程去執(zhí)行一個“循環(huán)任務(wù)”,在這個“循環(huán)任務(wù)”中不停的檢查是否有任務(wù)需要被執(zhí)行,如果有則直接執(zhí)行,也就是調(diào)用任務(wù)中的 run 方法,將 run 方法當(dāng)成一個普通的方法執(zhí)行,通過這種方式將只使用固定的線程就將所有任務(wù)的 run 方法串聯(lián)起來。
1.ArrayBlockingQueue是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,此隊列按 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。
2.LinkedBlockingQueue一個基于鏈表結(jié)構(gòu)的阻塞隊列,此隊列按FIFO (先進(jìn)先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊列
3.SynchronousQueue 一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool()使用了這個隊列。
4.PriorityBlockingQueue 一個具有優(yōu)先級的無限阻塞隊列。
首先是利用好SpringBoot的自動裝配功能,配置好線程池的一些基本參數(shù)。
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
/*
* 線程池名前綴
*/
private static final String threadNamePrefix = "Api-Async-";
/**
* bean的名稱, 默認(rèn)為首字母小寫的方法名
* @return
*/
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
/**
* 默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來之后,就會創(chuàng)建一個線程去執(zhí)行任務(wù),
* 當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會把到達(dá)的任務(wù)放到緩存隊列當(dāng)中;
* 當(dāng)隊列滿了,就繼續(xù)創(chuàng)建線程,當(dāng)線程數(shù)量大于等于maxPoolSize后,開始使用拒絕策略拒絕
*/
/*
* 核心線程數(shù)(默認(rèn)線程數(shù))
*/
executor.setCorePoolSize(corePoolSize);
//最大線程數(shù)
executor.setMaxPoolSize(maxPoolSize);
//緩沖隊列數(shù)
executor.setQueueCapacity(queueCapacity);
//允許線程空閑時間(單位是秒)
executor.setKeepAliveSeconds(keepAliveTime);
executor.setThreadNamePrefix(threadNamePrefix);
//用來設(shè)置線程池關(guān)閉時候等待所有任務(wù)都完成再繼續(xù)銷毀其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
//線程池對拒絕任務(wù)的處理策略,CallerRunsPolicy:由調(diào)用線程(提交任務(wù)的線程)處理該任務(wù)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化
executor.initialize();
return executor;
}
}
配置好線程池的基本參數(shù)時候,我們就可以使用線程池了, 只要在一個限定域為public的方法頭部加上@Async注解即可。
@Async
public void createOrder() {
System.out.println("執(zhí)行任務(wù)");
}
1)分析任務(wù)的特性
任務(wù)的性質(zhì):CPU 密集型任務(wù)、IO 密集型任務(wù)和混合型任務(wù)。
任務(wù)的優(yōu)先級:高、中、低。
任務(wù)的執(zhí)行時間:長、中、短。
任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。
2)具體策略
[1]CPU 密集型任務(wù)配置盡可能小的線程,如配置N(CPU核心數(shù))+1個線程的線程池。
[2]IO 密集型任務(wù)則由于線程并不是一直在執(zhí)行任務(wù),則配置盡可能多的線程,如2*N(CPU核心數(shù))。
[3]混合型任務(wù)如果可以拆分,則將其拆分成一個 CPU 密集型任務(wù)和一個 IO 密集型任務(wù)。只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率;如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進(jìn)行分解。
[4]優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列 PriorityBlockingQueue 來處理,它可以讓優(yōu)先級高的任務(wù)先得到執(zhí)行。但是,如果一直有高優(yōu)先級的任務(wù)加入到阻塞隊列中,那么低優(yōu)先級的任務(wù)可能永遠(yuǎn)不能執(zhí)行。
[5]執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者也可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務(wù)先執(zhí)行。
[6]依賴數(shù)據(jù)庫連接池的任務(wù),因為線程提交 SQL 后需要等待數(shù)據(jù)庫返回結(jié)果,線程數(shù)應(yīng)該設(shè)置得較大,這樣才能更好的利用 CPU。
[7]建議使用有界隊列,有界隊列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力。可以根據(jù)需要設(shè)大一點,比如幾千。使用無界隊列,線程池的隊列就會越來越大,有可能會撐滿內(nèi)存,導(dǎo)致整個系統(tǒng)不可用。