总结:线程池的特点是,在线程的数量=corePoolSize后,仅任务队列满了之后,才会从任务队列中取出一个任务,然后构造一个新的线程,循环往复直到线程数量达到maximumPoolSize执行拒绝策略。
线程池-intsmaze
自定义线程池-intsmaze
如果观察jdk提供的各种线程池的源码实现可以发现,除了jdk8新增的线程池newWorkStealingPool以外,都是基于对ThreadPoolExecutor的封装实现,所以首先讲解ThreadPoolExecutor的具体功能。
ThreadPoolExecutor详解-intsmaze
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize:指定线程池中线程数量
maximumPoolSize:最大线程数量
keepAliveTime:线程数量超过corePoolSize时,多于的空闲线程的存活时间(超过这段时间,该空闲线程会被销毁)。
unit:keepAliveTime的时间单位
workQueue:任务队列,提交但是未被执行的任务
threadFactory:创建线程的线程工厂,默认即可
ExecutorService es = new ThreadPoolExecutor(3, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("discard"); } });
任务队列--存放runnable对象-intsmaze
拒绝策略-intsmaze
线程池中的线程用完了,同时等待队列中的任务已经塞满了,再也塞不下新任务了,就需要拒绝策略:处理任务数量超过系统实际承受能力时,处理方式。CallerRunsPolicy:该策略直接在调用者线程中运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是任务提交线程的性能极有可能会急剧下降。
DiscardOldestPolicy:将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
DiscardPolicy:默默丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这可能是最好的一种解决方案。在线程池不空闲的时候,提交的任务都将丢弃,当有空闲的线程时提交的任务会执行。
下面是jdk的拒绝策略源码-intsmaze
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } /** * 直接在调用者线程中运行当前被丢弃的任务,要注意这里是调用Runnable的run()方法,而不是start()方法启动线程,run()以普通方法的形式在主线程中执行任务,会阻塞 * 后面es.submit(new MyTask(i))方法的执行 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } /** * 将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
总结:AbortPolicy策略下,我们要catch异常,这样我们可以捕获到哪些任务被丢弃了。如果采用其他的策略,丢弃的任务无法定位的,只能通过下列程序中es.submit(new MyTask(i));任务之前打印该任务,运行任务的run()逻辑是,在打印任务信息,两处日志比对来定位哪些任务被丢弃了。
public class MyTask implements Runnable{ private int number; public MyTask(int number) { super(); this.number = number; } public void run() { System.out.println(System.currentTimeMillis()+"thread id:"+Thread.currentThread().getId()+"==="+number); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }} public static void main(String[] args) {// ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, // new ArrayBlockingQueue(1), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); // ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,// new ArrayBlockingQueue (5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy()); // ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,// new ArrayBlockingQueue (5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy()); ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, new ArrayBlockingQueue (5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy()); for(int i=0;i<10000;i++) { try { System.out.println(i); es.submit(new MyTask(i)); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); System.out.println("------------------------"+i); } } }
线程池执行逻辑源码解析-intsmaze
public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current { @code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * { @code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if { @code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. *如果少于corePoolSize线程正在运行,首先尝试用给定的命令启动一个新的线程任务。 自动调用addWorker检查runState和workerCount, * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. *如果任务可以成功排队,那么我们仍然需要 仔细检查我们是否应该添加一个线程 (因为现有的自从上次检查后死亡)或者那个 自进入该方法以来,该池关闭。 所以我们 重新检查状态,如果有必要的话回滚队列 停止,或者如果没有的话就开始一个新的线程。 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command);//队列满了,执行拒绝策略 else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } final void reject(Runnable command) { handler.rejectedExecution(command, this);//这里就是调用我们传入的拒绝策略对象的方法 } /** * Dispatch an uncaught exception to the handler. This method is * intended to be called only by the JVM. */ private void dispatchUncaughtException(Throwable e) { getUncaughtExceptionHandler().uncaughtException(this, e); }
常见线程池的构造方法-intsmaze
newFixedThreadPoo-intsmaze
任务队列为LinkedBlockingQueue中(长度无限),线程数量和最大线程数量相同。功能参考前面的任务队列总结。
ExecutorService es=Executors.newFixedThreadPool(5);//参数同时指定线程池中线程数量为5,最大线程数量为5public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());}
newSingleThreadExecutor-intsmaze
ExecutorService es=Executors.newSingleThreadExecutor();//线程池中线程数量和最大线程数量均为1.public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));}
newCachedThreadPool-intsmaze
ExecutorService es=Executors.newCachedThreadPool();//指定线程池中线程数量为0,最大线程数量为Integer.MAX_VALUE,任务队列为SynchronousQueuepublic static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());}
newScheduledThreadPool- -定时线程-intsmaze
任务队列为new DelayedWorkQueue(),返回的对象在ExecutorService接口上扩展了在指定时间执行某认为的功能,在某个固定的延时之后执行或周期性执行某个任务。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);}
newSingleThreadScheduledExecutor- -定时线程-intsmaze
相当于newScheduledThreadPool(int corePoolSize)中corePoolSize设置为1。
ScheduledExecutorService es=Executors.newSingleThreadScheduledExecutor();
延迟线程池
class MyScheduledTask implements Runnable{ private String tname; public MyScheduledTask(String tname) { this.tname=tname; } public void run() { System.out.println(tname+"任务时延2秒执行!!!"); }}public class intsmaze{ public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool =Executors.newScheduledThreadPool(2); MyScheduledTask mt1=new MyScheduledTask("MT1"); scheduledThreadPool.schedule(mt1,2,TimeUnit.SECONDS); }}
newWorkStealingPool java8新增连接池-intsmaze
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }//创建指定数量的线程池来执行给定的并行级别,还会使用多个队列减少竞争 public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }//前一个方法的简化,如果当前机器有4个CPU,则目标的并行级别被设置为4。
关闭线程池(很少使用,除了切换数据源时需要控制)-intsmaze
线程池优化-intsmaze
Runtime.getRuntime().availableProcessors()
取得可以CPU数量。