并發(fā)編程的原則
10年積累的成都網(wǎng)站設(shè)計(jì)、網(wǎng)站制作經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶對(duì)網(wǎng)站的新想法和需求。提供各種問(wèn)題對(duì)應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先網(wǎng)站設(shè)計(jì)后付款的網(wǎng)站建設(shè)流程,更有宜黃免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。
原子性是指在一個(gè)操作中就是cpu不可以在中途暫停然后再調(diào)度,既不被中斷操作,即一個(gè)操作或者多個(gè)操作 要么全部執(zhí)行并且執(zhí)行的過(guò)程不會(huì)被任何因素打斷,要么就都不執(zhí)行。
對(duì)于可見性,Java提供了volatile關(guān)鍵字來(lái)保證可見性。當(dāng)一個(gè)共享變量被volatile修飾時(shí),它會(huì)保證修改的值會(huì)立即被更新到主存,當(dāng)有其他線程需要讀取時(shí),它會(huì)去內(nèi)存中讀取新值。而普通的共享變量不能保證可見性,因?yàn)槠胀ü蚕碜兞勘恍薷闹?,什么時(shí)候被寫入主存是不確定的,當(dāng)其他線程去讀取時(shí),此時(shí)內(nèi)存中可能還是原來(lái)的舊值,因此無(wú)法保證可見性。另外,通過(guò)synchronized和Lock也能夠保證可見性,synchronized和Lock能保證同一時(shí)刻只有一個(gè)線程獲取鎖然后執(zhí)行同步代碼,并且在釋放鎖之前會(huì)將對(duì)變量的修改刷新到主存當(dāng)中。
在Java內(nèi)存模型中,允許編譯器和處理器對(duì)指令進(jìn)行重新排序,但是重新排序過(guò)程不會(huì)影響到單線程程序的執(zhí)行,卻會(huì)影響到多線程并發(fā)執(zhí)行的正確性。
Runnable和Thread
這里只說(shuō)一下實(shí)現(xiàn)Runnable接口和繼承Thread類的區(qū)別:以賣10張票的任務(wù)為例,如果繼承Thread類的話,啟動(dòng)三個(gè)線程就相當(dāng)于開了三個(gè)窗口,每個(gè)窗口都有賣10張票的任務(wù),各賣各的;如果實(shí)現(xiàn)Runnable接口的話,啟動(dòng)三個(gè)線程相當(dāng)開了三個(gè)窗口賣票,這三個(gè)窗口一共賣10張票。
1.?synchronized對(duì)象鎖
synchronized(this)和synchronized方法都是鎖當(dāng)前對(duì)象,synchronized(obj)鎖臨界對(duì)象。使用synchronized的話最好是鎖臨界對(duì)象。如果想要使得任意多個(gè)線程任意多個(gè)用戶訪問(wèn)的時(shí)候都不出任何問(wèn)題,可以考慮一下用鎖當(dāng)前對(duì)象的方法,因?yàn)殒i當(dāng)前對(duì)象量級(jí)較重,所以一般不用。
如下面Sync類中的兩個(gè)方法test_01和test_02()鎖的都是程序創(chuàng)建的Sync對(duì)象,細(xì)粒度控制推薦用test_02()。
public synchronized void test_01() {
System.out.println("鎖當(dāng)前對(duì)象");
}
public void test_02() {
synchronized (this) {
System.out.println("鎖當(dāng)前對(duì)象");
}
}
下面這個(gè)方法鎖的是Sync對(duì)象中的object對(duì)象(即臨界對(duì)象)
public void test_03() {
synchronized (object) {
System.out.println("鎖臨界對(duì)象");
}
}
2.?synchronized使用在靜態(tài)方法中鎖定當(dāng)前類
靜態(tài)同步方法鎖的是當(dāng)前類型的類對(duì)象,如在Sync類中的static test_04()方法上加了同步鎖synchronized,那么此時(shí)synchronized鎖的是Sync.class。
// 下面兩個(gè)方法都是靜態(tài)同步方法
public static synchronized void test_04() {
System.out.println("鎖Sync.class");
}
public static void test_05() {
synchronized (Sync.class) {
System.out.println("鎖Sync.class類");
}
}
3.?synchronized作用于靜態(tài)和非靜態(tài)方法的區(qū)別
synchronized作用與非靜態(tài)方法,相當(dāng)于鎖定單個(gè)對(duì)象,不同對(duì)象之間沒(méi)有競(jìng)爭(zhēng)關(guān)系;而作用于靜態(tài)方法時(shí),鎖加載類上,即鎖定class,這時(shí)相當(dāng)于所有對(duì)象競(jìng)爭(zhēng)同一把鎖。
如下例子,線程1會(huì)在i=5的時(shí)候拋出異常,此時(shí)線程1鎖被釋放,線程2開始調(diào)用方法。
public class Test {
static class Test02 implements Runnable {
private int i = 0;
@Override
public synchronized void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + "_" + i++);
if (i == 5) { // 當(dāng)i==5時(shí)拋出異常,鎖被釋放
i = 1 / 0;
}
try {
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException ignored) { }
}
}
}
public static void main(String[] args) {
Test02 test02 = new Test02();
new Thread(test02, "LQ").start();
new Thread(test02, "WH").start();
}
}
在下面代碼中,object被LQ鎖定,WH阻塞。
public class Test {
static Object object = new Object();
void m() {
System.out.println(Thread.currentThread().getName() + " start...");
synchronized (object){
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception ignored) {}
System.out.println(Thread.currentThread().getName() + "-" + object.hashCode());
}
}
}
static class Test01 implements Runnable {
@Override
public void run() {
new Test().m();
}
}
static class Test02 implements Runnable {
@Override
public void run() {
new Test().m();
}
}
public static void main(String[] args) {
Test01 test01 = new Test01();
Thread thread = new Thread(test01, "LQ");
thread.start();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception ignored) {}
Test02 test02 = new Test02();
thread = new Thread(test02, "WH");
thread.start();
}
}
在WH線程中新創(chuàng)建了一個(gè)Object,WH正常運(yùn)行。
public class Test {
static Object object = new Object();
void m() {
System.out.println(Thread.currentThread().getName() + " start...");
synchronized (object) {
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception ignored){}
System.out.println(Thread.currentThread().getName() + "-" + object.hashCode());
}
}
}
static class Test01 implements Runnable {
@Override
public void run() {
new Test().m();
}
}
static class Test02 implements Runnable {
@Override
public void run() {
object = new Object();
new Test().m();
}
}
public static void main(String[] args) {
Test01 test01 = new Test01();
Thread thread = new Thread(test01, "LQ");
thread.start();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception ignored) {}
Test02 test02 = new Test02();
thread = new Thread(test02, "WH");
thread.start();
}
}
上面代碼中,WH線程啟動(dòng)后會(huì)一只處于等待狀態(tài),因?yàn)閛bject被LQ線程鎖著,但如果在WH線程中重新new Object()并賦值給object,這樣的話WH線程就能夠正常運(yùn)行了,原因是:同步鎖鎖定的是對(duì)內(nèi)存中的對(duì)象,所以LQ鎖定的是第一次new的對(duì)象而WH鎖定的是第二次new的對(duì)象,如下圖。
?
對(duì)于常量:String a = “aaa” 和String b = “aaa”是同一個(gè)對(duì)象,因此,假如A方法鎖定了a,B方法鎖定了b,啟動(dòng)LQ線程調(diào)用A方法,然后啟動(dòng)WH線程調(diào)用B方法,這樣的話WH線程會(huì)等到LQ線程結(jié)束后才執(zhí)行。因此,在定義同步代碼塊時(shí),不要使用常量作為鎖的目標(biāo)對(duì)象。
volatile關(guān)鍵字
計(jì)算機(jī)中有CPU、內(nèi)存和緩存,當(dāng)CPU運(yùn)行的時(shí)候,默認(rèn)找緩存中的數(shù)據(jù)。當(dāng)CPU有中斷的時(shí)候,根據(jù)操作系統(tǒng)對(duì)CPU的管理特性,可能會(huì)清空緩存,重新將內(nèi)存中的數(shù)據(jù)讀到緩存中,也可能不清空緩存,仍舊使用緩存中的數(shù)據(jù)進(jìn)行后續(xù)的計(jì)算。如果CPU不中斷的話,默認(rèn)CPU只會(huì)找緩存數(shù)據(jù)。volatile這個(gè)關(guān)鍵字不是改變緩存數(shù)據(jù)特性的,而是直接改變內(nèi)存中的數(shù)據(jù)特性,當(dāng)對(duì)一個(gè)對(duì)象加了volatile關(guān)鍵字修飾的時(shí)候,相當(dāng)于通知了底層OS操作系統(tǒng),告訴CPU每次進(jìn)行計(jì)算的時(shí)候最好去看一下內(nèi)存數(shù)據(jù)是否發(fā)生了變更,這就是內(nèi)存的可見性。volatile關(guān)鍵字就是為了保證內(nèi)存的可見性。
如下代碼會(huì)發(fā)生死鎖現(xiàn)象。
public class Volatile01 {
private static boolean b = true;
private void m() {
System.out.println("start...");
while (b) {}
System.out.println("end...");
}
static class Volatile_01 implements Runnable {
@Override
public void run() {
new Volatile01().m();
}
}
public static void main(String[] args) {
Volatile_01 = new Volatile_01();
new Thread(volatile_01).start();
try {
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException ignored) {}
b = false;
}
}
當(dāng)將上述代碼塊中的共享變量b用volatile修飾時(shí)(保證了可見性),就能夠跳出循環(huán)了。
public class Volatile01 {
private static volatile boolean b = true;
private void m() {
System.out.println("start...");
while (b){}
System.out.println("end...");
}
static class Volatile_01 implements Runnable {
@Override
public void run(){
new Volatile01().m();
}
}
public static void main(String[] args) {
Volatile_01 = new Volatile_01();
new Thread(volatile_01).start();
try{
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException ignored){}
b = false;
}
}
join()方法
將多個(gè)線程連在一起,阻塞線程,直到調(diào)用join的線程執(zhí)行完成。
如下程序打印的結(jié)果時(shí)100000,如果不用join()的話打印的結(jié)果將遠(yuǎn)遠(yuǎn)小于100000。用join()可以用來(lái)等待一組線程執(zhí)行完畢后再進(jìn)行后續(xù)邏輯處理,以保證數(shù)據(jù)的正確。
public class Test {
private static volatile int count = 0;
private void m() {
for (int i = 0; i < 10000; i++) {
count++;
}
}
static class Test02 implements Runnable {
@Override
public synchronized void run() {
new Test().m();
}
}
public static void main(String[] args) {
Test02 test02 = new Test02();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
threads.add(new Thread(test02));
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(count);
}
}
上述代碼中用了synchronized關(guān)鍵字來(lái)實(shí)現(xiàn)原子性,也可以不用synchronized而用AtomicInteger對(duì)象,因?yàn)锳tomicInteger是一個(gè)原子性操作對(duì)象,代碼如下。
public class Test{
private static AtomicInteger count = new AtomicInteger();
private void m(){
for (int i = 0; i < 10000; i++){
count.incrementAndGet();
}
}
static class Test02 implements Runnable{
@Override
public void run(){
new Test().m();
}
}
public static void main(String[] args){
Test02 test02 = new Test02();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++){
threads.add(new Thread(test02));
}
for (Thread thread : threads){
thread.start();
try{
thread.join();
}catch (InterruptedException e){
e.printStackTrace();
}
}
System.out.println(count);
}
}
CountDownLatch對(duì)象
CountDownLatch相當(dāng)于一個(gè)門閂,在創(chuàng)建門閂對(duì)象的時(shí)候可以指定鎖的個(gè)數(shù),若某個(gè)方法調(diào)用了門閂的await()方法,那么該方法執(zhí)行到await()的時(shí)候會(huì)被阻塞等待門閂釋放,當(dāng)門閂上沒(méi)有鎖也就是門閂開放的時(shí)候繼續(xù)執(zhí)行。減門閂上鎖的方法時(shí)countDown()。
如下例,當(dāng)在m1中調(diào)用了await(),在m2中調(diào)用了countDown(),因此根據(jù)m2的邏輯當(dāng)m2執(zhí)行完了之后門閂上的鎖數(shù)量就為0了,此時(shí)m1方法可以繼續(xù)執(zhí)行了。
public class Test {
private CountDownLatch countDownLatch = new CountDownLatch(5);
private void m1() {
try {
countDownLatch.await(); // 等待門閂開放
} catch (Exception ignored) {
}
System.out.println("method m1.");
}
private void m2() {
while (countDownLatch.getCount() != 0) {
countDownLatch.countDown(); // 減門閂上的鎖
System.out.println("method m2");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
}
}
public static void main(String[] args) {
Test count01 = new Test();
new Thread(count01::m2).start();
new Thread(count01::m1).start();
}
}
門閂可以和鎖混合使用,或替代鎖的功能,再門閂開放之前等待,當(dāng)門閂完全開放之后執(zhí)行,可避免鎖的效率低下問(wèn)題。
wait()、notify()和notifyAll()
wait():在對(duì)象上調(diào)用wait(), 會(huì)使當(dāng)前線程進(jìn)入等待狀態(tài), 直至另一個(gè)線程對(duì)這個(gè)對(duì)象調(diào)用了notify() 或notifyAll() 方法喚醒線程。
notify():?jiǎn)拘褜?duì)象正在等待的一個(gè)線程。
notifyAll():當(dāng)調(diào)用對(duì)象的notifyAll()方法時(shí),所有waiting狀態(tài)的線程都會(huì)被喚醒。
(生產(chǎn)者消費(fèi)者)自定義同步容器,容器上限為10,可以在多線程中應(yīng)用,并保證數(shù)據(jù)線程安全。
public class DeviceSingleton<E> {
private DeviceSingleton() {
}
private final int max = 10;
private int count = 0;
private static final DeviceSingleton DEVICE_SINGLETON = new DeviceSingleton();
public static DeviceSingleton getInstance() {
return DEVICE_SINGLETON;
}
private final List<E> devices = new ArrayList<>();
/**
* 添加
*/
public synchronized void add(E data) {
// 當(dāng)容器滿了之后進(jìn)入等待狀態(tài)
while (devices.size() == max) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("add: " + data);
ThreadUtils.sleep(1000);
devices.add(data);
count++;
this.notify();
}
/**
* 獲取
*/
public synchronized E get() {
E data = null;
while (devices.size() == 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
ThreadUtils.sleep(1000);
data = devices.remove(0);
count--;
this.notifyAll();
return data;
}
/**
* 獲取長(zhǎng)度
*/
public synchronized int size() {
return count;
}
@Data
static class Device {
private int id;
private String name;
public Device(int id, String name) {
this.id = id;
this.name = name;
}
}
static class ThreadUtils {
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (Exception ignore) {}
}
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
DeviceSingleton deviceSingleton = DeviceSingleton.getInstance();
for (int i = 0; i < 10; i++) {
new Thread(() ->
{
for (int j = 0; j < 5; j++) {
System.out.println(deviceSingleton.get());
}
}, "consumer-" + i).start();
}
Thread.sleep(2000);
for (int i = 0; i < 2; i++) {
new Thread(() ->
{
for (int j = 0; j < 25; j++) {
deviceSingleton.add(new DeviceSingleton.Device(j, "device " + j));
}
}, "producer").start();
}
}
}
ReentrantLock鎖
為盡量避免使用synchronized和同步方法出現(xiàn)的一種多線程鎖機(jī)制,建議使用的同步方式,效率比synchronized高。使用重入鎖時(shí),需要手動(dòng)釋放鎖(lock.unlock())。示例如下:
public class ReentrantLockTest {
private final Lock lock = new ReentrantLock();
private void m1() {
lock.lock(); // 加鎖
for (int i = 0; i < 10; i++) {
System.out.println("method m1() " + i);
ThreadUtils.sleep(1000);
}
lock.unlock(); // 解鎖
}
private void m2() {
lock.lock(); // 加鎖
System.out.println("method m2()");
lock.unlock(); // 解鎖
}
public static void main(String[] args) {
ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
new Thread(reentrantLockTest::m1).start();
new Thread(reentrantLockTest::m2).start();
}
}
如果沒(méi)有獲取到鎖標(biāo)記則返回false,當(dāng)前線程等待,如果獲取到了鎖標(biāo)記,則返回true,當(dāng)前線程被鎖定執(zhí)行。示例如下:
public class ReentrantLockTest {
private Lock lock = new ReentrantLock();
private void m1() {
lock.lock(); // 加鎖
for (int i = 0; i < 10; i++) {
ThreadUtils.sleep(1000);
System.out.println("method m1() " + i);
}
lock.unlock(); // 解鎖
}
private void m2() {
boolean isLocked = false;
try {
/*
嘗試鎖,如果有鎖,則無(wú)法獲取鎖標(biāo)記,返回false,否則返回true
如果無(wú)法獲取到鎖標(biāo)記,則說(shuō)明別的線程正在使用鎖,該線程等待
如果獲取到了鎖標(biāo)記,則該線程的代碼塊被鎖定
下面是獲取鎖標(biāo)記的無(wú)參方法,當(dāng)執(zhí)行到該語(yǔ)句的時(shí)候立刻獲取鎖標(biāo)記
也可以用有參的,即當(dāng)執(zhí)行到該語(yǔ)句多長(zhǎng)時(shí)間之內(nèi)獲取鎖標(biāo)記,如果超時(shí),不等待,直接返回。如isLocked = lock.tryLock(5, TimeUnit.SECONDS);表示5秒之內(nèi)獲取鎖標(biāo)記(5秒之內(nèi)任何時(shí)間獲取到鎖標(biāo)記都會(huì)繼續(xù)執(zhí)行),如果超時(shí)則直接返回。
*/
isLocked = lock.tryLock();
System.out.println(isLocked ? "m2() synchronized" : "m2() unsynchronized");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 嘗試鎖在解除鎖標(biāo)記的時(shí)候一定要判斷是否獲取到鎖標(biāo)記
if (isLocked) {
lock.unlock();
}
}
}
public static void main(String[] args) {
ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
new Thread(reentrantLockTest::m1).start();
new Thread(reentrantLockTest::m2).start();
}
}
非可中斷鎖當(dāng)客戶端調(diào)用interrupt方法時(shí),只是簡(jiǎn)單的去設(shè)置interrupted中斷狀態(tài),并沒(méi)有進(jìn)一步拋出異常,而可中斷鎖在監(jiān)測(cè)到中斷請(qǐng)求時(shí)會(huì)拋出InterruptedException ,進(jìn)而中斷線程執(zhí)行。示例如下:
public class ReentrantLockTest {
private Lock lock = new ReentrantLock();
private void m1() {
lock.lock(); // 加鎖
for (int i = 0; i < 5; i++) {
ThreadUtils.sleep(1000);
System.out.println("method m1() " + i);
}
lock.unlock(); // 解鎖
}
private void m2() {
try {
/*
可打斷鎖,阻塞等待鎖,可以被其他的線程打斷阻塞狀態(tài)
*/
lock.lockInterruptibly(); // 可嘗試打斷
System.out.println("method m2()");
} catch (InterruptedException e) {
System.out.println("鎖被打斷");
} finally {
try {
lock.unlock();
} catch (Exception ignored) {
}
}
}
public static void main(String[] args) {
ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
Thread thread1 = new Thread(reentrantLockTest::m1);
thread1.start();
ThreadUtils.sleep(1000);
Thread thread2 = new Thread(reentrantLockTest::m2);
thread2.start();
ThreadUtils.sleep(1000);
thread2.interrupt(); // 打斷線程休眠
}
}
注意:用ReentrantLock打斷鎖,如果要打斷的話是用線程打斷,跟喚醒不同,notifyAll喚醒是用對(duì)象區(qū)喚醒。(打斷thread.interruped(); 喚醒object.notifyAll())。
線程打斷有什么用呢?
我們?cè)谟肳indows的時(shí)候經(jīng)常會(huì)遇到軟件鎖死的問(wèn)題,這時(shí)候我們往往會(huì)通過(guò)打開任務(wù)管理器來(lái)結(jié)束進(jìn)程,這種結(jié)束進(jìn)程可以認(rèn)為是打斷鎖的阻塞狀態(tài)(即非正常結(jié)束)。
先到先得。若沒(méi)有特殊情況,不建議使用公平鎖,如果使用公平鎖的話,一般來(lái)說(shuō)并發(fā)量<=10,如果并發(fā)量較大,而不可避免的有訪問(wèn)先后順序的話,建議采用別的方法。
public class ReentrantLockTest {
static class TestReentrantLock extends Thread {
// 在創(chuàng)建ReentrantLock對(duì)象的時(shí)候傳參為true就代表創(chuàng)建公平鎖
private ReentrantLock lock = new ReentrantLock(true);
public void run() {
for (int i = 0; i < 5; i++) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " get lock.");
ThreadUtils.sleep(1000);
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
TestReentrantLock lock = new TestReentrantLock();
lock.start();
new Thread(lock).start();
new Thread(lock).start();
}
}
為L(zhǎng)ock增加條件,當(dāng)條件滿足時(shí)做一些事情,如加鎖或解鎖、等待或喚醒等。下面示例就是使用Condition實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者。
public class DeviceContainer<T> {
private DeviceContainer() {
}
private static final DeviceContainer DEVICE_CONTAINER = new DeviceContainer<>();
public static DeviceContainer getInstance() {
return DEVICE_CONTAINER;
}
private final List<T> list = new LinkedList<>();
private final int max = 10;
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
public void add(T t) {
lock.lock();
try {
while (this.size() == max) {
System.out.println(Thread.currentThread().getName() + " 等待");
// 當(dāng)數(shù)據(jù)長(zhǎng)度為max的時(shí)候,生產(chǎn)者進(jìn)入等待隊(duì)列,釋放鎖標(biāo)記
// 借助條件進(jìn)入的等待隊(duì)列
producer.await();
}
System.out.println(Thread.currentThread().getName() + " 添加");
list.add(t);
count++;
// 借助條件喚醒所有的消費(fèi)者
consumer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public T get() {
T t = null;
lock.lock();
try {
while (this.size() == 0) {
System.out.println(Thread.currentThread().getName() + " 等待");
// 借助條件使消費(fèi)者進(jìn)入等待隊(duì)列
consumer.await();
}
System.out.println(Thread.currentThread().getName() + " 獲取");
t = list.remove(0);
count--;
// 借助條件喚醒所有生產(chǎn)者
producer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}
private int size() {
return count;
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
DeviceContainer<Device> deviceSingleton = DeviceContainer.getInstance();
for (int i = 0; i < 10; i++) {
new Thread(() ->
{
for (int j = 0; j < 5; j++) {
System.out.println(deviceSingleton.get());
}
}, "consumer-" + i).start();
}
ThreadUtils.sleep(1000);
for (int i = 0; i < 2; i++) {
new Thread(() ->
{
for (int j = 0; j < 25; j++) {
deviceSingleton.add(new Device(j, "device " + j));
}
}, "producer-" + i).start();
}
}
}
ConcurrentHashMap/ConcurrentHashSet:底層哈希實(shí)現(xiàn)的Map/Set,效率高,使用底層技術(shù)實(shí)現(xiàn)的線程安全,量級(jí)較synchronized輕。key和value不能為null(不同于HashMap和HashSet)
ConcurrentSkipListMap/ConcurrentSkipListSet:底層跳表實(shí)現(xiàn)的Map/Set,有序,線程安全,效率較ConcurrentHashMap/ConcurrentHashSet低。
CopyOnWriteArraySet:底層數(shù)組,線程安全,增加和刪除效率低,查詢效率高。
CopyOnWriteArrayList:底層數(shù)組,線程安全,增加和刪除效率低,查詢效率高。
ConcurrentLinkedQueue/ ConcurrentLinkedDeue:基礎(chǔ)鏈表同步隊(duì)列,非阻塞,ConcurrentLinkedQueue底層單向鏈表,ConcurrentLinkedDeue底層雙向鏈表,均***。
ArrayBlockingQueue/LinkedBlockingQueue:阻塞隊(duì)列,隊(duì)列容量不足自動(dòng)阻塞,隊(duì)列容量為0自動(dòng)阻塞。ArrayBlockingQueue底層使用數(shù)組,有界;LinkedBlockingQueue底層使用鏈表,默認(rèn)***。ArrayBlockingQueue根據(jù)調(diào)用API的不同,有不同的特性。當(dāng)容量不足的時(shí)候有阻塞能力。add方法在容量不足的時(shí)候會(huì)拋出異常;put方法在容量不足時(shí)阻塞等待;offer默認(rèn)不阻塞,當(dāng)容量不足的時(shí)候返回false,否則返回true;三參offer可設(shè)定阻塞時(shí)長(zhǎng),若在阻塞時(shí)長(zhǎng)內(nèi)有容量空閑,則添加并返回true,如果阻塞時(shí)長(zhǎng)范圍內(nèi)無(wú)容量空閑,放棄新增數(shù)據(jù)并返回false。LinkedBlockingQueue的add方法在容量不足的時(shí)候會(huì)拋出異常;offer方法在容量不足時(shí)返回false,否則返回true;三參offer可設(shè)定阻塞時(shí)長(zhǎng),若在阻塞時(shí)長(zhǎng)內(nèi)有容量空閑,則添加并返回true,如果阻塞時(shí)長(zhǎng)范圍內(nèi)無(wú)容量空閑,放棄新增數(shù)據(jù)并返回false。
PriorityQueue:有限集隊(duì)列,底層數(shù)組,***。
PriorityBlockingQueue:優(yōu)先級(jí)阻塞隊(duì)列,底層數(shù)組,***。
LinkedTransferQueue:轉(zhuǎn)移隊(duì)列,使用transfer方法實(shí)現(xiàn)數(shù)據(jù)的即時(shí)處理。隊(duì)列使用add保存數(shù)據(jù),不做阻塞等待。transfer是TransferQueue的特有方法,轉(zhuǎn)移隊(duì)列必須要有消費(fèi)者(take()方法的調(diào)用者)。如果沒(méi)有任何線程消費(fèi)數(shù)據(jù),則transfer方法阻塞。一般用于處理即時(shí)消息。
SynchronousQueue:阻塞的同步隊(duì)列,有界。是一個(gè)容量為0的隊(duì)列,是一個(gè)特殊的TransferQuque。必須先有消費(fèi)線程等待才能使用的隊(duì)列。add方法無(wú)阻塞,若沒(méi)有消費(fèi)線程阻塞等待數(shù)據(jù),則拋出異常。put方法有阻塞,若沒(méi)有消費(fèi)線程阻塞等待數(shù)據(jù),則put方法阻塞。
DelayQueue:延時(shí)阻塞隊(duì)列,***。類似輪詢機(jī)制,一般用來(lái)做定時(shí)任務(wù)。業(yè)務(wù)場(chǎng)景舉例:具有過(guò)期時(shí)間的緩存,訂單過(guò)期自動(dòng)取消等。
?
線程池
線程池是一個(gè)進(jìn)程級(jí)的資源,默認(rèn)的生命周期和JVM一致,即從開啟線程池開始,到JVM關(guān)閉為止,是線程池的默認(rèn)生命周期。如果顯式調(diào)用shutdown方法,那么線程池執(zhí)行所有的任務(wù)后自動(dòng)關(guān)閉。
Executor接口
線程池頂級(jí)接口。Executor中只有一個(gè)方法execute,是用來(lái)處理任務(wù)的一個(gè)服務(wù)方法。調(diào)用者提供Runnable接口的實(shí)現(xiàn),線程池通過(guò)執(zhí)行線程執(zhí)行這個(gè)Runnable。
public class Executor01 {
public static void main(String[] args) {
new Executor_01().execute(() ->
System.out.println(Thread.currentThread().getName() + " test executor.")
);
}
static class Executor_01 implements Executor {@Override
br/>@Override
new Thread(command).start();
}
}
}
ExecutorService
Executor的子接口,與Executor不同的是,它還提供了一個(gè)返回值為Future的服務(wù)方法submit。
Executors工具類
Executor的工具類,為線程池提供工具方法,可快速創(chuàng)建線程池,所有的線程池類型都實(shí)現(xiàn)了這個(gè)接口,實(shí)現(xiàn)了這個(gè)接口就代表有提供線程池的能力。常用方法有:void execute(),F(xiàn)uture submit(Callable),F(xiàn)uture submit(Runnable),void shutdown,boolean isShutdown(),boolean isTerminated()。
public class Test {
public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建一個(gè)長(zhǎng)度為5的線程池對(duì)象
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 6; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " executor.");
ThreadUtils.sleep(1000);
});
}
System.out.println(executorService);
// 優(yōu)雅關(guān)閉
executorService.shutdown();
// 是否已經(jīng)結(jié)束,相當(dāng)于判斷是否回收了資源,因?yàn)榫€程睡眠,此時(shí)還未回收,因此為false
System.out.println(executorService.isTerminated());
// 是否已經(jīng)關(guān)閉,即是否調(diào)用過(guò)shutdown方法
System.out.println(executorService.isShutdown());
System.out.println(executorService);
ThreadUtils.sleep(1000);
// 因?yàn)樯厦嫠?秒,任務(wù)都已經(jīng)執(zhí)行完了,資源也被回收了,因此為true
System.out.println(executorService.isTerminated());
System.out.println(executorService.isShutdown());
System.out.println(executorService);
}
}
Future
未來(lái)結(jié)果,代表線程執(zhí)行結(jié)束后的結(jié)果。通過(guò)get方法獲取線程執(zhí)行結(jié)果。
常用方法:get()、get(long, TimeUnit)和isDown()。
get():阻塞等待線程執(zhí)行結(jié)束并得到返回結(jié)果;
get(long, TimeUnit):阻塞固定時(shí)長(zhǎng),等待線程結(jié)束后的結(jié)果,如果在阻塞時(shí)長(zhǎng)范圍內(nèi)線程未執(zhí)行結(jié)束,拋出異常。
isDown():判斷線程是否結(jié)束即判斷call方法是否已完成,要特別注意,這里的isDown與ExecutorService中的isShutdown不同,isShutdown是用來(lái)判斷線程是否關(guān)閉的。
public class ExecutorServiceTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
testExecutorService();
}
private static void testExecutorService() throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(1);
Future<String> future = service.submit(() -> {
ThreadUtils.sleep(1000);
return Thread.currentThread().getName() + " submit.";
});
// 查看任務(wù)是否完成即線程是否結(jié)束即call方法是否執(zhí)行結(jié)束,
// 要注意的是,這里判斷是否結(jié)束,跟ExecutorService中的isShutDowm不同, isShutdowm是判斷線程是否結(jié)束,而shutdown表示關(guān)閉線程
System.out.println(future.isDone());
// 獲取call方法的返回值
System.out.println(future.get()); // false
System.out.println(future.isDone());
System.out.println(future.get()); // true
// 關(guān)閉線程池
service.shutdown();
}
}
Callable接口
可執(zhí)行接口。類似Runnable接口,也是可以啟動(dòng)線程的接口。
接口方法:call(),相當(dāng)于Runnable中的run方法,區(qū)別在于call方法有返回值。
Callable和Runnable的選擇:當(dāng)需要返回值或需要拋出異常時(shí),使用Callable,其他情況任意選。
ThreadPoolExecutor創(chuàng)建線程池
通過(guò)new ThreadPoolExecutor來(lái)創(chuàng)建,下圖是ThreadPoolExecutor的三個(gè)構(gòu)造方法:
參數(shù)說(shuō)明:
corePoolSize? 核心線程數(shù)
maximumPoolSize? 最大線程數(shù)
keepAliveTime? 線程最大空閑時(shí)間
unitTimeUnit? 時(shí)間單位
workQueueBlockingQueue<Runnable>? 線程等待隊(duì)列
threadFactoryThreadFactory? 線程創(chuàng)建工廠
handlerRejectedExecutionHandler? 拒絕策略
?
核心線程數(shù)和最大線程數(shù):
當(dāng)提交一個(gè)新任務(wù)到線程池時(shí)首先判斷核心線程數(shù)corePoolSize是否已達(dá)上限,若未達(dá)到corePoolSize上限,創(chuàng)建一個(gè)工作線程來(lái)執(zhí)行任務(wù);否則,再判斷線程池工作隊(duì)列workQueueBlockingQueue是否已滿,若沒(méi)滿,則將新提交的任務(wù)存儲(chǔ)在工作隊(duì)列里;否則,線程池將判斷最大線程數(shù)是否已達(dá)上限,若未達(dá)到maximumPoolSize上限,則創(chuàng)建一個(gè)新的工作線程來(lái)執(zhí)行任務(wù),滿了,則交給飽和策略來(lái)處理這個(gè)任務(wù)。如果線程池中的線程數(shù)量大于核心線程數(shù) corePoolSize 時(shí),線程空閑時(shí)間超過(guò)線程最大空閑時(shí)間keepAliveTime,則線程將被終止,直至線程池中的線程數(shù)目不大于corePoolSize。
自定義線程池
public class ExecutorThreadPoolTest {
public static void main(String[] args) {
testExecutorThreadPool();
}
private static void testExecutorThreadPool() {
// 創(chuàng)建線程池,核心線程數(shù)為2,最大線程數(shù)為4,最大空閑時(shí)間為10
ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
4,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new MyTreadFactory(),
new MyIgnorePolicy());
// 啟動(dòng)所有核心線程,使其出與等待狀態(tài)
executor.prestartAllCoreThreads();
// 創(chuàng)建并執(zhí)行任務(wù)
for (int i = 1; i <= 10; i++) {
MyTask task = new MyTask(String.valueOf(i));
executor.execute(task);
}
}
static class MyTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable runnable) {
Thread t = new Thread(runnable, "線程【" + mThreadNum.getAndIncrement() + "】");
System.out.println(t.getName() + " 已創(chuàng)建");
return t;
}
}
public static class MyIgnorePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
doLog(runnable, executor);
}
private void doLog(Runnable runnable, ThreadPoolExecutor executor) {
System.err.println(runnable.toString() + " 被拒絕");
}
}
@Data
static class MyTask implements Runnable {
private String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(this.toString() + " 正在運(yùn)行");
ThreadUtils.sleep(1000);
}
@Override
public String toString() {
return "線程【" + name + "】";
}
}
}
FixedThreadPool線程池
固定容量的線程池,可由Executors來(lái)創(chuàng)建,活動(dòng)狀態(tài)和線程池容量是有上限的,需要手動(dòng)銷毀線程池。構(gòu)造方法如下:
由此可見,該線程池核心線程數(shù)和最大線程數(shù)均為構(gòu)造參數(shù)值nThreads,線程最大空閑時(shí)間為0,任務(wù)隊(duì)列采用LinkedBlockingQueue,默認(rèn)容量上限是Integer.MAX_VALUE。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
// 創(chuàng)建容量為10的FixedThreadPool線程池
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
service.execute(()-> System.out.println(Thread.currentThread().getName()));
}
// 銷毀線程池
service.shutdown();
}
}
CachedThreadPool線程池
緩存線程池,通過(guò)Executors來(lái)創(chuàng)建,默認(rèn)最大容量為Integer.MAX_VALUE,自動(dòng)擴(kuò)容,執(zhí)行完后自動(dòng)銷毀(這一點(diǎn)與FixedThreadPool不同,F(xiàn)ixedThreadPool的銷毀需要手動(dòng)調(diào)用shutdown方法)。構(gòu)造方法如下:
由構(gòu)造方法可見,核心線程數(shù)為0,最大線程數(shù)為Integer.MAX_VALUE,最大空閑時(shí)間為60秒,任務(wù)隊(duì)列使用SynchronousQueue。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
// 創(chuàng)建緩存線程池
ExecutorService service = Executors.newCachedThreadPool();
System.out.println(service);
for (int i = 0; i < 5; i++) {
service.execute(() -> {
ThreadUtils.sleep(1000);
System.out.println(Thread.currentThread().getName() + " executor.");
});
}
System.out.println(service);
ThreadUtils.sleep(65);
System.out.println(service);
}
}
ScheduledThreadPool線程池
計(jì)劃任務(wù)線程池,可以根據(jù)任務(wù)自動(dòng)執(zhí)行計(jì)劃的線程池,由Executors創(chuàng)建,需要手動(dòng)銷毀。計(jì)劃任務(wù)時(shí)選用,如需要定時(shí)整理數(shù)據(jù)、服務(wù)器定期清除無(wú)效文件等。構(gòu)造方法如下:
核心線程數(shù)為構(gòu)造參數(shù)大小,最大線程數(shù)為Integer.MAX_VALUE,最大空閑時(shí)間0,任務(wù)隊(duì)列使用DelayedWorkQuquq。
常用方法有:scheduledAtFixedRate、schedule、execute等。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
// 創(chuàng)建計(jì)劃任務(wù)線程池
ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
System.out.println(service);
// 定期任務(wù),線程池啟動(dòng)500毫秒后第一次執(zhí)行任務(wù),以后每300毫秒執(zhí)行一次
service.scheduleAtFixedRate(() -> {
ThreadUtils.sleep(1000);
System.out.println(Thread.currentThread().getName() + " executor.");
}, 500, 300, TimeUnit.MILLISECONDS);
System.out.println(service);
service.shutdown();
}
}
SingleThreadExecutor線程池
單一容量的線程池。需要手動(dòng)銷毀。有保證任務(wù)順序需求時(shí)可選用。如大廳中的公共頻道聊天,固定數(shù)量商品的秒殺等。構(gòu)造方法如下:
核心線程數(shù)和最大線程數(shù)均為1,任務(wù)隊(duì)列為L(zhǎng)inkedBlockingQueue。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
// 創(chuàng)建單一容量線程池
ExecutorService service = Executors.newSingleThreadExecutor();
System.out.println(service);
for (int i = 0; i < 5; i++) {
service.execute(() -> {
System.out.println(Thread.currentThread().getName() + " executor.");
ThreadUtils.sleep(1000);
});
}
service.shutdown();
}
}
ForkJoinPool線程池
分支合并線程池,適用于處理復(fù)雜任務(wù)。初始化線程容量與CPU核心數(shù)有關(guān)。
ForkJoinPool沒(méi)有所謂的容量,默認(rèn)都是一個(gè)線程,根據(jù)任務(wù)自動(dòng)分支新的子線程,,當(dāng)子線程結(jié)束后自動(dòng)合并。所謂自動(dòng)合并,是用fork和join兩個(gè)方法實(shí)現(xiàn)的(手動(dòng)調(diào)用)。
線程池中運(yùn)行的可分治合并的任務(wù)必須是ForkJoinTask的子類型(RecursiveTask或RecursiveAction,二者的區(qū)別在于一個(gè)運(yùn)行完之后有返回值,一個(gè)沒(méi)有),其中提供了分支和合并能力。
ForkJoinTask提供了兩個(gè)抽象子類型RecursiveTask和RecursiveAction,RecursiveTask是有返回結(jié)果的分支合并任務(wù),RecursiveAction是無(wú)返回結(jié)果的分支合并任務(wù)(類似Callable和Runnable的區(qū)別)。
ForkJoinTask提供了一個(gè)compute方法,這個(gè)方法里面就是任務(wù)的執(zhí)行邏輯。
該線程池主要用于大量數(shù)據(jù)的計(jì)算、數(shù)據(jù)分析等。
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
long result = 0L;
for (int NUMBER : NUMBERS) {
result += NUMBER;
}
System.out.println(result);
ForkJoinPool pool = new ForkJoinPool();
// 分支合并任務(wù)
AddTask task = new AddTask(0, NUMBERS.length);
// 提交任務(wù)
Future<Long> future = pool.submit(task);
System.out.println(future.get());
}
private static final int[] NUMBERS = new int[1000000];
private static final int MAX_SIZE = 50000;
private static final Random RANDOM = new Random();
static {
for (int i = 0; i < NUMBERS.length; i++) {
NUMBERS[i] = RANDOM.nextInt(1000);
}
}
static class AddTask extends RecursiveTask<Long> {
int begin, end;
AddTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Long compute() {
if ((end - begin) < MAX_SIZE) {
long sum = 0L;
for (int i = begin; i < end; i++) {
sum += NUMBERS[i];
}
return sum;
} else {
// 當(dāng)結(jié)束值減去開始值大于臨界值的時(shí)候進(jìn)行分支
int middle = begin + (end - begin) / 2;
AddTask task1 = new AddTask(begin, middle);
AddTask task2 = new AddTask(middle, end);
// 分支的工作,就是開啟一個(gè)新的線程任務(wù)
task1.fork();
task2.fork();
// join就是合并,將任務(wù)的結(jié)果獲取,是一個(gè)阻塞方法,一定會(huì)得到結(jié)果數(shù)據(jù)
return task1.join() + task2.join();
}
}
}
}
一組線程的集合,線程組中多個(gè)線程執(zhí)行同一批任務(wù),線程之間是隔離的,互不影響。同一組的線程之間可以通信,但不同組的線程之間不能通信,這樣就做到了線程屏蔽,保證了線程安全。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
ThreadGroup group = new ThreadGroup("LQ");
Thread thread = new Thread(group, () ->
System.out.println("group is " + Thread.currentThread().getThreadGroup().getName())
);
thread.start();
}
}
朋友們覺得內(nèi)容有什么錯(cuò)誤、不足之處,或者有什么疑問(wèn),盡可留言指出來(lái),一起學(xué)習(xí)哦。
分享題目:Java并發(fā)編程基本知識(shí)
網(wǎng)站URL:http://jinyejixie.com/article32/gdjjsc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供Google、網(wǎng)站制作、網(wǎng)站導(dǎo)航、云服務(wù)器、定制開發(fā)、服務(wù)器托管
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)