【尚硅谷】JUC基础篇(2021版)

大厂必备技术之JUC并发编程

B站直达

2021版: 【尚硅谷】大厂必备技术之JUC并发编程

2022版: 尚硅谷JUC并发编程(对标阿里P6-P7)

课程资料

2021版:尚硅谷高级技术之JUC高并发编程2021最新版

2022版:尚硅谷JUC并发编程与源码分析2022

代码仓库

GitHub: https://github.com/Shiguang-coding/learn-juc

Gitee: https://gitee.com/an_shiguang/learn-juc

1、什么是JUC

1.1、JUC简介

JDK6中文在线文档:https://tool.oschina.net/apidocs/apidoc?api=jdk-zh

在Jv中,线程部分是一个重点,本篇文章说的UC也是关于线程的。JUC就是java.util.concurrent工具包的简称。这是一个处理线程的工具包,JDK1.5开始出现的。

1.2、进程与线程

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。

线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

总结来说:

进程:指在系统中正在运行的一个应用程序;程序一旦运行就是进程;进程是资源分配的最小单位。

线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个单元执行流。线程是程序执行的最小单位。

1.3、线程的状态

1.3.1、线程状态枚举类

进入 java.lang.Thread 类,找到内部类 State ,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
// 新建
NEW,

/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
// 准备就绪
RUNNABLE,

/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
// 阻塞
BLOCKED,

/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
// 等待(不见不散)
WAITING,

/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
// 带时间的等待(过时不候)
TIMED_WAITING,

/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
// 终结
TERMINATED;
}

1.3.2、wait和sleep的区别

  1. sleep是Thread的静态方法;wait是Object的方法,任何对象实例都能调用。
  2. sleep不会释放锁,它也不需要占用锁;wait会释放锁,但调用它的前提是当前线程占有锁(即代码要在synchronized中)
  3. 它们都可以被interrupted方法中断

1.4、并发与并行

1.4.1、串行模式

串行是一次只能取得一个任务,并执行这个任务。

串行表示所有任务都一一按先后顺序进行。串行意味看必须先装完一车柴才能运送这车柴,只有运送到了,才能卸下这车柴,并且只有完成了这整个三个步骤,才能进行下一个步骤。

1.4.2、并行模式

并行意味着可以同时取得多个任务,并同时去执行所取得的这些任务。并行模式相当于将长长的一条队列,划分成了多条短队列,所以并行缩短了任务队列的长度。并行的效率从代码层次上强依赖于多进程/多线程代码,从硬件角度上则依赖于多核CPU。

1.4.3、并发

并发(concurrent)指的是多个程序可以同时运行的现象,更细化的是多进程可以同时运行或者多指令可以同时运行。但这不是重点,在描述并发的时候也不会去扣这种字眼是否精确,并发的重点在于它是一种现象,并发描述
的是多进程同时运行的现象。但实际上,对于单核心CPU来说,同一时刻只能运行一个线程。所以,这里的”同时运行”表示的不是真的同一时刻有多个线程运行的现象,这是并行的概念,而是提供一种功能让用户看来多个程序同时运行起来了,但实际上这些程序中的进程不是一直霸占CPU的,而是执行一会停一会(线程的上下文切换)。

要解决大并发问题,通常是将大任务分解成多个小任务,由于操作系统对进程的调度是随机的,所以切分成多个小任务后,可能会从任一小任务处执行。这可能会出现一些现象:

  • 可能出现一个小任务执行了多次,还没开始下个任务的情况。这时一般会采用队列或类似的数据结构来存放各个小任务的成果。
  • 可能出现还没准备好第一步就执行第二步的可能。这时,一般采用多路复用或异步的方式,比如只有准备好产生了事件通知才执行某个任务。
  • 可以多进程/多线程的方式并行执行这些小任务。也可以单进程/单线程执行这些小任务,这时很可能要配合多路复用才能达到较高的效率。

1.4.4、小结(重点)

并发:同一时刻多个线程在访问同一个资源,多个线程对一个点。
例子:春运抢票电商秒杀…

并行:多项工作一起执行,之后再汇总
例子:泡方便面,电水壶烧水,一边撕调料倒入桶中

1.5、管程

即Monitor(监视器),即平时所说的锁,是一种同步机制,保证同一时间只有一个线程访问被保护的数据或代码

JVM中的同步基于进入和退出管程对象来实现的。

1.6、用户线程和守护线程

用户线程:即自定义线程,平时所用到的线程基本都是用户线程。主线程结束了,用户线程还在运行,jvm存活。

守护线程:是一种特殊的线程,在后台运行的线程,比如垃圾回收,没有用户线程了,都是守护线程,jvm结束。

举例说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Main {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 是否为守护线程: " + Thread.currentThread().isDaemon());
while (true) {

}
}, "myThread");

// 设置为守护线程,在线程启动之前设置
thread.setDaemon(true);

thread.start();

System.out.println(Thread.currentThread().getName() + " over");

}
}

2、Lock接口

2.1、Synchronized

2.1.1 Synchronized关键字回顾

synchronized是Java中的关键字,是一种同步锁。它修饰的对象有以下几种:

  1. 修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号括起来的代码,作用的对象是调用这个代码块的对象;

  2. 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象;

    虽然可以使用synchronized来定义方法,但synchronized并不属于方法定义的一部分,因此,synchronized关键字不能被继承。如果在父类中的某个方法使用了synchronized关键字,而在子类中覆盖了这个方法,在子类中的这个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上synchronized关键字才可以。当然,还可以在子类方法中调用父类中相应的方法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此,子类的方法也就相当于同步了。

  3. 修饰一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象

  4. 修饰一个类,其作用的范围是synchronized后面括号括起来的部分,作用主的对象是这个类的所有对象。

2.1.2、使用synchronized实现售票案例

多线程的编程步骤:
image-20241209152423704

第一步:创建一个资源类,属性和操作方法

第二步:创建多个线程,调用类里面的操作方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.shiguang.sync;

/**
* Created By Shiguang On 2024/12/9 13:52
*/

// 第一步 创建资源类,定义属性和操作方法
class Ticket {
// 票数
private int number = 30;
// 卖票操作
public synchronized void sale() {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + (number--) + " 张票, 剩余: " + number);
}
}
}
public class SaleTicket {
// 第二步 创建多个线程,调用资源类的操作方法
public static void main(String[] args) {
// 创建 Ticket 对象
Ticket ticket = new Ticket();

// 创建三个线程
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "A").start();

new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "B").start();


new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "C").start();
}
}

2.2、什么是Lock

Lock锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。Lock提供了比synchronized更多的功能。

Lock与的Synchronized区别

  • Lcck不是Java语言内置的,synchronized是Java语言的关键字,因此是内置特性。Lock是一个类,通过这个类可以实现同步访问;
  • Lock和synchronized有一点非常大的不同,采用synchronized不需要用户去手动释放锁,当synchronized方法或者synchronized代码块执行完之后,系统会自动让线程释放对锁的占用;而L0ck则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。

2.3、ReentrantLock

2.3.1、ReentrantLock介绍

ReentrantLock是Java中的一个可重入锁,实现了Lock接口。
可重入意味着同一个线程可以多次获取同一个锁而不会造成死锁,每次获取锁时,锁的持有计数会增
加,每次释放锁时,持有计数会减少,当持有计数为0时,锁被完全释放。

优点:

  • 提供了比synchronized关键字更灵活的锁操作,例如可中断的锁获取、超时等待锁、公平锁和非
    公平锁的选择等。
  • 可以使用tryLock0方法尝试获取锁,避免了线程的无限期等待。

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock();

public void performTask() {
lock.lock();
try {
// 临界区代码,需要同步的部分
System.out.println("Thread " + Thread.currentThread().getName() + " is performing the task.");
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
ReentrantLockExample example = new ReentrantLockExample();
Thread t1 = new Thread(example::performTask);
Thread t2 = new Thread(example::performTask);
t1.start();
t2.start();
}
}

代码解释:
ReentrantLock lock = new ReentrantLock();:创建一个ReentrantLock实例。

lock.lock();:获取锁,如果锁已被其他线程持有,则当前线程会阻塞直到获取到锁。

try{...} finally {lock.unlock();}:在tny块中执行需要同步的代码,确保在finally块中释放锁,
以防止死锁。

lock.tryLock():尝试获取锁,如果锁可用则立即返回true,否则返回false,不会阻塞线程。

使用场景:
当需要更灵活的锁操作,如可中断的锁获取、超时等待锁、公平锁等时,使用ReentrantLock比
synchronized更合适。
在复杂的同步场景中,如条件变量(Condition)的使用,ReentrantLock提供了更强大的功能.

注意事项:
务必在finally块中释放锁,以确保锁最终会被释放,避免死锁。
对于简单的同步需求,synchronized可能更简洁,但对于复杂的同步需求,ReentrantLock提供
了更多的控制和功能。

2.3.2、售票案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.shiguang.lock.reentrantlock;

import java.util.concurrent.locks.ReentrantLock;

/**
* Created By Shiguang On 2024/12/9 14:09
*/

// 第一步 创建资源类,定义属性和操作方法
class LTicket {
// 票数
private int number = 30;

// 创建可重用锁
private final ReentrantLock lock = new ReentrantLock();

// 卖票操作
public void sale() {
// 加锁
lock.lock();
try {
// 判断是否有票
if (number > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + (number--) + " 张票, 剩余: " + number);
}
} finally {
// 解锁
lock.unlock();
}

}
}

public class LSaleTicket {

// 第二步 创建多个线程,调用资源类的操作方法
public static void main(String[] args) {
LTicket ticket = new LTicket();
// 创建三个线程
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "A").start();

new Thread(() -> {

for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "B").start();

new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "C").start();
}


}

2.4、ReadWriteLock

2.4.1、ReadWriteLock介绍

ReadWriteLock是Java中的一个接口,它维护了一对关联的锁,一个用于只读操作,一个用于写
入操作。

其主要目的是在多线程环境下,允许多个线程同时读取共享数据,但在写入数据时,需要独占访问,
以保证数据的一致性和完整性。

常见的实现类是 ReentrantReadWriteLock,它提供了可重入的读写锁功能。

优点:

  • 提高并发性能:在多读少写的场景下,使用ReadWriteLock可以显著提高程序的并发性能,因为多
    个读线程可以同时访问共享资源,而不会互相阻塞。
  • 数据一致性:在写操作时,通过独占锁保证数据的一致性,避免了多个线程同时修改数据导致的数据
    不一致问题。

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockExample {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private int data = 0;

// 读操作
public int readData() {
lock.readLock().lock();
try {
// 多个读线程可以同时进入这里
return data;
} finally {
lock.readLock().unlock();
}
}

// 写操作
public void writeData(int newData) {
lock.writeLock().lock();
try {
// 只有一个写线程可以进入这里
data = newData;
} finally {
lock.writeLock().unlock();
}
}
}

代码解释:
ReadWriteLock lock=new ReentrantReadWriteLock();:创建-个ReentrantReadWriteLock
实例。

lock.readLock().lock();:获取读锁,多个读线程可以同时获取读锁,只要没有写线程特有写锁。

lock.readLock().unlock();:释放读锁

lock.writeLock().lock();:获取写锁,只有一个写线程可以获取写锁,并且在写锁被持有时,其他读
线程和写线程都将被阻塞。

lock.writeLock().unlock();:释放写锁。

2.4.2、售票案例

将 ReentrantLock 替换为 ReentrantReadWriteLock 。
使用writeLock进行写操作(卖票操作),因为卖票会修改票数,属干写操作,在sale方法中,使用writeLock进行加锁和解锁操作。其他部分代码保持不变。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.shiguang.lock.reentrantlock.readwritelock;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* Created By Shiguang On 2024/12/9 14:09
*/

// 第一步 创建资源类,定义属性和操作方法
class LTicket {
// 票数
private int number = 30;

// 创建可重用读写锁
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

// 卖票操作
public void sale() {
// 获取写锁
lock.writeLock().lock();
try {
// 判断是否有票
if (number > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + (number--) + " 张票, 剩余: " + number);
}
} finally {
// 释放写锁
lock.writeLock().unlock();
}
}
}

public class LSaleTicket {

// 第二步 创建多个线程,调用资源类的操作方法
public static void main(String[] args) {
LTicket ticket = new LTicket();
// 创建三个线程
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "A").start();

new Thread(() -> {

for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "B").start();

new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "C").start();
}


}

使用场景:
适用于读操作颇繁,写操作较少的场景,例收如缓存系统、配置信息读取等。

注意事项:
读锁和写锁的使用要注意正确的加锁和解锁顺序,避免死锁。
写锁是独占的,会阻塞其他读锁和写锁,所以要确保写操作的时间尽量短,以提高并发性能。

2.5、 小结(重点)

Lock和synchronized有以下几点不同:

  1. Lock是一个接口,而synchronized是Java中的关键字,synchronized是内
    置的语言实现;
  2. synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁;
  3. Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
  4. 通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。
  5. Lock可以提高多个线程进行读操作的效率。

在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。

3、线程间通信

3.1、线程间通信案例

多线程编程步骤中可在写操作方法时再归纳一个步骤,即判断,干活,通知

image-20241209152525546

示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.shiguang.sync;

/**
* Created By Shiguang On 2024/12/9 15:01
*/
// 第一步 创建资源类,定义属性和操作方法
class Share {
//初始值
private int number = 0;

// +1 的方法
public synchronized void incr() throws InterruptedException {
// 第二步:判断, 干活, 通知
if (number != 0) {// 判断number值是否为0,不为0则等待
// 等待
this.wait();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + " : " + number);
// 通知其他线程
this.notifyAll();
}

// -1 的方法
public synchronized void decr() throws InterruptedException {
// 第二步:判断, 干活, 通知
if (number != 1) {// 判断number值是否为1,不为0则等待
// 等待
this.wait();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + " : " + number);
// 通知其他线程
this.notifyAll();

}
}


public class ThreadDemo1 {
//第三步 创建多个线程,调用资源类的操作方法
public static void main(String[] args) {
// 创建 Share 对象
Share share = new Share();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}, "A").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}, "B").start();

}
}

3.2、虚假唤醒问题

我们多复制一份代码,再引入两个线程,分别两个线程进行加操作,两个线程进行减操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.shiguang.sync;

/**
* Created By Shiguang On 2024/12/9 15:01
*/
// 第一步 创建资源类,定义属性和操作方法
class Share {
//初始值
private int number = 0;

// +1 的方法
public synchronized void incr() throws InterruptedException {
// 第二步:判断, 干活, 通知
if (number != 0) {// 判断number值是否为0,不为0则等待
// 等待
this.wait();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + " : " + number);
// 通知其他线程
this.notifyAll();
}

// -1 的方法
public synchronized void decr() throws InterruptedException {
// 第二步:判断, 干活, 通知
if (number != 1) {// 判断number值是否为1,不为0则等待
// 等待
this.wait();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + " : " + number);
// 通知其他线程
this.notifyAll();

}
}


public class ThreadDemo1 {
//第三步 创建多个线程,调用资源类的操作方法
public static void main(String[] args) {
// 创建 Share 对象
Share share = new Share();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}, "A").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}, "B").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}, "C").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}, "D").start();

}
}

运行结果如下:

可见出现了错误情况(数值超过一或者数值小于0)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
A : 1
D : 0
C : 1
B : 0
C : 1
B : 0
C : 1
B : 0
C : 1
B : 0
C : 1
B : 0
C : 1
B : 0
C : 1
B : 0
C : 1
B : 0
C : 1
B : 0
C : 1
B : 0
D : -1
A : 0
A : 1
D : 0
A : 1
D : 0
A : 1
D : 0
A : 1
D : 0
A : 1
D : 0
A : 1
D : 0
A : 1
D : 0
A : 1
D : 0

产生问题的原因是因为出现了虚假唤醒的情况,在多线程编程中,尤其是使用wai()notify()notifyAll()进行线程间通信时,可能会出现虚假唤醒的情况。

虚假唤醒就是在多线程执行过程中,线程间的通信未按照我们预想的顺序唤醒,故出现数据不一致等不符合我们预期的结果。例如我们的想法是:加1和减1交替执行,但执行结果出现了超过1的数或者小于0的数。

在上述代码中,如果使用f语句进行条件判断,当线程被虚假唤醒时,它会继续执行wait()之后的
代码,而不重新检查条件。这可能导致程序逻辑错误,例如vaue的值不符合预期。这是因为wait()方法在哪里等待,就在哪里唤醒。

应该使用while循环代替if进行条件判断,以防止虚假唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// +1 的方法
public synchronized void incr() throws InterruptedException {
// 第二步:判断, 干活, 通知
while (number != 0) {// 判断number值是否为0,不为0则等待
// 等待
this.wait();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + " : " + number);
// 通知其他线程
this.notifyAll();
}

// -1 的方法
public synchronized void decr() throws InterruptedException {
// 第二步:判断, 干活, 通知
while (number != 1) {// 判断number值是否为1,不为0则等待
// 等待
this.wait();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + " : " + number);
// 通知其他线程
this.notifyAll();

}

此时,多线程编程步骤可再多加个步骤,即使用while循环代替if进行条件判断防止虚假唤醒问题

image-20241209152659459

3.3、ReentrantLock实现线程间通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package com.shiguang.lock.reentrantlock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created By Shiguang On 2024/12/9 15:57
*/


// 第一步 创建资源类,定义属性和操作方法
class Share {
// 初始值
private int number = 0;

// 创建Lock
private Lock lock = new ReentrantLock();

private Condition condition = lock.newCondition();

// +1 的方法
public void incr() throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断
while (number != 0) {
condition.await();
}

// 干活
number++;

System.out.println(Thread.currentThread().getName() + " : " + number);

// 通知
condition.signalAll();
} finally {
// 解锁
lock.unlock();

}
}

// -1 的方法
public void decr() throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断
while (number != 1) {
condition.await();
}

// 干活
number--;
System.out.println(Thread.currentThread().getName() + " : " + number);

// 通知
condition.signalAll();

} finally {
// 解锁
lock.unlock();
}
}
}


public class ThreadDemo2 {

public static void main(String[] args) {
// 创建 Share 对象
Share share = new Share();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();

}

}

4、线程间定制化通信

4.1、案例分析

image-20241209161554013

4.2、案例实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.shiguang.lock.reentrantlock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created By Shiguang On 2024/12/9 16:17
*/
// 第一步 创建资源类,定义属性和操作方法

class shareResource {
// 定义标志位
private int flag = 1; // 1 A ,2 B,3 C

// 创建Lock锁
private Lock lock = new ReentrantLock();

// 创建三个Condition
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();

// 打印5次,参数第几轮
public void print5(int loop) throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断
while (flag != 1) {
c1.await();
}
// 干活
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + ":" + i + " , " + "轮数:" + loop);
}
// 通知
flag = 2;
c2.signal();
} finally {
// 解锁
lock.unlock();
}
}

// 打印10次,参数第几轮
public void print10(int loop) throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断
while (flag != 2) {
c2.await();
}
// 干活
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + ":" + i + " , " + "轮数:" + loop);
}
// 通知
flag = 3;
c3.signal();
} finally {
// 解锁
lock.unlock();
}
}

// 打印15次,参数第几轮
public void print15(int loop) throws InterruptedException {
// 上锁
lock.lock();
try {
// 判断
while (flag != 3) {
c3.await();
}
// 干活
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + ":" + i + " , " + "轮数:" + loop);
}
// 通知
flag = 1;
c1.signal();
} finally {
// 解锁
lock.unlock();
}
}
}


public class ThreadDemo3 {
public static void main(String[] args) {
// 创建 Share 对象
shareResource shareResource = new shareResource();

new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
shareResource.print5(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();

new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
shareResource.print10(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();

new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
shareResource.print15(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
}

}

5、集合的线程安全

5.1、ArrayList集合线程不安全演示

ArrayList中add()方法定义如下,没有加synchronized关键字,是非线程安全的方法。

1
2
3
4
5
6
7
8
9
10
11
/**
* This helper method split out from add(E) to keep method
* bytecode size under 35 (the -XX:MaxInlineSize default value),
* which helps when add(E) is called in a C1-compiled loop.
*/
private void add(E e, Object[] elementData, int s) {
if (s == elementData.length)
elementData = grow();
elementData[s] = e;
size = s + 1;
}

问题代码案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Created By Shiguang On 2024/12/9 16:36
* List集合线程不安全问题
*/
public class ThreadDemo4 {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
try {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
} catch (Exception e) {
e.printStackTrace();
// 异常时终止程序, 0表示正常终止,非零值表示异常终止
System.exit(1);
}
}, String.valueOf(i)).start();
}
}
}

可能会出现的异常如下:

image-20241209165550785

5.1.1、解决方案-Vector

Vector中add()方法定义如下,添加了synchronized关键字,是线程安全的方法。

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Appends the specified element to the end of this Vector.
*
* @param e element to be appended to this Vector
* @return {@code true} (as specified by {@link Collection#add})
* @since 1.2
*/
public synchronized boolean add(E e) {
modCount++;
add(e, elementData, elementCount);
return true;
}

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
List<String> list = new Vector<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
try {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
} catch (Exception e) {
e.printStackTrace();
// 异常时终止程序, 0表示正常终止,非零值表示异常终止
System.exit(1);
}
}, String.valueOf(i)).start();
}
}

多次执行未出现异常情况

5.1.2、解决方案-Collections

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
List<String> list = Collections.synchronizedList(new ArrayList<>());

for (int i = 0; i < 30; i++) {
new Thread(() -> {
try {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
} catch (Exception e) {
e.printStackTrace();
// 异常时终止程序, 0表示正常终止,非零值表示异常终止
System.exit(1);
}
}, String.valueOf(i)).start();
}
}

5.1.3、解决方案-CopyOnWriteArrayList

CopyOnWrite 即 写时复制,写时复制一份,在复制的这一份中写入新内容,最后覆盖合并之前的内容,即兼顾了并发读,也照顾了独立的写操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
synchronized (lock) {
Object[] es = getArray();
int len = es.length;
es = Arrays.copyOf(es, len + 1);
es[len] = e;
setArray(es);
return true;
}
}

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) {

List<String> list = new CopyOnWriteArrayList<>();

for (int i = 0; i < 30; i++) {
new Thread(() -> {
try {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
} catch (Exception e) {
e.printStackTrace();
// 异常时终止程序, 0表示正常终止,非零值表示异常终止
System.exit(1);
}
}, String.valueOf(i)).start();
}
}

5.2、HashSet集合线程不安全演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) {

// 使用Set集合线程不安全问题
HashSet<String> set = new HashSet<>();

for (int i = 0; i < 30; i++) {
new Thread(() -> {
try {
set.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(set);
} catch (Exception e) {
e.printStackTrace();
// 异常时终止程序, 0表示正常终止,非零值表示异常终止
System.exit(1);
}
}, String.valueOf(i)).start();

}
}

多次执行可能出现如下异常:

image-20241209173427827

5.2.1、解决方案-CopyOnWriteArraySet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {

Set<String> set = new CopyOnWriteArraySet<>();

for (int i = 0; i < 30; i++) {
new Thread(() -> {
try {
set.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(set);
} catch (Exception e) {
e.printStackTrace();
// 异常时终止程序, 0表示正常终止,非零值表示异常终止
System.exit(1);
}
}, String.valueOf(i)).start();

}
}

5.3、HashMap集合线程不安全演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) {

// 使用Map集合线程不安全问题
Map<String, String> map = new HashMap<>();
for (int i = 0; i < 30; i++) {
String key = String.valueOf(i);
new Thread(() -> {
try {
map.put(key, UUID.randomUUID().toString().substring(0, 8));
System.out.println(map);
} catch (Exception e) {
e.printStackTrace();
// 异常时终止程序, 0表示正常终止,非零值表示异常终止
System.exit(1);
}
}, String.valueOf(i)).start();

}
}

多次执行可能出现如下异常:

image-20241209174110324

5.3.1、解决方案-Collections

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) {

Map<String, String> map = Collections.synchronizedMap(new HashMap<>());

for (int i = 0; i < 30; i++) {
String key = String.valueOf(i);
new Thread(() -> {
try {
map.put(key, UUID.randomUUID().toString().substring(0, 8));
System.out.println(map);
} catch (Exception e) {
e.printStackTrace();
// 异常时终止程序, 0表示正常终止,非零值表示异常终止
System.exit(1);
}
}, String.valueOf(i)).start();

}
}

5.3.2、解决方案-ConcurrentHashMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) {

Map<String, String> map = new ConcurrentHashMap<>();

for (int i = 0; i < 30; i++) {
String key = String.valueOf(i);
new Thread(() -> {
try {
map.put(key, UUID.randomUUID().toString().substring(0, 8));
System.out.println(map);
} catch (Exception e) {
e.printStackTrace();
// 异常时终止程序, 0表示正常终止,非零值表示异常终止
System.exit(1);
}
}, String.valueOf(i)).start();

}
}

6、多线程锁

6.1、锁的8种情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.shiguang.sync;

import java.util.concurrent.TimeUnit;

/**
* Created By Shiguang On 2024/12/9 17:51
*/

class Phone{
public static synchronized void sendSMS() throws Exception{
//停4秒
TimeUnit.SECONDS.sleep(4);
System.out.println(Thread.currentThread().getName() + "," + "发短信");
}

public synchronized void sendEmail() throws Exception{
System.out.println(Thread.currentThread().getName() + "," + "发邮件");
}

public void getHello(){
System.out.println("hello");
}
}

/**
* 8种锁的总结:
* 1 标准访问,先打印短信还是邮件
* A,发短信
* B,发邮件
* 2 停4秒在短信方法内,先打印短后还是邮件
* A,发短信
* B,发邮件
* 3 新增誉通heLLo方法还是先打短后还是hello
* hello
* A,发短信
* 4 现在有两部手机,先打印短信还是邮件
* B,发邮件
* A,发短信
* 5 两个静态同步方法,1部手机,先打印短层还是邮件
* A,发短信
* B,发邮件
* 6 两个静态同步方法,2部手机,先打印短层还是邮件
* A,发短信
* B,发邮件
* 7 1个静态同步方法,1个普通同步方法,1部手机,先打印短信后还是邮件
* B,发邮件
* A,发短信
* 8 1个静态同步方法,1个普通同步方法,2部手机,先打印短信后还是邮件
* B,发邮件
* A,发短信
*/
public class Lock_8 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
Phone phone2 = new Phone();

new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();

Thread.sleep(100);

new Thread(() -> {
try {
// phone.sendEmail();
// phone.getHello();
phone2.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
}, "B").start();

}

}

结论

synchronized实现同步的基础:Java中的每一个对象都可以作为锁,具体表现为以下3种形式。
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的c1ass对象。
对于同步方法块,锁是Synchonized括号里配置的对象

6.2、公平锁和非公平锁

在并发编程中,公平锁(Fair Lock)和非公平锁(Non-fair Lock)是两种常见的锁机制,它们主要用于控制多个线程对共享资源的访问顺序。

6.2.1、公平锁(Fair Lock)

定义:公平锁是指多个线程按照请求锁的顺序来获取锁,即先请求的线程先获得锁。

特点

  • 顺序性:线程按照请求锁的顺序排队,先到先得。
  • 公平性:避免了线程饥饿(Starvation),即某个线程长时间无法获得锁的情况。
  • 性能开销:由于需要维护一个有序的队列,公平锁的性能通常比非公平锁差,因为每次获取锁时都需要检查队列并进行相应的操作。

适用场景

  • 当系统对线程的执行顺序有严格要求时,或者需要避免线程饥饿时,可以使用公平锁。

示例
在Java中,ReentrantLock可以通过构造函数指定为公平锁:

1
ReentrantLock fairLock = new ReentrantLock(true); // true表示公平锁

6.2.2、非公平锁(Non-fair Lock)

定义:非公平锁是指多个线程获取锁的顺序不一定是按照请求锁的顺序,即线程可以插队获取锁。

特点

  • 无序性:线程获取锁的顺序不固定,新来的线程有可能在当前持有锁的线程释放锁后立即获取锁,而不需要排队。
  • 性能优势:由于不需要维护有序队列,非公平锁的性能通常比公平锁好,尤其是在高并发环境下,因为减少了线程上下文切换的开销。
  • 可能的饥饿问题:某些线程可能会长时间无法获得锁,导致线程饥饿。

适用场景

  • 当系统对线程的执行顺序没有严格要求,且希望提高并发性能时,可以使用非公平锁。

示例
在Java中,ReentrantLock默认是非公平锁:

1
ReentrantLock nonFairLock = new ReentrantLock(false); // false表示非公平锁,默认就是false

6.2.3、总结

  • 公平锁:按照请求顺序获取锁,避免线程饥饿,但性能较差。
  • 非公平锁:不按照请求顺序获取锁,性能较好,但可能导致线程饥饿。

选择公平锁还是非公平锁取决于具体的应用场景和性能需求。在大多数情况下,非公平锁是更好的选择,因为它提供了更好的性能。但在某些需要严格顺序控制或避免线程饥饿的场景中,公平锁可能更合适。

6.3、可重入锁(Reentrant Lock)

定义:可重入锁是一种特殊的锁机制,它允许同一个线程多次获取同一个锁,而不会导致死锁。换句话说,如果一个线程已经持有了某个锁,它可以再次获取该锁而不会被阻塞。

特点

  • 可重入性:同一个线程可以多次获取同一个锁,而不会被自己阻塞。
  • 计数机制:每次线程获取锁时,锁的计数器会加1;每次线程释放锁时,计数器会减1。只有当计数器归零时,锁才会被完全释放。
  • 避免死锁:可重入锁避免了线程在获取自己已经持有的锁时发生死锁的情况。

适用场景

  • 递归调用:在递归方法中,同一个线程可能会多次进入需要加锁的代码块。
  • 嵌套锁:在复杂的并发场景中,同一个线程可能会在不同的代码块中多次获取同一个锁。

示例
在Java中,ReentrantLock是一个典型的可重入锁实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock();

public void outerMethod() {
lock.lock();
try {
System.out.println("Outer method acquired the lock.");
innerMethod(); // 调用另一个需要加锁的方法
} finally {
lock.unlock();
}
}

public void innerMethod() {
lock.lock();
try {
System.out.println("Inner method acquired the lock.");
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
ReentrantLockExample example = new ReentrantLockExample();
example.outerMethod();
}
}

在这个例子中,outerMethodinnerMethod都使用了同一个ReentrantLock实例。由于ReentrantLock是可重入的,innerMethod可以在outerMethod已经持有锁的情况下再次获取锁,而不会导致死锁。

可重入锁的优势

  1. 避免死锁:可重入锁允许线程多次获取同一个锁,避免了因递归调用或嵌套锁导致的死锁问题。
  2. 简化编程:开发人员不需要担心同一个线程在不同代码块中多次获取锁的问题,简化了并发编程的复杂性。
  3. 灵活性:可重入锁提供了更高的灵活性,适用于各种复杂的并发场景。

总结

可重入锁是一种允许同一个线程多次获取同一个锁的机制,它通过计数机制来管理锁的获取和释放,避免了死锁问题,简化了并发编程的复杂性。在Java中,ReentrantLock是一个典型的可重入锁实现,广泛应用于各种并发场景中。

6.3.1、synchronized可重用锁(隐式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Created By Shiguang On 2024/12/9 20:09
* 可重用锁
*/
public class SyncLockDemo {

public synchronized void add(){
add();
}
public static void main(String[] args) {
// Object obj = new Object();
// new Thread(() -> {
// synchronized (obj) {
// System.out.println(Thread.currentThread().getName() + " 外层");
//
// synchronized (obj) {
// System.out.println(Thread.currentThread().getName() + " 中层");
//
// synchronized (obj) {
// System.out.println(Thread.currentThread().getName() + " 内层");
// }
// }
// }
// }, "t1").start();

new SyncLockDemo().add();


}
}

6.3.2、Lock可重用锁(显示)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) {
// Lock演示可重用锁
Lock lock = new ReentrantLock();
new Thread(() -> {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " 外层");

try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " 内层");
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}

}, "t1").start();

new Thread(() -> {
try {
lock.lock();
System.out.println(Thread.currentThread().getName());
} finally {
lock.unlock();
}

}, "t2").start();
}

注意:上锁与解锁需要配套使用,若只有一个线程,不释放锁也可以正常执行,若有多个线程使用同一把锁,不释放锁会阻塞其他线程正常执行。

6.4、死锁(Deadlock)

定义:死锁是指两个或多个线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,这些线程将无法继续执行。

死锁的四个必要条件

  1. 互斥条件:资源不能被共享,只能由一个线程使用。
  2. 请求与保持条件:线程已经持有一个资源,但又提出了新的资源请求,而该资源被其他线程占用,此时请求线程阻塞,但对自己已获得的资源保持不放。
  3. 不剥夺条件:线程已获得的资源在未使用完之前不能被其他线程强行剥夺,只能由自己释放。
  4. 循环等待条件:若干线程之间形成一种头尾相接的循环等待资源关系。

6.4.1、死锁的示例

以下是一个简单的Java示例,展示了如何通过不当的资源管理导致死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class DeadlockExample {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();

public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
synchronized (lock1) {
System.out.println("Thread 1 acquired lock1");
try {
Thread.sleep(100); // 模拟一些工作
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock2) {
System.out.println("Thread 1 acquired lock2");
}
}
});

Thread thread2 = new Thread(() -> {
synchronized (lock2) {
System.out.println("Thread 2 acquired lock2");
try {
Thread.sleep(100); // 模拟一些工作
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock1) {
System.out.println("Thread 2 acquired lock1");
}
}
});

thread1.start();
thread2.start();
}
}

在这个例子中,thread1thread2分别尝试获取lock1lock2,但由于它们的获取顺序不同,最终会导致死锁:

  • thread1先获取lock1,然后尝试获取lock2
  • thread2先获取lock2,然后尝试获取lock1

由于两个线程都在等待对方释放资源,因此它们都无法继续执行,形成了死锁。

6.4.2、如何避免死锁

  1. 避免嵌套锁:尽量减少锁的嵌套使用,避免多个线程同时持有多个锁。
  2. 统一锁顺序:如果必须使用多个锁,确保所有线程以相同的顺序获取锁,避免循环等待。
  3. 使用定时锁:在获取锁时设置超时时间,如果无法在规定时间内获取锁,则放弃并重试。
  4. 使用可重入锁:可重入锁允许线程多次获取同一个锁,避免了因递归调用或嵌套锁导致的死锁问题。
  5. 资源分级:将资源进行分级,高优先级的资源先获取,低优先级的资源后获取。

以下是一个避免死锁的示例,通过统一锁顺序来避免死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class DeadlockAvoidanceExample {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();

public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
synchronized (lock1) {
System.out.println("Thread 1 acquired lock1");
try {
Thread.sleep(100); // 模拟一些工作
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock2) {
System.out.println("Thread 1 acquired lock2");
}
}
});

Thread thread2 = new Thread(() -> {
synchronized (lock1) {
System.out.println("Thread 2 acquired lock1");
try {
Thread.sleep(100); // 模拟一些工作
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock2) {
System.out.println("Thread 2 acquired lock2");
}
}
});

thread1.start();
thread2.start();
}
}

在这个例子中,thread1thread2都以相同的顺序获取lock1lock2,从而避免了死锁。

6.4.3、如何检测死锁

可以使用 jspjstack 命令检测,需要将jdk的bin目录添加到系统环境变量

image-20241209204549535

执行java程序后,在终端执行命令jps -l

image-20241209204421998

找到要检测的程序进程号,例如此处我要查看的是com.shiguang.sync.DeadlockExample这个类,可以看到进程号是11012,使用 jstack 11012 命令,可以看到输出了Found 1 deadlock.

image-20241209204850678

7、Callable接口

目前我们学习了有两种创建线程的方法-一种是通过创建Thread类,另一种是通过使用Runnable创建线程。但是,Runnable缺少的一项功能是,当线程终止时(即run()完成时),我们无法使线程返回结果。为了支持此功能,Java中提供了Callable接口。

7.1、测试案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.shiguang.callable;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
* Created By Shiguang On 2024/12/10 9:34
* 比较Runnable和Callable的区别
*/
class MyThread1 implements Runnable {
@Override
public void run() {

}
}


class MyThread2 implements Callable {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName()+" come in callable");
return 200;
}
}

public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new MyThread1(), "A").start();

// 不能直接通过new Thread 调用call方法,
// 需要借助FutureTask,FutureTask是Runnable的实现类,其构造可以接受Callable的实现类
FutureTask<Integer> futureTask1 = new FutureTask<>(new MyThread2());

//lambda表达式
FutureTask<Integer> futureTask2 = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName()+" come in callable");
return 1024;
});

new Thread(futureTask2, "lucy").start();
new Thread(futureTask1, "mary").start();

// while (!futureTask2.isDone()) {
// // 等待callable执行完成
// System.out.println("waiting...");
// }

System.out.println(futureTask2.get());

// 再次调用执行结果,会直接返回缓存的结果,不会重新调用callable
// System.out.println(futureTask2.get());

System.out.println(futureTask1.get());

System.out.println(Thread.currentThread().getName()+" come in main over");
}
}

7.2、Callable接口的特点如下(重点)

  • 为了实现Runnable,需要实现不返回任何内容的run()方法,而对于Callable,需要实现在完成时返回结果的call()方法。
  • cal()方法可以引发异常,而run()则不能。
  • 为实现Callable而必须重写call方法。

7.3、Callable 与 Runnable 的区别

特性 Callable Runnable
实现方式 call()方法 run()方法
返回值 可以返回结果(通过 call() 方法) 无返回值(run() 方法无返回值)
异常处理 可以抛出受检异常(call() 方法) 不能抛出受检异常(run() 方法)
使用场景 需要返回结果或可能抛出异常的任务 不需要返回结果的任务

7.4、FutureTask介绍

FutureTask 是 Java 并发编程中的一个重要类,位于 java.util.concurrent 包中。它是 Future 接口的实现类,同时也是 Runnable 接口的实现类。FutureTask 的主要作用是封装一个可以异步执行的任务,并提供对任务执行状态和结果的管理。

FutureTask 的定义**

FutureTask 实现了以下接口:

  • RunnableFutureRunnableFuture 接口继承了 RunnableFuture 接口,因此 FutureTask 既可以作为任务被执行,也可以管理任务的状态和结果。

FutureTask 的构造方法可以接受一个 CallableRunnable 对象:

1
2
3
4
5
6
7
public FutureTask(Callable<V> callable) {
// 构造方法
}

public FutureTask(Runnable runnable, V result) {
// 构造方法
}

FutureTask 的核心功能**

FutureTask 的主要功能包括:

  1. 任务执行FutureTask 可以封装一个 CallableRunnable 任务,并由线程池或单独的线程执行。
  2. 状态管理FutureTask 管理任务的执行状态,包括:
    • 未启动:任务还未开始执行。
    • 运行中:任务正在执行。
    • 已完成:任务执行完成(正常完成、异常完成或被取消)。
  3. 结果获取:通过 Future 接口的方法(如 get())获取任务的执行结果。
  4. 取消任务:可以通过 cancel() 方法取消任务的执行。

FutureTask 的核心功能**

FutureTask 的主要功能包括:

  1. 任务执行FutureTask 可以封装一个 CallableRunnable 任务,并由线程池或单独的线程执行。
  2. 状态管理FutureTask 管理任务的执行状态,包括:
    • 未启动:任务还未开始执行。
    • 运行中:任务正在执行。
    • 已完成:任务执行完成(正常完成、异常完成或被取消)。
  3. 结果获取:通过 Future 接口的方法(如 get())获取任务的执行结果。
  4. 取消任务:可以通过 cancel() 方法取消任务的执行。

8、JUC强大的辅助类

8.1、减少技术CountDownLatch

CountDownLatch类可以设置一个计数器,然后通过countDown()方法来进行减1的操作,使用await()方法等待计数器不大于0,然后继续执行await()方法之后的语句。

  • CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
  • 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)。
  • 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。

8.1.1、锁门案例

要求:教室内有一个班长和6名同学,6名同学陆续离开教室,全部离开教室后班长进行锁门操作。

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
// 6个同学陆续离开教室后,班长才锁门
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+" 号同学离开了教室");
}, String.valueOf(i)).start();
}

System.out.println(Thread.currentThread().getName()+" 班长锁门了");
}

执行结果:

image-20241210103136378

可以看到,其他同学还没有离开教室班长就已经锁门了,不符合我们的预期要求,接下来再看下使用CountDownLatch来实现该案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws InterruptedException {
// 6个同学陆续离开教室后,班长才锁门

// 创建CountDownLatch对象,设置初始值为6
CountDownLatch countDownLatch = new CountDownLatch(6);

for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 号同学离开了教室");
// 计数器减一
countDownLatch.countDown();
}, String.valueOf(i)).start();
}


// 等待计数器归零
countDownLatch.await();

System.out.println(Thread.currentThread().getName() + " 班长锁门了");
}

运行结果:

image-20241210103652803

可以看到,同学离开的顺序随机,全部同学离开后班长锁门,符合我们的预期。

8.2、循环栅栏CyclicBarrier

CyclicBarrier看英文单词可以看出大概就是循环阻塞的意思,在使用中CyclicBarrier的构造方法第一个参数是目标障碍数,每次执行CyclicBarrier一次障碍数会加一,如果达到了目标障碍数,才会执行cyclicBarrier.await(之后的语句。可以将CyclicBarrier理解为加1操作。

8.2.1、召唤神龙案例

要求:集齐7颗龙珠后方可召唤神龙

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CyclicBarrierDemo {
public static final int NUMBER = 7;

public static void main(String[] args) {
// 集齐7颗龙珠后方可召唤神龙
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> {
System.out.println("龙珠已集齐,召唤神龙");
});

// 集齐7颗龙珠的过程
for (int i = 1; i <= 7 ; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 星龙被收集到了");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}

执行结果:

image-20241210105053980

8.3、信号灯Semaphore

8.3.1、案例

要求:6辆车,停3个车位

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) {
// 6辆车,停3个车位

// 创建Semaphore对象,设置许可数量
Semaphore semaphore = new Semaphore(3);

// 模拟6辆车
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
// 抢占
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 抢到了车位");

// 设置随机停车时间
// Thread.sleep((long) (Math.random() * 1000));
TimeUnit.SECONDS.sleep(new Random().nextInt(5));

System.out.println(Thread.currentThread().getName() + " >>>离开了车位");

} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放
semaphore.release();
}
}, String.valueOf(i)).start();
}

}

运行结果:

image-20241210110332953

9、ReentrantReadWriteLock读写锁

9.1、乐观锁和悲观锁

在并发编程中,乐观锁(Optimistic Locking)悲观锁(Pessimistic Locking) 是两种常见的并发控制策略,用于解决多个线程对共享资源的访问冲突问题。它们的区别主要在于对资源竞争的态度和处理方式。

9.1.1、悲观锁(Pessimistic Locking)

基本概念

悲观锁是一种“悲观”的并发控制策略,假设在任何时候都可能发生资源竞争,因此在访问共享资源时,会立即加锁以防止其他线程访问。

工作原理

  • 加锁:在访问共享资源之前,先获取锁。
  • 独占访问:在持有锁的期间,其他线程无法访问该资源。
  • 解锁:操作完成后,释放锁,允许其他线程获取锁。

实现方式

悲观锁通常通过以下方式实现:

  • 数据库锁:如行锁、表锁。
  • Java 锁:如 synchronized 关键字、ReentrantLock

优点

  • 安全性高:能够有效避免资源竞争问题,保证数据一致性。
  • 简单易用:实现简单,适合对资源竞争频繁的场景。

缺点

  • 性能开销:频繁加锁和解锁会导致性能下降。
  • 并发性低:独占访问会降低系统的并发性能。

适用场景

  • 写操作频繁:当写操作较多时,悲观锁可以有效避免数据冲突。
  • 资源竞争激烈:当多个线程频繁竞争同一资源时。

image-20241210110635201

9.1.2、乐观锁(Optimistic Locking)

基本概念

乐观锁是一种“乐观”的并发控制策略,假设在大多数情况下不会发生资源竞争,因此在访问共享资源时不会立即加锁,而是在提交操作时检查是否有冲突。

工作原理

  • 无锁访问:在访问共享资源时,不加锁,允许多个线程同时访问。
  • 冲突检测:在提交操作时,检查资源是否被其他线程修改。
  • 重试机制:如果检测到冲突,则回滚操作并重试。

实现方式

乐观锁通常通过以下方式实现:

  • 版本号机制:在数据中添加一个版本号字段,每次更新时检查版本号是否一致。
  • CAS(Compare-And-Swap):通过硬件级别的原子操作实现无锁更新。

优点

  • 性能高:在资源竞争不激烈的情况下,性能优于悲观锁。
  • 并发性好:允许多个线程同时访问资源,提高了并发性能。

缺点

  • 复杂性高:需要实现冲突检测和重试机制,实现较为复杂。
  • 适用性有限:适用于读多写少的场景,写操作频繁时性能可能下降。

适用场景

  • 读操作频繁:当读操作远远多于写操作时,乐观锁可以显著提高性能。
  • 资源竞争不激烈:当多个线程对同一资源的竞争较少时。

image-20241210110713930

9.1.3、乐观锁与悲观锁的对比

特性 悲观锁(Pessimistic Locking) 乐观锁(Optimistic Locking)
资源访问 访问前加锁,独占访问 访问时不加锁,允许多线程同时访问
冲突处理 通过加锁避免冲突 通过冲突检测和重试机制处理冲突
性能 写操作频繁时性能较好 读操作频繁时性能较好
实现复杂性 实现简单,适合资源竞争频繁的场景 实现复杂,适合资源竞争不激烈的场景
适用场景 写操作频繁、资源竞争激烈的场景 读操作频繁、资源竞争不激烈的场景

9.1.4、乐观锁的实现示例(版本号机制)

以下是一个使用版本号机制实现乐观锁的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class OptimisticLockExample {
private int data;
private int version;

public OptimisticLockExample(int data) {
this.data = data;
this.version = 0;
}

public boolean updateData(int newData, int expectedVersion) {
if (this.version == expectedVersion) {
// 版本号匹配,更新数据
this.data = newData;
this.version++; // 更新版本号
return true;
} else {
// 版本号不匹配,更新失败
return false;
}
}

public static void main(String[] args) {
OptimisticLockExample example = new OptimisticLockExample(100);

// 线程1尝试更新数据
new Thread(() -> {
boolean success = example.updateData(200, 0);
System.out.println("Thread 1 update success: " + success);
}).start();

// 线程2尝试更新数据
new Thread(() -> {
boolean success = example.updateData(300, 0);
System.out.println("Thread 2 update success: " + success);
}).start();
}
}

9.1.5、悲观锁的实现示例(synchronized

以下是一个使用 synchronized 实现悲观锁的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class PessimisticLockExample {
private int data;

public PessimisticLockExample(int data) {
this.data = data;
}

public synchronized void updateData(int newData) {
// 加锁,独占访问
this.data = newData;
}

public synchronized int readData() {
// 加锁,独占访问
return this.data;
}

public static void main(String[] args) {
PessimisticLockExample example = new PessimisticLockExample(100);

// 线程1更新数据
new Thread(() -> {
example.updateData(200);
System.out.println("Thread 1 updated data to: " + example.readData());
}).start();

// 线程2读取数据
new Thread(() -> {
System.out.println("Thread 2 read data: " + example.readData());
}).start();
}
}

9.1.6、总结

  • 悲观锁:假设资源竞争频繁,通过加锁避免冲突,适合写操作频繁的场景。
  • 乐观锁:假设资源竞争不频繁,通过冲突检测处理冲突,适合读操作频繁的场景。

在实际应用中,选择乐观锁还是悲观锁需要根据具体的业务场景和资源竞争情况来决定。如果写操作较多,悲观锁是更好的选择;如果读操作较多,乐观锁可以显著提高性能。

9.2、表锁和行锁

在数据库管理系统(DBMS)中,表锁(Table Lock)行锁(Row Lock) 是两种常见的锁机制,用于控制并发事务对数据库表和行的访问。它们的主要目的是确保数据的一致性和完整性,同时避免并发操作导致的冲突问题。

9.2.1、表锁(Table Lock)

基本概念

表锁是对整个数据库表加锁的机制。当一个事务对某个表加锁时,其他事务无法对该表进行某些操作,直到锁被释放。

工作原理

  • 加锁:事务在操作表之前,先对整个表加锁。
  • 独占访问:在锁被持有的期间,其他事务无法对该表进行某些操作(如写操作)。
  • 解锁:事务操作完成后,释放锁,允许其他事务访问。

表锁的类型

表锁可以分为以下两种类型:

  1. 共享锁(Shared Lock,S锁)
    • 允许多个事务同时持有共享锁,用于读操作。
    • 持有共享锁的事务可以读取表,但不能修改表。
  2. 排他锁(Exclusive Lock,X锁)
    • 只允许一个事务持有排他锁,用于写操作。
    • 持有排他锁的事务可以读取和修改表,其他事务无法获取任何类型的锁。

优点

  • 简单易用:实现简单,适合对整个表的操作。
  • 安全性高:能够有效避免表级别的冲突。

缺点

  • 并发性低:锁的粒度较大,可能导致其他事务长时间等待。
  • 性能开销:频繁加锁和解锁会导致性能下降。

适用场景

  • 全表操作:如批量插入、删除或更新整个表。
  • 资源竞争激烈:当多个事务频繁竞争整个表时。

9.2.2、行锁(Row Lock)

基本概念

行锁是对数据库表中的某一行或多行加锁的机制。当一个事务对某一行加锁时,其他事务无法对该行进行某些操作,直到锁被释放。

工作原理

  • 加锁:事务在操作某一行之前,先对该行加锁。
  • 独占访问:在锁被持有的期间,其他事务无法对该行进行某些操作(如写操作)。
  • 解锁:事务操作完成后,释放锁,允许其他事务访问。

行锁的类型

行锁的类型与表锁类似,分为以下两种:

  1. 共享锁(Shared Lock,S锁)
    • 允许多个事务同时持有共享锁,用于读操作。
    • 持有共享锁的事务可以读取行,但不能修改行。
  2. 排他锁(Exclusive Lock,X锁)
    • 只允许一个事务持有排他锁,用于写操作。
    • 持有排他锁的事务可以读取和修改行,其他事务无法获取任何类型的锁。

优点

  • 并发性高:锁的粒度较小,允许多个事务同时操作不同的行。
  • 性能较好:在大多数情况下,行锁的性能优于表锁。

缺点

  • 复杂性高:实现和管理行锁比表锁更复杂。
  • 死锁风险:由于锁的粒度较小,可能导致死锁问题。

适用场景

  • 单行操作:如更新、删除或查询单行数据。
  • 资源竞争不激烈:当多个事务对不同行的竞争较少时。

9.2.3、 表锁与行锁的对比

特性 表锁(Table Lock) 行锁(Row Lock)
锁的粒度 锁的粒度较大,对整个表加锁 锁的粒度较小,对单行或多行加锁
并发性 并发性较低,可能导致其他事务长时间等待 并发性较高,允许多个事务同时操作不同行
性能 性能较低,频繁加锁和解锁会导致性能下降 性能较高,锁的粒度较小,性能较好
复杂性 实现简单,管理较容易 实现复杂,管理较困难,可能引发死锁
适用场景 全表操作、资源竞争激烈的场景 单行操作、资源竞争不激烈的场景

9.2.4、 表锁与行锁的使用示例

表锁示例

在 MySQL 中,可以通过 LOCK TABLES 语句显式地对表加锁:

1
2
3
4
5
6
7
8
-- 对表加共享锁
LOCK TABLES my_table READ;

-- 对表加排他锁
LOCK TABLES my_table WRITE;

-- 解锁
UNLOCK TABLES;

行锁示例

在 MySQL 中,行锁通常由事务自动管理。例如,在 InnoDB 存储引擎中,行锁会在事务执行时自动加锁:

1
2
3
4
5
6
7
8
9
10
11
-- 开启事务
START TRANSACTION;

-- 对某一行加排他锁
SELECT * FROM my_table WHERE id = 1 FOR UPDATE;

-- 更新数据
UPDATE my_table SET column1 = 'new_value' WHERE id = 1;

-- 提交事务,释放锁
COMMIT;

9.2.5、 表锁与行锁的实际应用

表锁的应用

  • 批量操作:如批量插入、删除或更新整个表。
  • 数据备份:在备份数据时,对表加锁以确保数据一致性。

行锁的应用

  • 事务隔离:在事务中对单行数据进行更新或删除操作。
  • 并发控制:在多用户并发访问数据库时,确保单行数据的一致性。

9.2.6、 总结

  • 表锁:锁的粒度较大,适合全表操作和资源竞争激烈的场景,但并发性较低。
  • 行锁:锁的粒度较小,适合单行操作和资源竞争不激烈的场景,并发性较高。

在实际应用中,选择表锁还是行锁需要根据具体的业务场景和资源竞争情况来决定。如果需要对整个表进行操作,表锁是更好的选择;如果需要对单行数据进行操作,行锁可以显著提高并发性能。

9.3、读写锁

在并发编程中,读锁(Read Lock)写锁(Write Lock) 是用于控制对共享资源的访问的机制。它们通常用于实现 读写锁(Read-Write Lock),这是一种特殊的锁机制,允许多个线程同时读取共享资源,但在写操作时需要独占访问。

9.3.1、读写锁的基本概念

读写锁是一种分离锁(Split Lock),它将锁的粒度分为两种:

  • 读锁(Read Lock):允许多个线程同时持有读锁(又称为共享锁),只要没有线程持有写锁。
  • 写锁(Write Lock):只允许一个线程持有写锁(又称为排他锁),且在持有写锁时,其他线程无法持有读锁或写锁。

读写锁的核心思想是:

  • 读操作:通常是安全的,允许多个线程并发读取共享资源。
  • 写操作:通常是独占的,需要保证写操作的线程安全。

9.3.2、读写锁的优点

  1. 提高并发性:允许多个线程同时读取共享资源,从而提高并发性能。
  2. 降低锁竞争:读操作之间不需要互斥,减少了锁的竞争。
  3. 灵活性:适用于读多写少的场景,能够更好地平衡性能和线程安全。

9.3.3、读写锁的实现

在 Java 中,读写锁的实现主要依赖于 java.util.concurrent.locks.ReentrantReadWriteLock 类。它提供了读锁和写锁的实现。

ReentrantReadWriteLock 类**

ReentrantReadWriteLock 是 Java 提供的读写锁实现类,它实现了 ReadWriteLock 接口。

1
2
3
4
public interface ReadWriteLock {
Lock readLock(); // 获取读锁
Lock writeLock(); // 获取写锁
}

读锁(Read Lock)

  • 允许多个线程同时持有:只要没有线程持有写锁。
  • 获取读锁:调用 readLock().lock()
  • 释放读锁:调用 readLock().unlock()

写锁(Write Lock)

  • 只允许一个线程持有:在持有写锁时,其他线程无法持有读锁或写锁。
  • 获取写锁:调用 writeLock().lock()
  • 释放写锁:调用 writeLock().unlock()

9.3.4、读写锁的使用示例

以下是一个使用 ReentrantReadWriteLock 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockExample {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private int sharedData = 0;

// 读操作
public int readData() {
rwLock.readLock().lock(); // 获取读锁
try {
System.out.println(Thread.currentThread().getName() + " is reading: " + sharedData);
return sharedData;
} finally {
rwLock.readLock().unlock(); // 释放读锁
}
}

// 写操作
public void writeData(int newValue) {
rwLock.writeLock().lock(); // 获取写锁
try {
System.out.println(Thread.currentThread().getName() + " is writing: " + newValue);
sharedData = newValue;
} finally {
rwLock.writeLock().unlock(); // 释放写锁
}
}

public static void main(String[] args) {
ReadWriteLockExample example = new ReadWriteLockExample();

// 创建多个读线程
for (int i = 0; i < 5; i++) {
new Thread(() -> {
example.readData();
}).start();
}

// 创建一个写线程
new Thread(() -> {
example.writeData(100);
}).start();
}
}

课程案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//资源类
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock rwLock = new ReentrantReadWriteLock();

//写操作
public void put(String key, Object value) {
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 正在写入:" + key);
TimeUnit.MILLISECONDS.sleep(300);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + " 写入完成:" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}

}

//读操作
public Object get(String key) {
rwLock.readLock().lock();
Object result = null;
try {
System.out.println(Thread.currentThread().getName() + " 正在读取>>:" + key);
TimeUnit.MILLISECONDS.sleep(300);
result = map.get(key);
System.out.println(Thread.currentThread().getName() + " 读取完成>>:" + result);
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}

return result;
}
}

public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();

// 存数据
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
myCache.put(Thread.currentThread().getName(), Thread.currentThread().getName());
}, String.valueOf(i)).start();

}

// 取数据
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
myCache.get(Thread.currentThread().getName());
}, String.valueOf(i)).start();
}
}
}

9.3.5、读写锁的规则

  1. 读锁规则
    • 允许多个线程同时持有读锁。
    • 如果一个线程持有写锁,其他线程无法获取读锁。
  2. 写锁规则
    • 只允许一个线程持有写锁。
    • 如果一个线程持有写锁,其他线程无法获取读锁或写锁。
  3. 读写互斥
    • 读锁和写锁是互斥的,即持有读锁的线程无法获取写锁,反之亦然。

9.3.6、读写锁的适用场景

读写锁适用于以下场景:

  1. 读多写少:当读操作远远多于写操作时,读写锁可以显著提高并发性能。
  2. 共享资源访问:需要对共享资源进行并发读写操作的场景。
  3. 缓存系统:缓存系统通常需要频繁读取数据,但写操作较少。

9.3.7、读写锁的缺点

  1. 复杂性:相比于普通锁(如 ReentrantLock),读写锁的实现和使用更复杂。
  2. 饥饿问题:在某些情况下,写锁可能会因为频繁的读操作而长时间无法获取锁,导致写线程饥饿。
  3. 性能开销:读写锁的实现比普通锁更复杂,可能会带来一定的性能开销。

9.3.8、读写锁与其他锁的对比

特性 读写锁(Read-Write Lock) 普通锁(ReentrantLock)
并发性 允许多个线程同时读取共享资源 只允许一个线程持有锁
互斥性 读锁之间不互斥,读写锁之间互斥 所有操作都需要互斥
适用场景 读多写少的场景 读写操作均衡的场景
性能 在读多写少时性能更好 在读写操作均衡时性能更好

9.3.9、读写锁降级

15-读写锁降级

读写锁的降级(Read-Write Lock Downgrade) 是指在持有写锁的情况下,将写锁降级为读锁的过程。这种操作通常用于并发编程中,以提高系统的并发性能和灵活性。

读写锁降级的概念

在某些场景下,一个线程可能需要先对共享资源进行写操作,然后继续对该资源进行读操作。为了避免在写操作完成后,其他线程无法获取读锁,可以通过降级操作将写锁转换为读锁。

  • 降级过程
    1. 线程先获取写锁。
    2. 完成写操作后,释放写锁。
    3. 立即获取读锁。
    4. 继续进行读操作。

读写锁降级的优点

  1. 提高并发性:降级操作允许其他线程在写操作完成后立即获取读锁,从而提高系统的并发性能。
  2. 灵活性:降级操作提供了更灵活的锁管理方式,适用于需要在写操作后继续读取资源的场景。
  3. 避免饥饿:降级操作可以避免写锁长时间占用资源,导致其他线程无法获取读锁或写锁。

读写锁降级的实现

在 Java 中,读写锁的降级可以通过 ReentrantReadWriteLock 实现。以下是降级的基本步骤:

  1. 获取写锁:调用 writeLock().lock()
  2. 执行写操作:对共享资源进行写操作。
  3. 释放写锁:调用 writeLock().unlock()
  4. 获取读锁:调用 readLock().lock()
  5. 执行读操作:对共享资源进行读操作。
  6. 释放读锁:调用 readLock().unlock()

读写锁降级的示例

以下是一个使用 ReentrantReadWriteLock 实现读写锁降级的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDowngradeExample {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private int sharedData = 0;

public void downgradeLock() {
// 获取写锁
rwLock.writeLock().lock();
try {
// 执行写操作
sharedData = 100;
System.out.println(Thread.currentThread().getName() + " is writing: " + sharedData);

// 降级为读锁
rwLock.readLock().lock();
} finally {
// 释放写锁
rwLock.writeLock().unlock();
}

try {
// 执行读操作
System.out.println(Thread.currentThread().getName() + " is reading: " + sharedData);
} finally {
// 释放读锁
rwLock.readLock().unlock();
}
}

public static void main(String[] args) {
ReadWriteLockDowngradeExample example = new ReadWriteLockDowngradeExample();

// 创建一个线程执行降级操作
new Thread(() -> {
example.downgradeLock();
}).start();
}
}

读写锁降级的适用场景

  1. 写操作后需要读操作:当一个线程在完成写操作后,需要继续读取资源时。
  2. 提高并发性:在写操作完成后,允许其他线程获取读锁,从而提高系统的并发性能。
  3. 避免锁竞争:通过降级操作,减少写锁的持有时间,避免其他线程长时间等待。

读写锁降级的注意事项

  1. 顺序问题:必须先释放写锁,再获取读锁,否则会导致死锁。
  2. 线程安全:降级操作必须在同一个线程中完成,否则可能导致并发问题。
  3. 性能开销:频繁的锁降级操作可能会带来一定的性能开销,需要根据具体场景权衡。

读写锁降级是一种在持有写锁的情况下,将写锁降级为读锁的操作。它能够提高系统的并发性能,避免锁竞争,适用于写操作后需要继续读取资源的场景。在实现降级操作时,需要注意顺序问题和线程安全,确保操作的正确性。

3.9.10、读写锁演变

14-读写锁演变

10、阻塞队列(BlockingQueue)

为什么需要BlockingQueue?

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。

在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

10.1、BlockingQueue简介

阻塞队列(Blocking Queue) 是 Java 并发编程中的一个重要工具,位于 java.util.concurrent 包中。它是一种特殊的队列,支持在队列为空时阻塞消费者线程,或在队列满时阻塞生产者线程。阻塞队列常用于生产者-消费者模型中,能够有效解决线程间的协作问题。

阻塞队列是一种线程安全的队列,它提供了以下核心功能:

  1. 阻塞操作
    • 队列为空时:消费者线程尝试从队列中获取元素时会被阻塞,直到队列中有可用元素。
    • 队列满时:生产者线程尝试向队列中添加元素时会被阻塞,直到队列中有可用空间。
  2. 线程安全:阻塞队列是线程安全的,多个线程可以并发地进行操作。
  3. 边界性:阻塞队列可以是有界的(固定容量)或无界的(容量无限)

阻塞队列,顾名思义,首先它是一个队列,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出。

image-20241210143211999

  • 当队列是空的,从队列中获取元素的操作将会被阻塞,直到其他线程往空的队列插入新的元素
  • 当队列是满的,从队列中添加元素的操作将会被阻塞,直到其他线程从队列中移除一个或多
    个元素或者完全清空,使队列变得空闲

常用的队列主要有以下两种

  • 先进先出(First In First Out,FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。
  • 后进先出(Last In First OUt, LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件(栈)

阻塞队列的优点

  1. 线程安全:阻塞队列是线程安全的,无需手动加锁。
  2. 简化并发编程:提供了阻塞和超时机制,简化了生产者-消费者模型的实现。
  3. 高效协作:能够有效解决生产者和消费者之间的协作问题。
  4. 灵活性:提供了多种实现类,适用于不同的场景。

阻塞队列的缺点

  1. 性能开销:阻塞队列的实现通常基于锁机制,可能会带来一定的性能开销。
  2. 死锁风险:在复杂的并发场景中,可能会出现死锁问题。
  3. 适用性有限:阻塞队列适用于生产者-消费者模型,但在其他场景中可能不够灵活。

阻塞队列的适用场景

  1. 生产者-消费者模型:阻塞队列是实现生产者-消费者模型的理想工具。
  2. 任务调度:在任务调度系统中,阻塞队列可以用于存储待执行的任务。
  3. 缓存系统:在缓存系统中,阻塞队列可以用于存储缓存数据。
  4. 消息队列:在消息队列系统中,阻塞队列可以用于存储消息。

阻塞队列与其他队列的对比

特性 阻塞队列(Blocking Queue) 普通队列(Queue)
线程安全
阻塞机制 支持阻塞和超时机制 不支持
适用场景 生产者-消费者模型、任务调度等 单线程场景
性能 在多线程场景下性能较好 在单线程场景下性能较好

10.2、阻塞队列的使用示例

以下是一个使用 ArrayBlockingQueue 的简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {
public static void main(String[] args) {
// 创建一个容量为 3 的阻塞队列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

// 生产者线程
Thread producer = new Thread(() -> {
try {
queue.put("Task 1");
System.out.println("Produced: Task 1");
queue.put("Task 2");
System.out.println("Produced: Task 2");
queue.put("Task 3");
System.out.println("Produced: Task 3");
queue.put("Task 4"); // 队列已满,阻塞
System.out.println("Produced: Task 4");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

// 消费者线程
Thread consumer = new Thread(() -> {
try {
Thread.sleep(1000); // 模拟消费延迟
System.out.println("Consumed: " + queue.take());
System.out.println("Consumed: " + queue.take());
System.out.println("Consumed: " + queue.take());
System.out.println("Consumed: " + queue.take()); // 队列为空,阻塞
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

producer.start();
consumer.start();
}
}

10.3、BlockingQueue核心方法

image-20241210151618241

BlockingQueue 是阻塞队列的核心接口,定义了阻塞队列的基本操作。它的主要方法包括:

  • 插入元素
    • boolean add(E e):插入元素,如果队列已满则抛出异常。
    • boolean offer(E e):插入元素,如果队列已满则返回 false
    • void put(E e) throws InterruptedException:插入元素,如果队列已满则阻塞。
    • boolean offer(E e, long timeout, TimeUnit unit):插入元素,如果队列已满则等待指定时间。
  • 移除元素
    • E remove():移除元素,如果队列为空则抛出异常。
    • E poll():移除元素,如果队列为空则返回 null
    • E take() throws InterruptedException:移除元素,如果队列为空则阻塞。
    • E poll(long timeout, TimeUnit unit):移除元素,如果队列为空则等待指定时间。
  • 检查元素
    • E element():返回队列头部元素,如果队列为空则抛出异常。
    • E peek():返回队列头部元素,如果队列为空则返回 null

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

// 第一组
// System.out.println(queue.add("a"));
// System.out.println(queue.add("b"));
// System.out.println(queue.add("c"));
//// System.out.println(queue.element());
//
//// System.out.println(queue.add("d")); // 超出容量,抛出异常 IllegalStateException
//
// System.out.println(queue.remove());
// System.out.println(queue.remove());
// System.out.println(queue.remove());
//// System.out.println(queue.remove()); // 队列为空时,抛出异常 NoSuchElementException


// 第二组
// System.out.println(queue.offer("a"));
// System.out.println(queue.offer("b"));
// System.out.println(queue.offer("c"));
// System.out.println(queue.offer("d")); // 超出容量,返回 false
//
// System.out.println(queue.poll());
// System.out.println(queue.poll());
// System.out.println(queue.poll());
// System.out.println(queue.poll()); // 队列为空时,返回 null

// 第三组
// queue.put("a");
// queue.put("b");
// queue.put("c");
//// queue.put("d"); // 超出容量,阻塞
//
// System.out.println(queue.take());
// System.out.println(queue.take());
// System.out.println(queue.take());
// System.out.println(queue.take()); // 队列为空时,阻塞

// 第四组
System.out.println(queue.offer("a"));
System.out.println(queue.offer("b"));
System.out.println(queue.offer("c"));
// System.out.println(queue.offer("d",3L, TimeUnit.SECONDS)); // 超出容量,阻塞, 超时返回 false

System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll(3L, TimeUnit.SECONDS)); // 队列为空时,阻塞, 超时返回 null


}

10.4、常见的BlockingQueue

ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。

10.4.1、ArrayBlockingQueue(常用)

基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的
头部和尾部在数组中的位置。

ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还
可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

特点:

  • 基于数组的有界阻塞队列,容量固定。
  • 支持公平性策略(FIFO)。

一句话总结:由数组结构组成的有界阻塞队列。

10.4.2、LinkedBlockingQueue(常用)

基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

特点:

  • 基于链表的无界或固定容量的阻塞队列。
  • 默认是无界的,但可以指定容量。

一句话总结:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。

10.4.3、PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。

因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。

在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

特点:

  • 基于优先级的无界阻塞队列。
  • 元素按照优先级排序。

一句话总结:支持优先级排序的无界阻塞队列。

10.4.4、DelayQueue

DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

特点:

  • 延迟队列,元素必须实现 Delayed 接口。
  • 元素只有在延迟到期后才能被消费。

一句话总结:使用优先级队列实现的延迟无界阻塞队列。

10.4.5、SynchronousQueue

一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。

声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。

公平模式和非公平模式的区别:

  • 公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
  • 非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

特点:

  • 同步队列,容量为 0。
  • 生产者和消费者必须配对出现。

一句话总结:不存储元素的阻塞队列,也即单个元素的队列。

10.4.6、LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

LinkedTransferQueue采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时
发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。

一句话总结:由链表组成的无界阻塞队列。

10.4.7、LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。

对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作可能会阻塞住该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情况

  • 插入元素时:如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再讲该元素插入,该操作可以通过设置超时参数,超时后返回false表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出InterruptedException异常。
  • 读取元素时:如果当前队列为空会阻塞直到队列不为空然后返回元素,同样可以通过设置超时参数\

特点:

  • 基于链表的双端阻塞队列。
  • 支持从两端插入和移除元素。

一句话总结:由链表组成的双向阻塞队列

11、线程池(ThreadPool)

线程池(Thread Pool) 是 Java 并发编程中的一个重要概念,位于 java.util.concurrent 包中。线程池通过预先创建一组线程并复用它们,避免了频繁创建和销毁线程的开销,从而提高了系统的性能和资源利用率。线程池广泛应用于服务器端编程、任务调度、并发处理等场景。

11.1、 线程池的基本概念

11.1.1、 为什么要使用线程池?

  • 性能优化:创建和销毁线程是昂贵的操作,线程池通过复用线程减少了这些开销。
  • 资源管理:线程池可以限制线程的数量,避免系统资源被耗尽。
  • 任务调度:线程池可以管理和调度任务,确保任务的执行顺序和优先级。

11.1.2、 线程池的核心组件

  1. 线程池管理器:负责创建和管理线程池。
  2. 工作线程:线程池中的线程,负责执行任务。
  3. 任务队列:用于存储待执行的任务。
  4. 拒绝策略:当任务队列满且线程池无法处理更多任务时,拒绝策略决定如何处理新任务。

11.1.3、线程池的优缺点及使用场景

优点:

  1. 性能优化:通过复用线程减少了创建和销毁线程的开销。
  2. 资源管理:限制线程数量,避免系统资源被耗尽。
  3. 任务调度:管理和调度任务,确保任务的执行顺序和优先级。
  4. 灵活性:提供了多种参数和策略,适用于不同的场景。

缺点:

  1. 复杂性:线程池的配置和使用需要一定的经验,配置不当可能导致性能问题。
  2. 死锁风险:在复杂的并发场景中,可能会出现死锁问题。
  3. 适用性有限:线程池适用于任务执行时间较短的场景,对于长时间运行的任务可能不够灵活。

适用场景:

  1. 服务器端编程:如 Web 服务器、数据库连接池等。
  2. 任务调度:如定时任务、批处理任务等。
  3. 并发处理:如多线程下载、并行计算等。
  4. 异步编程:如异步任务处理、事件驱动编程等。

11.2、 线程池的接口和实现类

image-20241210160856756

11.2.1、 Executor 接口

Executor 是线程池的核心接口,定义了任务提交的方法:

1
2
3
public interface Executor {
void execute(Runnable command);
}

11.2.2、 ExecutorService 接口

ExecutorServiceExecutor 的扩展接口,提供了更丰富的功能,如任务提交、任务取消、线程池关闭等:

1
2
3
4
5
6
7
public interface ExecutorService extends Executor {
<T> Future<T> submit(Callable<T> task);
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
}

11.2.3、 常见的实现类

Java 提供了多种线程池的实现类,适用于不同的场景:

  1. ThreadPoolExecutor
    • 最常用的线程池实现类。
    • 支持自定义线程池的参数,如核心线程数、最大线程数、任务队列、拒绝策略等。
  2. ScheduledThreadPoolExecutor
    • 支持定时和周期性任务的线程池。
  3. Executors 工厂类**:
    • 提供了创建常见线程池的便捷方法,如:
      • newFixedThreadPool(int nThreads):创建固定大小的线程池。
      • newCachedThreadPool():创建可缓存的线程池。
      • newSingleThreadExecutor():创建单线程的线程池。
      • newScheduledThreadPool(int corePoolSize):创建支持定时任务的线程池。

11.3、线程池的工作原理

11.3.1、线程池的生命周期

线程池的生命周期包括以下几个阶段:

  1. 创建:初始化线程池,创建核心线程。
  2. 运行:线程池接收任务并分配给工作线程执行。
  3. 关闭:调用 shutdown()shutdownNow() 方法关闭线程池。
  4. 终止:所有任务执行完毕,线程池中的线程被销毁。

11.3.2、任务的执行流程

img

  1. 提交任务:通过 execute()submit() 方法提交任务。
  2. 任务分配
    • 如果线程池中的线程数小于核心线程数,创建新线程执行任务。
    • 如果线程池中的线程数已达到核心线程数,将任务放入任务队列。
    • 如果任务队列已满且线程数小于最大线程数,创建新线程执行任务。
    • 如果任务队列已满且线程数已达到最大线程数,执行拒绝策略。
  3. 任务执行:工作线程从任务队列中获取任务并执行。
  4. 任务完成:任务执行完毕后,线程返回线程池等待下一个任务。

11.4、线程池的参数及自定义线程池

11.4.1、线程池的参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

ThreadPoolExecutor 提供了多个参数,用于自定义线程池的行为:

  1. corePoolSize:核心线程数,线程池中始终保持的线程数。
  2. maximumPoolSize:最大线程数,线程池中允许的最大线程数。
  3. keepAliveTime:非核心线程的空闲时间,超过该时间后,非核心线程会被销毁。
  4. unitkeepAliveTime 的时间单位。
  5. workQueue:任务队列,用于存储待执行的任务。
  6. threadFactory:线程工厂,用于创建新线程。
  7. rejectedExecutionHandler:拒绝策略,用于处理无法执行的任务。

11.4.2、自定义线程池

阿里巴巴开发手册要求不允许使用Executors创建线程,而是通过ThreadPoolExecutor创建自定义线程池

image-20241210164416488

下面是一个简单的自定义线程池示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

try {
for (int i = 1; i <= 10 ; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 办理业务");
});
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 关闭线程池
threadPool.shutdown();
}
}

11.5、线程池的拒绝策略

当任务队列满且线程池无法处理更多任务时,拒绝策略决定如何处理新任务。常见的拒绝策略包括:

  1. AbortPolicy:默认策略,直接抛出 RejectedExecutionException 异常。
  2. CallerRunsPolicy:由提交任务的线程执行任务。
  3. DiscardPolicy:直接丢弃任务,不抛出异常。
  4. DiscardOldestPolicy:丢弃任务队列中最旧的任务,然后尝试重新提交新任务。

11.6、线程池的使用示例

以下是一个使用 ThreadPoolExecutor 创建自定义线程池的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.concurrent.*;

public class ThreadPoolExample {
public static void main(String[] args) {
// 创建自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
10, // 非核心线程的空闲时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(10), // 任务队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);

// 提交任务
for (int i = 0; i < 15; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is executing task.");
try {
Thread.sleep(1000); // 模拟任务执行时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}

// 关闭线程池
executor.shutdown();
}
}

课程案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {
// 第一种: 一池N线程
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);

// 第二种:一池一线程
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();

// 第三种: 一池可扩容线程
ExecutorService threadPool3 = Executors.newCachedThreadPool();

try {
for (int i = 1; i <= 10 ; i++) {
threadPool3.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 办理业务");
});
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 关闭线程池
threadPool3.shutdown();
}

}

12、分支合并框架(Fork/Join)

12.1、Fork/Join框架简介

Fork/Join 框架 是 Java 7 引入的一个并发编程框架,位于 java.util.concurrent 包中。它基于 分治算法(Divide and Conquer),将一个大任务拆分为多个小任务,并行执行这些小任务,最后将结果合并。Fork/Join 框架特别适合处理递归和并行计算问题,如归并排序、快速排序、并行数组处理等。

12.1.1、 分治算法

image-20241210171000245

分治算法的核心思想是将一个大问题分解为多个小问题,分别解决这些小问题,然后将结果合并。Fork/Join 框架通过以下步骤实现分治算法:

  1. Fork(拆分):将大任务拆分为多个小任务。
  2. Join(合并):等待所有小任务完成,并将结果合并。

12.1.2、工作窃取算法(Work-Stealing)

Fork/Join 框架使用 工作窃取算法 来提高任务的执行效率:

  • 每个线程维护一个双端队列(Deque),用于存储任务。
  • 当一个线程的任务队列为空时,它会从其他线程的任务队列末尾“窃取”任务。
  • 这种机制减少了线程的空闲时间,提高了资源利用率。

12.1.3、Fork/Join 框架的优缺点及适用场景

优点:

  1. 高效并行:基于分治算法和工作窃取算法,能够高效地并行处理任务。
  2. 简化并发编程:提供了丰富的 API,简化了并发任务的编写和组合。
  3. 资源管理:通过线程池管理线程,避免了频繁创建和销毁线程的开销。
  4. 适用性广:适用于递归和并行计算问题,如排序、搜索、矩阵运算等。

缺点:

  1. 复杂性:API 较为丰富,初学者可能需要一定时间熟悉。
  2. 性能开销:相比于简单的线程池,Fork/Join 框架的实现较为复杂,可能会带来一定的性能开销。
  3. 适用性有限:适用于递归和并行计算问题,但在简单的场景中可能显得过于复杂。

适用场景:

  1. 递归问题:如归并排序、快速排序、树的遍历等。
  2. 并行计算:如矩阵运算、图像处理、数据分析等。
  3. 大规模数据处理:如大数据集的排序、搜索、聚合等。
  4. 任务拆分:如任务调度、批处理任务等。

12.2、Fork/Join 框架的核心组件

12.2.1、ForkJoinPool

ForkJoinPool 是 Fork/Join 框架的核心类,它是一个特殊的线程池,专门用于执行 ForkJoinTask

  • 创建 ForkJoinPool

    1
    ForkJoinPool pool = new ForkJoinPool();
  • 提交任务

    1
    pool.submit(task);

12.2.2、ForkJoinTask

ForkJoinTask 是 Fork/Join 框架的任务抽象类,表示一个可以被拆分和执行的任务。它有两个主要的子类:

  1. RecursiveAction
    • 表示没有返回值的任务。
    • 适用于不需要返回结果的场景,如文件处理、数据清洗等。
  2. RecursiveTask
    • 表示有返回值的任务。
    • 适用于需要返回结果的场景,如并行计算、排序等。

12.3、Fork/Join 框架的工作原理

12.3.1、任务拆分

  • 任务通过 fork() 方法拆分为多个子任务。
  • 子任务会被放入当前线程的任务队列中,或者被其他线程窃取。

12.3.2、任务执行

  • 每个线程从自己的任务队列中获取任务并执行。
  • 如果任务队列为空,线程会从其他线程的任务队列末尾窃取任务。

12.3.3、任务合并

  • 任务通过 join() 方法等待子任务完成,并将结果合并。
  • 最终结果会被返回给调用者。

12.4、Fork/Join 框架的使用示例

12.4.1、课程案例

要求:计算 1+2+3+4+…+100,计算时两个相邻的数值不能超过10

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class MyTask extends RecursiveTask<Integer> {
// 拆分差值不能超过10
public static final int VALUE = 10;

private int begin; // 拆分的开始值
private int end; // 拆分的结束值
private int result; // 结果

//有参构造
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}


//拆分和合并
@Override
protected Integer compute() {
// 两个数值相加是否大于10
if (end - begin <= VALUE) {
//相加操作
for (int i = begin; i <= end; i++) {
result += i;
}
} else {// 进一步拆分
// 获取中间值
int middle = (begin + end) / 2;
// 拆分左边
MyTask t1 = new MyTask(begin, middle);
// 拆分右边
MyTask t2 = new MyTask(middle + 1, end);
// 合并
t1.fork();
t2.fork();
// 合并结果
result = t1.join() + t2.join();
}
return result;
}
}

public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask task = new MyTask(0, 100);
// 创建分支合并池对象
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = pool.submit(task);

// 获取最终合并后的结果
Integer result = forkJoinTask.get();

System.out.println(result);

// 关闭分支合并池
pool.shutdown();

}

}

12.4.2、计算数组元素的和

以下是一个使用 Fork/Join 框架计算数组元素和的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinExample {
public static void main(String[] args) {
// 创建 ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();

// 创建一个数组
int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

// 创建任务
SumTask task = new SumTask(array, 0, array.length);

// 提交任务并获取结果
int result = pool.invoke(task);
System.out.println("Sum of array elements: " + result);
}
}

// 定义 RecursiveTask
class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2; // 任务拆分的阈值
private int[] array;
private int start;
private int end;

public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
// 如果任务足够小,直接计算
if (end - start <= THRESHOLD) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// 拆分任务
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);

// 执行子任务
leftTask.fork();
rightTask.fork();

// 合并结果
return leftTask.join() + rightTask.join();
}
}
}

12.4.3、归并排序

以下是一个使用 Fork/Join 框架实现归并排序的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class MergeSortExample {
public static void main(String[] args) {
// 创建 ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();

// 创建一个数组
int[] array = {5, 3, 8, 6, 2, 7, 1, 4};

// 创建任务
MergeSortTask task = new MergeSortTask(array, 0, array.length - 1);

// 提交任务
pool.invoke(task);

// 输出排序结果
System.out.println("Sorted array: " + Arrays.toString(array));
}
}

// 定义 RecursiveAction
class MergeSortTask extends RecursiveAction {
private static final int THRESHOLD = 2; // 任务拆分的阈值
private int[] array;
private int start;
private int end;

public MergeSortTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected void compute() {
// 如果任务足够小,直接排序
if (end - start <= THRESHOLD) {
Arrays.sort(array, start, end + 1);
} else {
// 拆分任务
int mid = (start + end) / 2;
MergeSortTask leftTask = new MergeSortTask(array, start, mid);
MergeSortTask rightTask = new MergeSortTask(array, mid + 1, end);

// 执行子任务
leftTask.fork();
rightTask.fork();

// 等待子任务完成
leftTask.join();
rightTask.join();

// 合并结果
merge(array, start, mid, end);
}
}

private void merge(int[] array, int start, int mid, int end) {
int[] temp = new int[end - start + 1];
int i = start, j = mid + 1, k = 0;

while (i <= mid && j <= end) {
if (array[i] < array[j]) {
temp[k++] = array[i++];
} else {
temp[k++] = array[j++];
}
}

while (i <= mid) {
temp[k++] = array[i++];
}

while (j <= end) {
temp[k++] = array[j++];
}

System.arraycopy(temp, 0, array, start, temp.length);
}
}

13、异步回调(CompletableFuture)

CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,位于 java.util.concurrent 包中。它是 Future 接口的实现类,提供了更丰富的功能,支持链式调用、组合任务、异常处理等,能够更方便地处理异步任务。

13.1、 CompletableFuture 的基本概念

13.1.1、什么是 CompletableFuture

  • CompletableFuture 是一个可完成的 Future,它不仅表示一个异步计算的结果,还允许你显式地完成它(通过设置结果或异常)。
  • 它支持链式调用,可以将多个异步任务串联起来,形成一个任务链。
  • 它提供了丰富的 API,用于处理任务的完成、组合、转换和异常处理。

13.1.2、CompletableFuture 的优缺点及适用场景

优点:

  1. 简化异步编程:提供了丰富的 API,简化了异步任务的编写和组合。
  2. 非阻塞操作:支持非阻塞的链式调用,避免了传统 Future 的阻塞问题。
  3. 任务组合:支持任务之间的组合和依赖关系,能够更灵活地处理复杂的异步任务。
  4. 异常处理:提供了专门的 API 处理异常,简化了异常处理的逻辑。

缺点:

  1. 复杂性:API 较为丰富,初学者可能需要一定时间熟悉。
  2. 性能开销:相比于简单的线程池,CompletableFuture 的实现较为复杂,可能会带来一定的性能开销。
  3. 适用性有限:适用于复杂的异步任务处理,但在简单的场景中可能显得过于复杂。

适用场景:

  1. 异步任务处理:如网络请求、数据库查询等。
  2. 任务组合:如多个异步任务的依赖关系处理。
  3. 事件驱动编程:如响应式编程、事件处理等。
  4. 并发处理:如并行计算、批处理任务等。

13.1.3、与传统 Future 的区别

特性 Future CompletableFuture
异步任务 只能通过 get() 获取结果,阻塞线程 支持非阻塞的链式调用和组合任务
任务完成 无法手动完成任务 可以手动完成任务(设置结果或异常)
异常处理 不支持直接处理异常 提供专门的 API 处理异常
任务组合 不支持任务之间的组合 支持任务的组合和依赖关系

13.2、CompletableFuture 的核心功能

13.2.1、创建 CompletableFuture

  • 空任务

    1
    2
    3
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Task is running...");
    });
  • 带返回值的任务

    1
    2
    3
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Result of the task";
    });

13.2.2、完成任务

  • 手动完成任务
    1
    2
    CompletableFuture<String> future = new CompletableFuture<>();
    future.complete("Result"); // 手动设置结果
  • 手动抛出异常
    1
    future.completeExceptionally(new RuntimeException("Task failed"));

13.2.3、获取结果

  • 阻塞获取结果

    1
    String result = future.get(); // 阻塞等待结果
  • 非阻塞获取结果

    1
    String result = future.getNow("Default value"); // 如果未完成,返回默认值

13.2.4、链式调用

  • 转换结果

    1
    2
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(result -> result + " World");
  • 消费结果

    1
    future.thenAccept(result -> System.out.println(result));
  • 执行后续任务

    1
    future.thenRun(() -> System.out.println("Task completed"));

13.2.5、任务组合

  • 顺序组合
    1
    2
    3
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
    CompletableFuture<String> combined = future1.thenCombine(future2, (r1, r2) -> r1 + " " + r2);
  • 并行组合
    1
    2
    3
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
    CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);

13.2.6、异常处理

  • 捕获异常
    1
    2
    3
    4
    future.exceptionally(ex -> {
    System.out.println("Exception occurred: " + ex.getMessage());
    return "Default value";
    });
  • 处理异常和结果
    1
    2
    3
    4
    5
    6
    7
    future.handle((result, ex) -> {
    if (ex != null) {
    System.out.println("Exception occurred: " + ex.getMessage());
    return "Default value";
    }
    return result;
    });

13.3、 CompletableFuture 的使用示例

13.3.1、课程案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 异步调用,无返回值
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 执行异步任务
System.out.println(Thread.currentThread().getName() + " runAsync");
});

future.get();

// 异步调用,有返回值
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
// 执行异步任务
System.out.println(Thread.currentThread().getName() + " supplyAsync");

// 模拟异常
int i = 1 / 0;
return 1;
});

future2.whenComplete((t, u) -> {
// 处理异步任务的结果
System.out.println("异步任务结果:" + t);
System.out.println("异常信息:" + u);
}).exceptionally((e) -> {
// 处理异常
System.out.println("异步任务异常:" + e.getMessage());
return null;
});

}

13.3.2、简单异步任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {
public static void main(String[] args) {
// 创建一个异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result of the task";
});

// 获取结果
future.thenAccept(result -> System.out.println("Result: " + result));

// 防止主线程退出
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

13.3.3、任务组合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.CompletableFuture;

public class CompletableFutureCombineExample {
public static void main(String[] args) {
// 创建两个异步任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

// 组合任务
CompletableFuture<String> combined = future1.thenCombine(future2, (r1, r2) -> r1 + " " + r2);

// 获取结果
combined.thenAccept(result -> System.out.println("Combined result: " + result));

// 防止主线程退出
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}