Several Common Thread Pools and Their Use Cases

Why use a thread pool?

Creating and destroying threads is expensive, and the time spent doing so can exceed the time spent on the actual business logic. Frequently creating and destroying threads, on top of running the business worker threads themselves, eats into system resources and can exhaust them. A thread pool lets us eliminate this repeated creation and destruction.

What does a thread pool give us?

1. Efficiency: a number of threads are created up front and kept in the pool; grabbing one from the pool when it is needed is much faster than constructing a new thread object on demand.
2. Manageability: the pool's threads can be managed uniformly by the pool-management code. For example, the program can create 100 threads at startup and assign one to each incoming request; if 101 requests happen to arrive concurrently, the extra request simply waits in a queue instead of triggering unbounded thread creation that could bring the system down.

The common thread pools and their use cases

1. newSingleThreadExecutor creates a single-threaded pool: one and only one worker thread executes every task, so tasks are guaranteed to run sequentially in submission order (FIFO).

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
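For example, a minimal sketch (the class name and task bodies are made up for illustration) showing that everything submitted to this pool runs one task at a time on the same worker thread:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadDemo {
    public static void main(String[] args) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int id = i;
            // All five tasks share one worker thread and run strictly in submission order.
            single.execute(() ->
                    System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        single.shutdown(); // previously submitted tasks still finish; no new tasks are accepted
    }
}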

2. newFixedThreadPool creates a fixed-size pool: the maximum number of concurrently running threads is capped, and tasks beyond that capacity wait in the queue.

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

3. newCachedThreadPool creates a cached pool: when the pool holds more threads than the current load needs, idle threads are reclaimed; when no reusable thread is available, a new one is created.

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

4. newScheduledThreadPool creates a pool with a fixed number of core threads that supports delayed and periodic task execution.

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
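A small usage sketch (class name, delays and messages are illustrative) of the two most common ScheduledExecutorService calls:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledPoolDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        // Run once, one second from now.
        scheduler.schedule(() -> System.out.println("delayed task"), 1, TimeUnit.SECONDS);
        // Run every three seconds, starting immediately.
        scheduler.scheduleAtFixedRate(() -> System.out.println("periodic task"),
                0, 3, TimeUnit.SECONDS);
        // scheduler.shutdown() would be called once the periodic work is no longer needed.
    }
}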

Thread pools should not be created through Executors but directly through ThreadPoolExecutor. That way, whoever writes the code has to spell out the pool's behaviour explicitly, which avoids the risk of exhausting system resources. The problems with the Executors factory methods are:

1) newFixedThreadPool and newSingleThreadExecutor: the (effectively unbounded) work queue can pile up requests and consume an enormous amount of memory, possibly ending in an OOM.
2) newCachedThreadPool and newScheduledThreadPool: the maximum number of threads is Integer.MAX_VALUE, so a very large number of threads can be created, again possibly ending in an OOM.

Positive example 1:

// org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
        new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());

Positive example 2:

ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("demo-pool-%d").build();

// Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

pool.execute(() -> System.out.println(Thread.currentThread().getName()));
pool.shutdown(); // gracefully shut down
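The shutdown() call above only stops the pool from accepting new tasks. A common follow-up, sketched here with an arbitrary 60-second timeout, is to wait for the already-submitted tasks to drain and interrupt whatever is still running if they do not:

try {
    // Wait up to 60 seconds for queued and running tasks to finish.
    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
        pool.shutdownNow(); // give up and interrupt the remaining workers
    }
} catch (InterruptedException e) {
    pool.shutdownNow();
    Thread.currentThread().interrupt(); // preserve the interrupt status
}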

Positive example 3:

class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">

//in code

userThreadPool.execute(thread);

In my own project I used this third approach. The business requirement is that, every day, a scheduling server triggers the work over HTTP; the two pools below are configured in Spring as org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor beans.

@Controller
@RequestMapping("/windData")
public class WindDataListener {

    private final static ThLogger logger = ThLoggerFactory.getLogger("WindDataDispatcher");

    @Autowired
    private ThreadPoolTaskExecutor controlerThreadPool;
    @Autowired
    private ThreadPoolTaskExecutor windDataThreadPool;
    @Autowired
    private WindDataRuntimeService runtimeService;
    @Autowired
    private MaintainAlarmSender maintainAlarmSender;

    /**
     * Entry point called by the scheduler.
     */
    @RequestMapping(value = "/receiveMsg", method = RequestMethod.GET)
    @ResponseBody
    public void receiveMsg() {
        final String paramLog = LogConst.BUSSINESS_NAME + LogConst.HTTP_API;
        logger.info("[{}][received scheduling message]", paramLog);
        // The scheduler may send several HTTP requests; each one is queued on controlerThreadPool.
        controlerThreadPool.execute(new WindDataDispatcher(windDataThreadPool, runtimeService, maintainAlarmSender,
                MDC.getCopyOfContextMap()));
        logger.info("[{}][responded to the scheduling system]", paramLog);
    }
}

public class WindDataDispatcher implements Runnable {

    private final static ThLogger logger = ThLoggerFactory.getLogger("WindDataDispatcher");

    private ThreadPoolTaskExecutor taskThreadPool;
    private WindDataRuntimeService runtimeService;
    private MaintainAlarmSender maintainAlarmSender;
    private Map mdcMap;

    public WindDataDispatcher(ThreadPoolTaskExecutor taskThreadPool, WindDataRuntimeService runtimeService,
                              MaintainAlarmSender maintainAlarmSender, Map mdcMap) {
        this.taskThreadPool = taskThreadPool;
        this.runtimeService = runtimeService;
        this.maintainAlarmSender = maintainAlarmSender;
        this.mdcMap = mdcMap;
    }

    @Override
    public void run() {
        if (null != mdcMap) {
            MDC.setContextMap(mdcMap);
        }
        final String paramLog = LogConst.BUSSINESS_NAME + LogConst.DISPATCHER;
        logger.info("[{} started]", paramLog);
        taskThreadPool.execute(new WindDataExecutor(runtimeService, maintainAlarmSender, mdcMap));
        logger.info("[{} finished]", paramLog);
    }
}

public class WindDataExecutor implements Runnable {

    private final static ThLogger logger = ThLoggerFactory.getLogger("WindDataDispatcher");

    private WindDataRuntimeService runtimeService;
    private MaintainAlarmSender maintainAlarmSender;
    private Map mdcMap;

    public WindDataExecutor(WindDataRuntimeService runtimeService, MaintainAlarmSender maintainAlarmSender, Map mdcMap) {
        this.runtimeService = runtimeService;
        this.maintainAlarmSender = maintainAlarmSender;
        this.mdcMap = mdcMap;
    }

    @Override
    public void run() {
        if (null != mdcMap) {
            MDC.setContextMap(mdcMap);
        }
        final String paramLog = LogConst.BUSSINESS_NAME + LogConst.EXECUTOR;
        logger.info("[{} started]", paramLog);
        try {
            runtimeService.groundWindData();
        } catch (Exception e) {
            logger.error("[{} failed]{}", new Object[]{paramLog, e});
            maintainAlarmSender.sendMail(MaintainAlarmSender.DEFAULT_MAIL_SUB, paramLog + " failed: " + e);
        }
        logger.info("[{} finished]", paramLog);
    }
}

Which work queues can a thread pool use?

1. ArrayBlockingQueue: a bounded blocking queue backed by an array; elements are ordered FIFO (first in, first out).
2. LinkedBlockingQueue: a blocking queue backed by a linked list, also FIFO; its throughput is usually higher than ArrayBlockingQueue's. The factory method Executors.newFixedThreadPool() uses this queue.
3. SynchronousQueue: a blocking queue that stores no elements; every insert must wait for another thread to remove the element, otherwise the insert blocks. Its throughput is usually higher than LinkedBlockingQueue's. The factory method Executors.newCachedThreadPool() uses this queue.
4. PriorityBlockingQueue: an unbounded blocking queue with priority ordering.
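To sketch how the queue choice shapes the pool's behaviour (the class name, pool sizes and capacities below are arbitrary), compare a bounded ArrayBlockingQueue with the hand-off SynchronousQueue:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueChoiceDemo {
    public static void main(String[] args) {
        // Bounded FIFO queue: at most 100 tasks can wait; once it is full the pool
        // grows toward maximumPoolSize and, after that, rejects new tasks.
        ThreadPoolExecutor bounded = new ThreadPoolExecutor(2, 4,
                60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));

        // Hand-off queue: a task is never parked; it is either handed to an idle
        // thread or a new thread is created (this is what newCachedThreadPool does).
        ThreadPoolExecutor handOff = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS, new SynchronousQueue<>());

        bounded.shutdown();
        handOff.shutdown();
    }
}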

The important ThreadPoolExecutor parameters and how they interact

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

corePoolSize: the size of the core pool. This parameter is closely tied to how the pool works internally. After the pool is created it contains no threads by default; threads are only created when tasks arrive, unless prestartAllCoreThreads() or prestartCoreThread() is called, which, as the names suggest, pre-start corePoolSize threads or a single thread before any task is submitted. By default, then, the pool starts with 0 threads; each incoming task gets a new thread until the thread count reaches corePoolSize, after which new tasks are placed in the work queue.

maximumPoolSize: the maximum number of threads the pool is allowed to create; another very important parameter.

keepAliveTime: how long an idle thread is kept alive before it is terminated. By default this only applies while the pool holds more than corePoolSize threads: an idle thread whose idle time reaches keepAliveTime is terminated, until the pool is back down to corePoolSize threads. If allowCoreThreadTimeOut(boolean) has been called, however, keepAliveTime also applies when the pool holds no more than corePoolSize threads, so the pool can shrink all the way down to 0 threads.

unit: the time unit of keepAliveTime. TimeUnit defines seven options:

TimeUnit.DAYS;         // days
TimeUnit.HOURS;        // hours
TimeUnit.MINUTES;      // minutes
TimeUnit.SECONDS;      // seconds
TimeUnit.MILLISECONDS; // milliseconds
TimeUnit.MICROSECONDS; // microseconds
TimeUnit.NANOSECONDS;  // nanoseconds

workQueue: the blocking queue that holds tasks waiting to be executed. This choice also matters a great deal and strongly influences how the pool behaves. The usual candidates are:

ArrayBlockingQueue

LinkedBlockingQueue

SynchronousQueue

PriorityBlockingQueue

ArrayBlockingQueue and PriorityBlockingQueue are used less often; LinkedBlockingQueue and SynchronousQueue are the common choices. The pool's queuing strategy follows directly from the BlockingQueue that is plugged in.

threadFactory: the factory used to create new threads. A custom factory lets you do something useful with every thread it creates, such as setting the daemon flag, the thread name, or the priority.

handler: the policy applied when a task has to be rejected. There are four built-in options:

1. AbortPolicy: throw a RejectedExecutionException immediately.
2. CallerRunsPolicy: run the task on the thread that called execute().
3. DiscardOldestPolicy: discard the oldest task in the queue, then retry the current task.
4. DiscardPolicy: silently drop the task.
5. You can also implement the RejectedExecutionHandler interface yourself when none of these fit, for example to log or persist the tasks that could not be handled.
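A minimal sketch of point 5, assuming a hypothetical LogAndDiscardPolicy that simply logs the rejection instead of failing silently:

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class LogAndDiscardPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Record why and where the task was dropped; it could also be persisted and retried later.
        System.err.println("Task rejected, poolSize=" + e.getPoolSize()
                + ", queueSize=" + e.getQueue().size() + ", task=" + r);
    }
}

It would be passed as the handler argument of the ThreadPoolExecutor constructor, just like the built-in policies shown below.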

/**
 * A handler for rejected tasks that runs the rejected task
 * directly in the calling thread of the {@code execute} method,
 * unless the executor has been shut down, in which case the task
 * is discarded.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

/**
 * A handler for rejected tasks that throws a
 * {@code RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

/**
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

/**
 * A handler for rejected tasks that discards the oldest unhandled
 * request and then retries {@code execute}, unless the executor
 * is shut down, in which case the task is discarded.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

For a line-by-line walk-through of the ThreadPoolExecutor source, see https://www.cnblogs.com/dolphin0520/p/3932921.html. The small test below exercises a pool with corePoolSize 5, maximumPoolSize 20 and a bounded queue of capacity 5:

public static void test(int size) {
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 20, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));
    for (int i = 0; i < size; i++) {
        poolExecutor.execute(new DemoTask(i));
        Console.log("poolSize:" + poolExecutor.getPoolSize());
        Console.log("corePoolSize:" + poolExecutor.getCorePoolSize());
        Console.log("maximumPoolSize:" + poolExecutor.getMaximumPoolSize());
        Console.log("queue:" + poolExecutor.getQueue().size());
        Console.log("completedTaskCount:" + poolExecutor.getCompletedTaskCount());
        Console.log("largestPoolSize:" + poolExecutor.getLargestPoolSize());
        Console.log("keepAliveTime:" + poolExecutor.getKeepAliveTime(TimeUnit.SECONDS));
    }
    poolExecutor.shutdown();
}

class DemoTask implements Runnable {

    private int taskNum;

    public DemoTask(int taskNum) {
        this.taskNum = taskNum;
    }

    @Override
    public void run() {
        Console.log(StringUtils.center("Running " + taskNum, 20, "="));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Console.log(StringUtils.center("Finished " + taskNum, 20, "="));
    }
}

Sample output for test(20):

=====Running 0======
poolSize:1
corePoolSize:5
maximumPoolSize:20
queue:0
completedTaskCount:0
largestPoolSize:1
keepAliveTime:2
poolSize:2
corePoolSize:5
maximumPoolSize:20
queue:0
completedTaskCount:0
=====Running 1======
largestPoolSize:2
keepAliveTime:2
poolSize:3
corePoolSize:5
maximumPoolSize:20
=====Running 2======
queue:0
completedTaskCount:0
largestPoolSize:3
keepAliveTime:2
poolSize:4
corePoolSize:5
maximumPoolSize:20
queue:0
=====Running 3======
completedTaskCount:0
largestPoolSize:4
keepAliveTime:2
poolSize:5
corePoolSize:5
=====Running 4======
maximumPoolSize:20
queue:0
completedTaskCount:0
largestPoolSize:5
keepAliveTime:2
poolSize:5
corePoolSize:5
maximumPoolSize:20
queue:1
completedTaskCount:0
largestPoolSize:5
keepAliveTime:2
poolSize:5
corePoolSize:5
maximumPoolSize:20
queue:2
completedTaskCount:0
largestPoolSize:5
keepAliveTime:2
poolSize:5
corePoolSize:5
maximumPoolSize:20
queue:3
completedTaskCount:0
largestPoolSize:5
keepAliveTime:2
poolSize:5
corePoolSize:5
maximumPoolSize:20
queue:4
completedTaskCount:0
largestPoolSize:5
keepAliveTime:2
poolSize:5
corePoolSize:5
maximumPoolSize:20
queue:5
completedTaskCount:0
largestPoolSize:5
keepAliveTime:2
poolSize:6
corePoolSize:5
maximumPoolSize:20
queue:5
completedTaskCount:0
largestPoolSize:6
keepAliveTime:2
poolSize:7
corePoolSize:5
maximumPoolSize:20
queue:5
completedTaskCount:0
largestPoolSize:7
keepAliveTime:2
=====Running 11=====
poolSize:8
corePoolSize:5
maximumPoolSize:20
queue:5
completedTaskCount:0
=====Running 12=====
=====Running 10=====
largestPoolSize:8
keepAliveTime:2
poolSize:9
corePoolSize:5
=====Running 13=====
maximumPoolSize:20
queue:5
completedTaskCount:0
largestPoolSize:9
keepAliveTime:2
poolSize:10
corePoolSize:5
maximumPoolSize:20
=====Running 14=====
queue:5
completedTaskCount:0
largestPoolSize:10
keepAliveTime:2
poolSize:11
corePoolSize:5
maximumPoolSize:20
queue:5
=====Running 15=====
completedTaskCount:0
largestPoolSize:11
keepAliveTime:2
poolSize:12
corePoolSize:5
maximumPoolSize:20
queue:5
completedTaskCount:0
=====Running 16=====
largestPoolSize:12
keepAliveTime:2
poolSize:13
corePoolSize:5
maximumPoolSize:20
=====Running 17=====
queue:5
completedTaskCount:0
largestPoolSize:13
keepAliveTime:2
poolSize:14
corePoolSize:5
maximumPoolSize:20
queue:5
=====Running 18=====
completedTaskCount:0
largestPoolSize:14
keepAliveTime:2
poolSize:15
corePoolSize:5
maximumPoolSize:20
=====Running 19=====
queue:5
completedTaskCount:0
largestPoolSize:15
keepAliveTime:2
=====Finished 0=====
=====Running 5======
=====Finished 1=====
=====Finished 2=====
=====Running 6======
=====Running 7======
=====Finished 4=====
=====Running 8======
=====Finished 3=====
=====Running 9======
====Finished 13=====
====Finished 12=====
====Finished 10=====
====Finished 11=====
====Finished 15=====
====Finished 16=====
====Finished 14=====
====Finished 19=====
====Finished 18=====
====Finished 17=====
=====Finished 5=====
=====Finished 7=====
=====Finished 6=====
=====Finished 8=====
=====Finished 9=====

How should bounded and unbounded queues be understood?

Bounded queue:
1. While the initial poolSize < corePoolSize, each submitted Runnable is handed straight to a newly created thread and executed immediately.
2. Once the number of submitted tasks exceeds corePoolSize, new Runnables are placed in the blocking queue.
3. When the bounded queue is full and poolSize < maximumPoolSize, the pool creates an extra "emergency" thread to run the new Runnable right away.
4. If step 3 can no longer help either, the reject policy from step 4 of the lifecycle is applied.

Unbounded queue: unlike a bounded queue, an unbounded queue never fails to accept a task unless system resources are exhausted. When a new task arrives and the thread count is below corePoolSize, a new thread is created; once corePoolSize is reached the thread count stops growing, and any further task that finds no idle thread simply waits in the queue. If tasks are produced much faster than they are processed, an unbounded queue keeps growing until it exhausts system memory.

Once the pool's work queue is full and the number of threads has reached maximumPoolSize, any further task triggers the rejection policy, as the sketch below shows.
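A minimal sketch (the class name, pool sizes, queue capacity and sleep time are arbitrary) of the rejection kicking in once both the bounded queue and the pool are full:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) {
        // 1 core thread, at most 2 threads, and room for only 1 waiting task.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2,
                0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        Runnable slow = () -> {
            try { Thread.sleep(5_000); } catch (InterruptedException ignored) { }
        };
        pool.execute(slow); // runs on the core thread
        pool.execute(slow); // waits in the queue
        pool.execute(slow); // queue is full -> a second (non-core) thread is created
        try {
            pool.execute(slow); // pool and queue both full -> default AbortPolicy throws
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        pool.shutdown();
    }
}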
