程序员社区

Java并发编程中由浅入深理解线程池

线程池的提出

在一个应用程序中,我们需要多次使用线程,也就意味着,我们需要多次创建并销毁线程。而创建并销毁线程的过程势必会消耗内存。因为频繁创建线程和销毁线程需要时间。线程本身也要占用内存空间,大量的线程会占用内存资源并且可能会导致
Out of Memory。大量的线程回收也会给GC带来很大的压力。
在Java中内存资源是及其宝贵的,所以就提出了线程池的概念。

线程池简介

Java中开辟出了一种管理线程的概念,线程池的好处就是可以方便的管理线程,也可以减少内存的消耗。为了避免重复的创建线程,线程池的出现可以让线程进行复用。通俗点讲,当有工作来,就会向线程池拿一个线程,当工作完成后,并不是直接关闭线程,而是将这个线程归还给线程池供其他任务使用。

多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

创建线程

之前也说过创建线程的三种方式

1.继承Thread类,重写run()

public class Th1 extends Thread{
    @Override
    public void run() {
        System.out.println("Th1");
    }
}

2.实现Runable接口,重写run()

public class Th2 implements Runnable{
    @Override
    public void run() {
        System.out.println("Th2");
    }
}

3.实现Callable接口,重写call()

public class Th3 implements Callable {
    @Override
    public Object call() throws Exception {
        System.out.println("th3");
        String th="Th3";
        return th;
    }
}

传统的启动线程的方法

实现Callable接口的线程需要配合FutureTask来使用,RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

public class thMain {
    public static void main(String[] arg) {
        Th1 th1=new Th1();
        Th2 th2=new Th2();
        Th3 th3=new Th3();
        Thread thread1=new Thread(th1);
        Thread thread2=new Thread(th2);
        FutureTask futureTask=new FutureTask(th3);
        Thread thread3=new Thread(futureTask);
        thread1.start();
        thread2.start();
        thread3.start();
    }
}
这里可能会有个问题,为什么不直接调用run(),而是start()

就是为了实现多线程的优点,没这个start()不行。直接调用run()就直接安排好了顺序,调用start()后,线程会被放到等待队列,等待CPU调度,并不一定要马上开始执行,只是将这个线程置于就绪状态。然后通过JVM,线程Thread会调用run(),执行本线程的线程体。

线程的生命周期图解

Java并发编程中由浅入深理解线程池插图

引入线程池

应用场景:

一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
T1 + T3 远大于T2,则可以采用线程池,以提高服务器性能。

使用线程池的好处

减少线程创建和销毁的时间。以上例来说,线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。T1T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1T3的开销了。
显著减少了创建线程的数目。一个服务器一天要处理n个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为n。利用线程池的服务器程序不会为了创建n个线程而在处理请求时浪费时间,从而提高效率。(n>>>线程池的数目)

首先来了解下线程池的流程

Java并发编程中由浅入深理解线程池插图1
image.png

任务进来时,wonrkerCountOf()能够取得当前线程池中的线程的总数,取得当前线程数与核心池大小比较,
1.如果小于,将通过addWorker()调度执行。
2.如果大于核心池大小,那么就提交到等待队列,等待执行
3.如果进入等待队列失败,则会将任务直接提交给线程池。
4.如果线程数达到最大线程数,那么就提交失败,就调用handler实现拒绝策略。

线程池的类图

Java并发编程中由浅入深理解线程池插图2
image.png

Executor只是一个接口,它是Java线程池框架的基础,它将任务的提交与任务的执行分离开来。
ExecutorService继承自Executor,有两个关键类实现了ExecutorService接口:
ThreadPoolExecutorScheduledThreadPoolExecutor
ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。
ScheduledThreadPoolExecutor 也是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。它比Timer更灵活,功能更强大(继承了ThreadPoolExecutor类实现了
ScheduledExecutorService接口)

Executors提供的线程服务,都是通过参数设置来实现不同的线程池机制。
ThreadPoolExecutor构造方法参数讲解

参数名 作用 类型
corePoolSize 线程池的基本大小,包括空闲线程 int
maximumPoolSize 池中允许的最大线程数 int
keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间 long
TimeUnit keepAliveTime时间单位 TimeUnit
workQueue 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务 BlockingQueue<Runnable>
threadFactory 执行程序创建新线程时使用的工厂 ThreadFactory
handler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理,由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序 RejectedExecutionHandler

poolSize:线程池中当前线程的数量。
当新提交一个任务时:
(1)如果poolSize<corePoolSize,新增加一个线程处理新的任务。
(2)如果poolSize=corePoolSize,新任务会被放入阻塞队列等待。
(3)如果阻塞队列的容量达到上限,且这时poolSize<maximumPoolSize,新增线程来处理任务。
(4)如果阻塞队列满了,且poolSize=maximumPoolSize,那么线程池已经达到极限,会根据饱和拒绝策略RejectedExecutionHandler拒绝新的任务。

创建一个线程池(使用默认的线程工厂)

ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,new SynchronousQueue<>(),
                new ThreadPoolExecutor.DiscardOldestPolicy());

相关说明

TimeUnitjava.util.concurrent包下面的一个类,表示给定单元粒度的时间段

TimeUnit.DAYS          //天  
TimeUnit.HOURS         //小时  
TimeUnit.MINUTES       //分钟  
TimeUnit.SECONDS       //秒  
TimeUnit.MILLISECONDS  //毫秒 
TimeUnit.NANOSECONDS   //毫微秒
TimeUnit.MICROSECONDS  //微秒

ThreadFactory默认使用Executors.defaultThreadFactory()有需要的可以实现ThreadFactory接口进行重写,DefaultThreadFactoryExecutors类中的一个内部类

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

拒绝策略RejectedExecutionHandler,是ThreadPoolExecutor的内部类,都实现了
RejectedExecutionHandler接口,源码如下

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

线程池的组成

线程池包括以下四个基本组成部分:
1.ThreadPool(线程池管理器):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务
2.PoolWorker(工作线程):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务
3.Task(任务接口):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等
4.TaskQueue(任务队列):用于存放没有处理的任务。提供一种缓冲机制

常见的线程池

1.newCachedThreadPool(推荐使用):可缓存线程池,无界线程池,可以进行自动线程回收。该线程池中没有核心线程,非核心线程的数量为无限大Integer.max_value,当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,又智能的添加新线程来执行。适用于耗时少,任务量大的情况。
2.newScheduledThreadPool:周期性执行任务的线程池,按照某种特定的计划执行线程中的任务,有核心线程,但也有非核心线程,非核心线程的大小也为无限大。适用于执行周期性的任务。
3.newSingleThreadPool:单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行任务
4.newFixedThreadPool:固定数量的线程池,每提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列直到前面的任务完成才继续执行

任务缓存队列及排队策略

任务缓存队列workQueue,用来存放等待执行的任务。
workQueue的类型为BlockingQueue<Runnable>
1.ArrayBlockingQueue(有界队列):用数组实现的有界阻塞队列,其内部按先进先出的原则对元素进行排序,通过重入锁ReenterLockCondition条件队列实现的,此队列创建时必须指定大小,最大的特点便是可以防止资源耗尽的情况发生。
2.LinkedBlockingQueue(无界队列):基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE,在正常情况下,链接队列的吞吐量要高于基于数组的队列ArrayBlockingQueue,因为其内部实现添加和删除操作使用的两个ReenterLock来控制并发执行,而ArrayBlockingQueue内部只是使用一个ReenterLock控制并发
3.SynchronousQueue(直接提交):一个不存储元素的队列,这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务,通常要求maximumPoolSize是无界的。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于ArrayBlockingQueueLinkedBlockingQueue

Handler的拒绝策略:

1.AbortPolicy:不执行新任务,直接抛出RejectedExecutionException异常,提示线程池已满
2.DisCardPolicy:不执行新任务,也不抛出异常
3.DisCardOldSetPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
4.CallerRunsPolicy:直接调用execute()来执行当前任务

代码实现

使用newCachedThreadPool线程池
public class thMain {
    public static void main(String[] arg) throws Exception{
        Th1 th1=new Th1();
        Th2 th2=new Th2();
        Th3 th3=new Th3();
        Thread thread1=new Thread(th1);
        Thread thread2=new Thread(th2);
        FutureTask futureTask=new FutureTask(th3);
        Thread thread3=new Thread(futureTask);
        ExecutorService executorService= Executors.newCachedThreadPool();
        executorService.execute(thread1);
        executorService.execute(thread2);
        executorService.execute(thread3);
        executorService.shutdown();
    }
}

使用newFixedThreadPool线程池
ExecutorService executorService= Executors.newFixedThreadPool(20);

使用newSingleThreadPool线程池(单线程的,不推荐)
ExecutorService executorService= Executors.newSingleThreadExecutor();

使用newScheduledThreadPool线程池
ExecutorService executorService= Executors.newScheduledThreadPool(20);

线程池的关闭

ThreadPoolExecutor提供了两个方法用于线程池的关闭
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

关于合理设置线程池大小

公式:最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目
线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。

赞(0) 打赏
未经允许不得转载:IDEA激活码 » Java并发编程中由浅入深理解线程池

相关推荐

  • 暂无文章

一个分享Java & Python知识的社区