五一要结束了,是时候开始新的一波学习了~

一、什么是线程池?为什么要用线程池?
线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
优势:
- 降低资源的消耗。降低线程创建和销毁的资源消耗;
- 提高响应速度。例如:线程的创建时间为T1,执行时间T2,销毁时间T3,免去T1和T3的时间
- 提高线程的可管理性。
二、如何实现一个线程池
根据线程池的概念,如果要自己创建线程池,应该满足一下条件。
- 保存线程的容器。因为线程必须在池子已经创建好了,并且可以保持住,因此,需要一个容器去保存我们的线程。
- 可以接受外部任务。线程还要能够接受外部的任务,冰并运行这个任务。
- 保存任务的容器,有些任务可能来不及执行,因此需要将来不及执行的任务通过容器保存起来。
根据以上的条件以及之前我们学的并发编程知识,我们先手动自己尝试写一个线程池
Code:
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @Auther: DarkKing
* @Date: 2019/5/4 12:09
* @Description:
*/
public class MyThreadPool {
// 线程池中默认线程的个数为5
private static int WORK_NUM = 5;
// 队列默认任务个数为100
private static int TASK_COUNT = 100;
// 工作线程组
private WorkThread[] workThreads;
// 任务队列,作为一个缓冲
private final BlockingQueue<Runnable> taskQueue;
private final int worker_num;//用户在构造这个池,希望的启动的线程数
// 创建具有默认线程个数的线程池
public MyThreadPool() {
this(WORK_NUM,TASK_COUNT);
}
// 创建线程池,worker_num为线程池中工作线程的个数
public MyThreadPool(int worker_num,int taskCount) {
if (worker_num<=0) worker_num = WORK_NUM;
if(taskCount<=0) taskCount = TASK_COUNT;
this.worker_num = worker_num;
taskQueue = new ArrayBlockingQueue<>(taskCount);
workThreads = new WorkThread[worker_num];
for(int i=0;i<worker_num;i++) {
workThreads[i] = new WorkThread();
workThreads[i].start();
}
}
// 执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器决定
public void execute(Runnable task) {
try {
taskQueue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
public void destroy() {
// 工作线程停止工作,且置为null
System.out.println("ready close pool.....");
for(int i=0;i<worker_num;i++) {
workThreads[i].stopWorker();
workThreads[i] = null;//help gc
}
taskQueue.clear();// 清空任务队列
}
// 覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数
@Override
public String toString() {
return "WorkThread number:" + worker_num
+ " wait task number:" + taskQueue.size();
}
/**
* 内部类,工作线程
*/
private class WorkThread extends Thread{
@Override
public void run(){
Runnable r = null;
try {
while (!isInterrupted()) {
//监听阻塞队列,如果有任务,则执行相应的任务
r = taskQueue.take();
if(r!=null) {
System.out.println(getId()+" ready exec :"+r);
r.run();
}
r = null;//help gc;
}
} catch (Exception e) {
// TODO: handle exception
}
}
//停止线程
public void stopWorker() {
interrupt();
}
}
}
1、定义WorkThread类,用来表示执行的线程,用于监听阻塞队列任务。
2、创建构建函数,我们将线程池进行初始化,并启动所有的工作线程。workThreads用来保存运行的线程,使用BlockingQueue<Runnable> taskQueue用来保存我们的任务队列
3、创建提交任务方法execute,用于提交我们的任务。
4、创建销毁线程池的方法destroy,用于销毁线程池。
之后我们编写测试类
import java.util.Random;
/**
* @Auther: DarkKing
* @Date: 2019/5/4 12:09
* @Description:
*/
public class TestMyThreadPool {
public static void main(String[] args) throws InterruptedException {
// 创建3个线程的线程池
MyThreadPool t = new MyThreadPool(3,0);
t.execute(new MyTask("testA"));
t.execute(new MyTask("testB"));
t.execute(new MyTask("testC"));
t.execute(new MyTask("testD"));
t.execute(new MyTask("testE"));
System.out.println(t);
Thread.sleep(10000);
t.destroy();// 所有线程都执行完成才destory
System.out.println(t);
}
// 任务类
static class MyTask implements Runnable {
private String name;
private Random r = new Random();
public MyTask(String name) {
this.name = name;
}
public String getName() {
return name;
}
@Override
public void run() {// 执行任务
try {
Thread.sleep(r.nextInt(1000)+2000);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:"
+Thread.currentThread().isInterrupted());
}
System.out.println("任务 " + name + " 完成");
}
}
}

能按照线程池的方式进行执行。
我们简单地手动写了一个线程池,但以上线程池有哪些问题呢?
1、启动的时候就将所有线程启动了,如果长时间不用比较消耗资源。
2、没有任务饱和的一个错略。
3、没有任务超时机制等等。
三、JDK中的线程池和工作机制
我们大致了解了线程池的一个机制,那我们看下JDK中,是如何实现线程池的吧。
1、线程池的创建
JAVA中,ThreadPoolExecutor,是所有线程池实现的父类,类结构图如下。

它的构造函数含有以下参数

参数 |
含义 |
int corePoolSize |
线程池中核心线程数,< corePoolSize ,就会创建新线程,= corePoolSize ,这个任务就会保存到BlockingQueue,如果调用prestartAllCoreThreads()方法就会一次性的启动corePoolSize 个数的线程。 |
int maximumPoolSize |
允许的最大线程数,如果BlockingQueue也满了,并且线程数< maximumPoolSize时候就会再次创建新的线程 |
long keepAliveTime |
线程空闲下来后,存活的时间,这个参数只在线程数> corePoolSize才有用 |
TimeUnit unit |
存活时间的单位值 |
BlockingQueue<Runnable> workQueue |
保存任务的阻塞队列 |
ThreadFactory threadFactory |
创建线程的工厂,给新建的线程赋予名字 |
RejectedExecutionHandler handler |
饱和策略 AbortPolicy :直接抛出异常,默认; CallerRunsPolicy:用调用者所在的线程来执行任务 DiscardOldestPolicy:丢弃阻塞队列里最老的任务,队列里最靠前的任务 DiscardPolicy :当前任务直接丢弃 实现自己的饱和策略只要实现RejectedExecutionHandler接口即可 |
提交任务
execute(Runnable command) 不需要返回
Future<T> submit(Callable<T> task) 需要返回值
关闭线程池
shutdown(),shutdownNow();
shutdownNow():设置线程池的状态,还会尝试停止正在运行或者暂停任务的线程
shutdown()设置线程池的状态,只会中断所有没有执行任务的线程
2、线程池的工作机制

1、如果工作线程数小于核心线程数,则创建工作线程
2、如果工作线程数等于或者大于核心线程数,则将任务提交到阻塞队列中
3、如果阻塞队列也满了,但线程数小于最大线程数,则创建新的线程
4、如果创建新的线程也满了,则执行任务饱和策略。
源码如下

3、如何合理配置线程池
根据任务的性质来:计算密集型(CPU),IO密集型,混合型
计算密集型:例如加密,大数分解,正则……等
推荐:机器的Cpu核心数+1,为什么+1,防止页缺失,(机器的Cpu核心=Runtime.getRuntime().availableProcessors();)
IO密集型:读取文件,数据库连接,网络通讯,
推荐:线程数适当大一点,机器的Cpu核心数*2,
混合型:尽量拆分,如果IO密集型远远计算密集型,拆分意义不大。
在阻塞队列的选择上,应该使用有界,无界队列可能会导致内存溢出
4、预定义的线程池
Java中,帮我们预定了5种线程池
1、FixedThreadPool
创建固定线程数量的线程池,适用于负载较重的服务器,使用了无界队列
2、SingleThreadExecutor
创建单个线程的线程池,适用于需要顺序保证执行任务,不会有多个线程业务,使用了无界队列
3、CachedThreadPool
会根据需要来创建新线程的,适用于执行很多短期异步任务的程序,使用了SynchronousQueue
4、WorkStealingPool(JDK7以后)
工作密取线程池,基于ForkJoinPool实现。适用于大任务分解的线程池。
5、ScheduledThreadPoolExecutor
需要定期执行周期任务的线程池。有两种实现
newSingleThreadScheduledExecutor:只包含一个线程,只需要单个线程执行周期任务,保证顺序的执行各个任务
newScheduledThreadPool 可以包含多个线程的,线程执行周期任务,适度控制后台线程数量的时候
方法说明:
schedule:只执行一次,任务还可以延时执行
scheduleAtFixedRate:提交固定时间间隔的任务
scheduleWithFixedDelay:提交固定延时间隔执行的任务
具体使用,大家可以自行百度,都比较多,一般如果并发比较高的业务中,以上线程池都不建议使用,因为他们采用的都是无界队列,任务量比较大的时候有可能导致内存溢出。一般正确用法是直接使用ThreadPoolExecutor进行创建,或根据自身需求进行自定义。
本章重点:线程池的创建,使用,以及运行原理。
其他阅读 并发编程专题
