线程池总结

2020/1/1

# 线程池

# 1. 为什么使用线程池?

池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

《Java 并发编程的艺术》提到的使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

# 2. 如何创建线程池?

《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

# 2.1 Executors 和ExecutorService返回线程池对象的弊端

# 2.1.1 简介

JDK 5.0起提供了线程池相关API:ExecutorServiceExecutors

ExecutorService:真正的线程池接口。常见子类ThreadPoolExecutor

  • void execute(Runnable command) :执行任务/命令,没有返回值,一般用来执行Runnable
  • <T> Future<T> submit(Callable<T> task):执行任务,有返回值,一般又来执行Callable
  • void shutdown():关闭连接池

Executors:工具类、线程池的工厂类,用于创建并返回不同类型的线程池:

  • Executors.newCachedThreadPool():创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。 (可根据需要创建新线程的线程池)
    • 创建方式: Executors.newCachedThreadPool()
  • Executors.newFixedThreadPool(n): 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。定长线程池的大小最好根据系统资源进行设置,如Runtime.getRuntime().availableProcessors()
    • 创建方式: Executors.newFixedThreadPool()
  • Executors.newSingleThreadExecutor() :创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
    • 创建方式: Executors.newSingleThreadExecutor ()
  • Executors.newScheduledThreadPool(n):创建一个定长线程池,支持定时及周期性任务执行。
    • 创建方式: Executors.newScheduledThreadPool ()

# 2.1.2 Executors 返回线程池对象的弊端

FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM。

CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

Out Of Memory(OOM),一般是由于程序编写者对内存使用不当,如对该释放的内存资源没有释放,导致其一直不能被再次使用而使计算机内存被耗尽的现象。

# 2.2 ThreadPoolExecutor创建线程池

# 2.2.1 构造方法

public class ThreadPoolExecutor extends AbstractExecutorService {

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

}
1
2
3
4
5
6
7
8
9
10
11

ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

# 2.2.2 通过ThreadPoolExecutor可以实现 Executors的三种不同类型的线程池

![image-20210614211916188](https://gitee.com/koala010/typora/raw/master/img/ThreadPoolExecutor可以实现 Executors的三种不同类型的线程池.png)

# 3. ThreadPoolExecutor 类分析

    /**
     * 用给定的初始参数创建一个新的ThreadPoolExecutor。
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 3.1 构造器参数详解:

  • corePoolSize: ==核心线程数线程数定义了最小可以同时运行的线程数量==。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize线程池最大线程数。它表示在线程池中最多能创建多少个线程;
  • keepAliveTime表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit参数keepAliveTime的时间单位,有7种取值。TimeUnit.DAYS、TimeUnit.HOURS、TimeUnit.MINUTES、TimeUnit.SECONDS、TimeUnit.MILLISECONDS、TimeUnit.MICROSECONDS、TimeUnit.NANOSECONDS
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。 ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和SynchronousQueue。线程池的排队策略与BlockingQueue有关。
  • threadFactoryexecutor创建新线程的时候会用到;
  • handler表示当拒绝处理任务时的策略(饱和策略),四种取值,参看下文。

# 3.2 饱和策略

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时,ThreadPoolTaskExecutor 定义的一些策略。

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务(不处理),但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃最早的未处理的任务请求,然后重新尝试执行任务(重复此过程)
  • ThreadPoolExecutor.CallerRunsPolicy调用执行自己的线程运行任务。也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。

# 4. 线程池原理

线程池原理

参看:线程池原理分析 (opens new window)

# 4.1 execute()方法和 submit()方法的区别

  1. execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
  2. submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Futureget()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

我们以**AbstractExecutorService**接口中的一个 submit 方法为例子来看看源代码:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
1
2
3
4
5
6

上面方法调用的 newTaskFor 方法返回了一个 FutureTask 对象。

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
1
2
3

我们再来看看execute()方法:

    public void execute(Runnable command) {
      ...
    }
1
2
3

# 4.2 线程池的执行过程

  1. 当工作线程数 < corePoolSize 时,新创建一个新线程执行新提交任务,即使此时线程池中存在空闲线程;
  2. 当工作线程数 == corePoolSize 时,新提交任务将被放入 workQueue 中;
  3. 当 workQueue 已满,且工作线程数 < maximumPoolSize 时,新提交任务会创建新的非核心线程执行任务;
  4. 当 workQueue 已满,且 工作线程数==maximumPoolSize 时,新提交任务由 RejectedExecutionHandler 处理;

img

# 线程池问题的排查

参看:一次线上线程池任务问题处理历程(小米信息技术部) (opens new window)