Java并发编程-线程池篇

Java 提供了强大的并发编程工具,其中线程池(ThreadPool)是管理和复用线程的核心机制。本文将深入探讨 Java 线程池的工作原理、核心组件、创建参数及其在实际应用中的使用场景。

线程池原理

线程池是为了减少频繁创建和销毁线程带来的损耗,通过复用线程来提高系统的性能和稳定性。线程池的工作原理如下:

线程池的组成部分

线程池主要由以下几个部分组成:

  1. 核心线程池:线程池中保持活跃的线程数量,即使这些线程处于空闲状态也不会被销毁。
  2. 线程池容量:线程池中允许的最大线程数量。
  3. 等待任务队列:当线程池中的线程都在执行任务时,新提交的任务会被放入等待任务队列中。

线程池的创建参数详解

在 Java 中,ThreadPoolExecutor 是创建线程池的主要类。它提供了多个构造函数,其中最常用的是包含七个可选参数的构造函数。这些参数用于配置线程池的行为和特性。

以下是 ThreadPoolExecutor 构造函数的七个可选参数及其详细说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 参数校验
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();

// 初始化线程池参数
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

1. corePoolSize

  • 类型int
  • 描述:核心线程池大小,即线程池中保持活跃的线程数量。即使这些线程处于空闲状态也不会被销毁。
  • 默认值:无默认值,必须指定。

2. maximumPoolSize

  • 类型int
  • 描述:线程池中允许的最大线程数量。当等待任务队列已满且核心线程池已满时,线程池会创建新的线程,直到达到最大线程数量。
  • 默认值:无默认值,必须指定。

3. keepAliveTime

  • 类型long
  • 描述:线程空闲时间,即当线程池中的线程数量超过核心线程池大小时,空闲线程在等待新任务的时间超过 keepAliveTime 后会被销毁。
  • 默认值:无默认值,必须指定。

4. unit

  • 类型TimeUnit
  • 描述keepAliveTime 的时间单位,如秒、毫秒等。
  • 默认值:无默认值,必须指定。

5. workQueue

  • 类型BlockingQueue<Runnable>
  • 描述:等待任务队列,用于存放等待执行的任务。当线程池中的线程都在执行任务时,新提交的任务会被放入等待任务队列中。
  • 默认值:无默认值,必须指定。

6. threadFactory

  • 类型ThreadFactory
  • 描述:线程工厂,用于创建新的线程。可以通过自定义线程工厂来设置线程的名称、优先级等属性。
  • 默认值Executors.defaultThreadFactory(),使用默认的线程工厂。

7. handler

  • 类型RejectedExecutionHandler
  • 描述:拒绝策略,当线程池容量已满且等待任务队列已满时,线程池会拒绝新提交的任务,并根据拒绝策略处理该任务。
  • 默认值AbortPolicy,直接抛出 RejectedExecutionException 异常。

以下是一个使用 ThreadPoolExecutor 创建线程池的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.*;

public class ThreadPoolExample {
public static void main(String[] args) {
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程池大小
4, // 最大线程池大小
10, // 线程空闲时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(2), // 等待任务队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);

// 提交任务
for (int i = 0; i < 7; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(2000); // 模拟任务执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

// 关闭线程池
executor.shutdown();
}
}

线程池的工作流程

工作流程如下图所示:

线程池工作流程(图片来源网络)
  1. 提交任务:当一个任务被提交到线程池时,线程池会首先检查核心线程池是否已满。
  2. 核心线程池是否已满
    • 如果核心线程池未满,线程池会创建一个新的线程来执行任务。
    • 如果核心线程池已满,线程池会检查等待任务队列是否已满。
  3. 等待任务队列是否已满
    • 如果等待任务队列未满,线程池会将任务放入等待任务队列中。
    • 如果等待任务队列已满,线程池会检查线程池容量是否已满。
  4. 线程池容量是否已满
    • 如果线程池容量未满,线程池会创建一个新的线程来执行任务。
    • 如果线程池容量已满,线程池会拒绝任务,并根据拒绝策略处理该任务。

线程池的拒绝策略

当线程池容量已满且等待任务队列已满时,线程池会拒绝新提交的任务。Java 提供了以下几种预置拒绝策略:

  1. AbortPolicy:默认策略,直接抛出 RejectedExecutionException 异常。
  2. CallerRunsPolicy:由提交任务的线程执行该任务。
  3. DiscardPolicy:直接丢弃任务,不抛出异常。
  4. DiscardOldestPolicy:丢弃等待队列中最旧的任务,然后尝试重新提交当前任务。

其他

线程池参数设置

在设置线程池参数时,需要根据具体的应用场景和任务类型来调整线程池的核心线程数、最大线程数、等待任务队列和拒绝策略等参数。以下是一些常见的线程池参数设置建议。

  • CPU 密集型任务:对于 CPU 密集型任务,线程池的核心线程数可以设置为 CPU 核数加 1。这样可以确保线程池中的线程数量与 CPU 核心数量相匹配,避免过多的线程竞争 CPU 资源。

  • IO 密集型任务:对于 IO 密集型任务,线程池的核心线程数可以设置为 CPU 核数的两倍。这样可以确保线程池中有足够的线程来处理 IO 操作,避免线程等待 IO 操作完成时阻塞。

核心线程数可以设置为 0 吗?

可以。将核心线程数设置为 0 时,线程池在初始状态下不会创建任何线程。当有任务提交时,任务会先进入等待任务队列。当等待任务队列已满时,线程池才会创建新的线程来执行任务。

线程池的种类

在 Java 中,java.util.concurrent 包提供了多种线程池的实现,每种线程池都有其特定的用途和特点。以下是常见的几种线程池及其特点:

  1. ScheduledThreadPool

ScheduledThreadPool 是一种可以设置定期执行任务的线程池。它允许你安排任务在给定的延迟后执行,或者定期重复执行。

特点

  • 定期执行任务:可以设置任务在给定的延迟后执行,或者定期重复执行。
  • 核心线程数固定:核心线程数和最大线程数相同。
  1. FixedThreadPool

FixedThreadPool 是一种核心线程数和最大线程数相同的线程池。它适用于需要固定数量线程来处理任务的场景。

特点

  • 固定线程数:核心线程数和最大线程数相同。
  • 等待任务队列:使用 LinkedBlockingQueue,容量为 Integer.MAX_VALUE
  1. CachedThreadPool

CachedThreadPool 是一种可以成为缓存线程池的线程池。它的任务等待队列为 SynchronousQueue,容量为 0,仅做任务流转,效率很高。它的特点在于线程数可以一直增加,甚至达到 Integer.MAX_VALUE(即 2^31-1)。

特点

  • 动态线程数:线程数可以一直增加,直到达到 Integer.MAX_VALUE
  • 等待任务队列:使用 SynchronousQueue,容量为 0,仅做任务流转。
  1. SingleThreadExecutor

SingleThreadExecutor 是一种只有一个线程的线程池。它适用于需要顺序执行任务的场景。

特点

  • 单线程:只有一个线程,任务按顺序执行。
  • 等待任务队列:使用 LinkedBlockingQueue,容量为 Integer.MAX_VALUE
  1. SingleThreadScheduledExecutor

SingleThreadScheduledExecutor 是一种只有一个线程的线程池,可以设置定期执行任务。

特点

  • 单线程:只有一个线程,任务按顺序执行。
  • 定期执行任务:可以设置任务在给定的延迟后执行,或者定期重复执行。

shutdownshutdownNow 方法详解

在 Java 中,ThreadPoolExecutor 提供了两种关闭线程池的方法:shutdownshutdownNow。这两种方法用于优雅地关闭线程池,但它们的行为有所不同。

shutdown 方法

shutdown 方法用于优雅地关闭线程池。它会将状态置为SHUTDOWN,拒绝新提交的任务,但会等待当前正在执行的任务和已经在等待队列中的任务完成后再关闭线程池。

以下是 ThreadPoolExecutor 类中 shutdown 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

关键步骤

  1. 获取锁:获取线程池的主锁 mainLock
  2. 检查权限:调用 checkShutdownAccess() 方法检查是否有权限关闭线程池。
  3. 更新状态:调用 advanceRunState(SHUTDOWN) 方法将线程池的状态更新为 SHUTDOWN
  4. 中断空闲线程:调用 interruptIdleWorkers() 方法中断所有空闲的线程。
  5. 调用钩子方法:调用 onShutdown() 方法,这是一个钩子方法,用于在关闭线程池时执行一些自定义操作。
  6. 释放锁:释放线程池的主锁 mainLock
  7. 尝试终止:调用 tryTerminate() 方法尝试终止线程池。

shutdownNow 方法

shutdownNow 方法用于立即关闭线程池。它会立即将线程池的状态设置为 STOP,并尝试中断所有正在执行的任务,同时返回等待队列中尚未执行的任务列表。

shutdownNow 试图通过调用 Thread.interrupt() 方法来终止线程。然而,这种方法的效果有限,如果线程中没有使用 sleepwaitcondition、定时锁等阻塞操作,interrupt() 方法可能无法中断当前的线程。因此,shutdownNow 并不保证线程池能够立即退出,它可能需要等待所有正在执行的任务完成才能真正退出。

以下是 ThreadPoolExecutor 类中 shutdownNow 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

关键步骤

  1. 获取锁:获取线程池的主锁 mainLock
  2. 检查权限:调用 checkShutdownAccess() 方法检查是否有权限关闭线程池。
  3. 更新状态:调用 advanceRunState(STOP) 方法将线程池的状态更新为 STOP
  4. 中断所有线程:调用 interruptWorkers() 方法中断所有线程,包括正在执行任务的线程。
  5. 清空等待队列:调用 drainQueue() 方法清空等待队列,并返回尚未执行的任务列表。
  6. 释放锁:释放线程池的主锁 mainLock
  7. 尝试终止:调用 tryTerminate() 方法尝试终止线程池。

提交到线程池的任务可以撤回?

是的,当向线程池提交任务时,会得到一个 Future 对象。这个 Future 对象提供了几种方法来管理任务的执行,包括取消任务。取消任务的主要方法是 Future 接口中的 cancel(boolean mayInterruptIfRunning) 方法。这个方法尝试取消执行的任务。参数 mayInterruptIfRunning 指示是否允许中断正在执行的任务。

Future 接口

Future 接口表示一个异步计算的结果。它提供了以下几个主要方法:

  • **cancel(boolean mayInterruptIfRunning)**:尝试取消任务的执行。
  • **isCancelled()**:判断任务是否已被取消。
  • **isDone()**:判断任务是否已完成(包括正常完成、异常完成或被取消)。
  • **get()**:获取任务的执行结果,如果任务尚未完成,则阻塞等待。
  • **get(long timeout, TimeUnit unit)**:在指定时间内获取任务的执行结果,如果任务尚未完成,则阻塞等待。
1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

多线程场景示例:按照顺序打印奇偶数

在多线程编程中,实现按照顺序打印奇偶数是一个常见的场景。可以通过使用 synchronized 关键字、Lock 接口或 Semaphore 等同步机制来实现线程间的协作。

方法一:使用 synchronized 关键字

实现思路

  1. 使用 synchronized 关键字同步两个线程的执行。
  2. 通过一个共享的变量来控制打印奇数和偶数的顺序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class Main {
private static final Object lock = new Object();
private static int count = 0;
private static final int MAX_COUNT = 10;

public static void main(String[] args) {
Thread oddP = new Thread(new PrintOdd(), "OddThread");
Thread evenP = new Thread(new PrintEven(), "EvenThread");

oddP.start();
evenP.start();
}

static class PrintOdd implements Runnable {
@Override
public void run() {
while (count < MAX_COUNT) {
synchronized (lock) {
if (count % 2 != 0) {
System.out.println("Odd number: " + count + " ThreadName: " + Thread.currentThread().getName());
count++;
lock.notify();
} else {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}

static class PrintEven implements Runnable {
@Override
public void run() {
while (count < MAX_COUNT) {
synchronized (lock) {
if (count % 2 == 0) {
System.out.println("Even number: " + count + " ThreadName: " + Thread.currentThread().getName());
count++;
lock.notify();
} else {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
Odd: 1
Even: 2
Odd: 3
Even: 4
Odd: 5
Even: 6
Odd: 7
Even: 8
Odd: 9
Even: 10

方法二:使用 Lock 接口和 Condition

实现思路

  1. 使用 ReentrantLock 来实现线程间的同步。
  2. 使用 Condition 来控制线程的等待和唤醒。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class OddEvenPrintWithLock {
private static final Lock lock = new ReentrantLock();
private static final Condition oddCondition = lock.newCondition();
private static final Condition evenCondition = lock.newCondition();
private static int count = 1;
private static final int MAX = 10;

public static void main(String[] args) {
Thread oddThread = new Thread(() -> {
while (count <= MAX) {
lock.lock();
try {
if (count % 2 != 0) {
System.out.println("Odd: " + count++);
evenCondition.signal();
} else {
oddCondition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
});

Thread evenThread = new Thread(() -> {
while (count <= MAX) {
lock.lock();
try {
if (count % 2 == 0) {
System.out.println("Even: " + count++);
oddCondition.signal();
} else {
evenCondition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
});

oddThread.start();
evenThread.start();
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
Odd: 1
Even: 2
Odd: 3
Even: 4
Odd: 5
Even: 6
Odd: 7
Even: 8
Odd: 9
Even: 10

方法三:使用 Semaphore

实现思路

  1. 使用两个 Semaphore 来控制线程的执行顺序。
  2. 一个 Semaphore 用于控制奇数线程的执行,另一个 Semaphore 用于控制偶数线程的执行。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.util.concurrent.Semaphore;

public class OddEvenPrintWithSemaphore {
private static final Semaphore oddSemaphore = new Semaphore(1);
private static final Semaphore evenSemaphore = new Semaphore(0);
private static int count = 1;
private static final int MAX = 10;

public static void main(String[] args) {
Thread oddThread = new Thread(() -> {
while (count <= MAX) {
try {
oddSemaphore.acquire();
if (count % 2 != 0) {
System.out.println("Odd: " + count++);
}
evenSemaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

Thread evenThread = new Thread(() -> {
while (count <= MAX) {
try {
evenSemaphore.acquire();
if (count % 2 == 0) {
System.out.println("Even: " + count++);
}
oddSemaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

oddThread.start();
evenThread.start();
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
Odd: 1
Even: 2
Odd: 3
Even: 4
Odd: 5
Even: 6
Odd: 7
Even: 8
Odd: 9
Even: 10