Java线程池的使用学习


=Start=

缘由:

在之前记录的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

为了应对这种情况,所以学习一下Java中线程池的使用,方便以后参考、查阅。

正文:

参考解答:
一、使用线程池(相较于new Thread的方式来说)的好处:
  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度:当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。
二、线程池的实现原理

当一个新任务提交到线程池时,简单来说线程池的处理流程如下:

  1. 判断核心线程池里的线程是否都在执行任务,如果不是则创建一个新的工作线程处理任务,否则进入下个流程;
  2. 判断工作队列是否已满,如果未满则将新提交的任务存储在该工作队列,否则进入下个流程;
  3. 判断线程池里的线程是否都处于工作状态,如果不是则创建一个新的工作线程来执行任务,否则交由饱和策略处理。
三、Java中的ThreadPoolExecutor类

具体细节还是看下面的「参考链接」中的介绍吧,更深入详细。

四、使用示例
package com.ixyzero.learn.misc;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;

/**
 public ThreadPoolExecutor(int corePoolSize,
 int maximumPoolSize,
 long keepAliveTime,
 TimeUnit unit,
 BlockingQueue<Runnable> workQueue,
 ThreadFactory threadFactory,
 RejectedExecutionHandler handler)

 new ThreadPoolExecutor("线程池基本大小", "线程池最大大小", "线程保持存活的时间", "时间单位", "任务队列", "线程工厂", "饱和策略");
 */
public class ThreadPoolDemo {
    /**
     * 定义静态内部线程类
     */
    public static class MyTask implements Runnable {

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread name:"
                    + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("Start @ " + System.currentTimeMillis());
        MyTask myTask = new MyTask();
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("demo-pool-%d").build();
        ExecutorService executorService = new ThreadPoolExecutor(5, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 10; i++) {
            executorService.execute(myTask);
        }
        executorService.shutdown();
        System.out.println("End @ " + System.currentTimeMillis());
    }
}

&

package com.ixyzero.learn.misc;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo2 {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        System.out.println("Start @ " + start);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));

        for(int i=0; i<15; i++) {
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
                    executor.getQueue().size() + ",已执行完的任务数目:" + executor.getCompletedTaskCount());
        }
        executor.shutdown();
        long end = System.currentTimeMillis();
        System.out.println("End @ " + end + "\nSpend " + (end - start) + " ms.");
    }
}

class MyTask implements Runnable {
    private int taskNum;

    public MyTask(int num) {
        this.taskNum = num;
    }

    @Override
    public void run() {
        System.out.println("正在执行task " + taskNum);
        try {
            Thread.currentThread().sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task "+taskNum+" 执行完毕@" + System.currentTimeMillis());
    }
}

从上面这段代码的执行结果中可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。


不过在Java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

Executors.newCachedThreadPool();      //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor();  //创建容量为1的缓冲池
Executors.newFixedThreadPool(int);     //创建固定容量大小的缓冲池

从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

  • newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
  • newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;
  • newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。

参考链接:

=END=


《 “Java线程池的使用学习” 》 有 8 条评论

  1. 高并发编程必备基础(上)
    https://mp.weixin.qq.com/s/ilXBIAHrHn1EWDlwWIJ5FQ
    `
    借用Java并发编程实践中的话”编写正确的程序并不容易,而编写正常的并发程序就更难了”,相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作的顺序是不可预期的,本文算是对多线程情况下同步策略的一个一个简单介绍。

    二、 什么是线程安全问题
    线程安全问题是指当多个线程同时读写一个状态变量,并且没有任何同步措施时候,导致脏数据或者其他不可预见的结果的问题。Java中首要的同步策略是使用Synchronized关键字,它提供了可重入的独占锁。

    三、 什么是共享变量可见性问题
    四、 原子性
    五、 CAS介绍
    六、 什么是可重入锁
    七、 Synchronized关键字
    八、 ReentrantReadWriteLock介绍
    九、 Volatile变量
    `

  2. FutureTask是怎样获取到异步执行结果的?
    https://www.cnblogs.com/yougewe/p/11666284.html
    `
      所谓异步任务,就是不在当前线程中进行执行,而是另外起一个线程让其执行。那么当前线程如果想拿到其执行结果,该怎么办呢?

      如果我们使用一个公共变量作为结果容器,两个线程共用这个值,那么应该是可以拿到结果的,但是这样一来,对业务就会造成侵入干扰了,因为你始终得考虑将这个共享变量传入到这个异步线程中去且要维持其安全性。

      我们知道,Future.get() 可以获取异步执行的结果,那么它是怎么做到的呢?

      要实现线程的数据交换,我们按照进程间的通信方式可知有: 管道、共享内存、Socket套接字。而同一个jvm的两个线程通信,所有线程共享内存区域,则一定是通过共享内存再简单不过了。

      本文将以 ThreadPoolExecutor 线程池 来解释这个过程。

      首先,如果想要获取一个线程的执行结果,需要调用 ThreadPoolExecutor.submit(Callable); 方法。然后该方法会返回一个 Future 对象,通过 Future.get(); 即可获取结果了。

      它具体是怎么实现的呢?

    一、首先,我们来看一下 submit 过程
      仅为返回了一个 Future 的对象供下游调用!

    二、异步线程如何执行?
      通过上面的分析,我们可以看到,异步线程的执行被包装成了 FutureTask, 而java的异步线程执行都是由jvm调用Thread.run()进行,所以异步起点也应该从这里去找。

    三、如何获取异步执行结果?
      当然是用户调用 future.get() 获取了!

      可以看到,等待逻辑还是有点多的,毕竟场景多。至此,我们已经完全看到了一个,如何获取异步线程的执行结果实现了。总结下:
        1. 实现Runnable接口,由jvm进行线程调用;
        2. 包装 Callable.call()方法,带返回值,当线程被调起时,转给 call() 方法执行,并返回结果;
        3. 将结果封装到当前future实例中,以备查;
        4. 当用户调用get()方法时,保证状态完成情况下,最快速地返回结果;

    四、扩展: Future.get() vs Thread.join()
      Future.get()方法,一方面是为了获取异步线程的执行结果,另一方面也做到了等待线程执行完成的效果。
      而 Thread.join() 则纯粹是为了等待异步线程执行完成,那它们有什么异曲同工之妙吗?来看下
      可以看到, Thread.join() 的等待逻辑是依赖于 jvm 的调度的, 通过 wait/notify 机制实现。与 Future.get() 相比,它是在 之后的,且无法获取结果。
    `

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注