目录
一、ThreadPool线程池
1、概述
2、线程池的架构
3、线程池使用方式
4、线程池底层原理
5、线程池的七个参数
6、线程池底层工作流程
7、拒绝策略
8、自定义线程池
二、Fork/Join分支合并框架
1、概述
2、Fork/Join分支合并框架
三、CompletableFuture异步回调
一、ThreadPool线程池
1、概述
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销。进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
线程池的优势:线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
线程池的主要特点:
1、降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
2、提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
3、提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
2、线程池的架构
3、线程池使用方式
Executors工具类
Executors常用方法:
1.static ExecutorService newFixedThreadPool(int nThreads):创建一个可重用固定线程数的
线程池,以共享的无界队列方式来运行这些线程。
2.static ExecutorService newSingleThreadExecutor():创建一个使用单个 worker 线程的
Executor,以无界队列方式来运行该线程。
3.static ExecutorService newCachedThreadPool():创建一个可根据需要创建新线程的线程池,
但是在以前构造的线程可用时将重用它们。
Executors.newFixedThreadPool(int nThreads):一池N线程。
Executors.newSingleThreadExecutor():一个任务一个任务执行,一池一线程。
Executors.newCachedThreadPool():线程池根据需求创建线程,可扩容,遇强则强。
newFixedThreadPool()示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
try {
for (int i = 1; i <=10; i++) {
threadPool1.execute(()->{
System.out.println(Thread.currentThread().getName()+" 办理业务");
});
}
}catch (Exception e) {
e.printStackTrace();
} finally {
threadPool1.shutdown();
}
}
}
newSingleThreadExecutor();示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
try {
for (int i = 1; i <=10; i++) {
threadPool2.execute(()->{
System.out.println(Thread.currentThread().getName()+" 办理业务");
});
}
}catch (Exception e) {
e.printStackTrace();
} finally {
threadPool2.shutdown();
}
}
}
newCachedThreadPool();示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
ExecutorService threadPool3 = Executors.newCachedThreadPool();
try {
for (int i = 1; i <=10; i++) {
threadPool3.execute(()->{
System.out.println(Thread.currentThread().getName()+" 办理业务");
});
}
}catch (Exception e) {
e.printStackTrace();
} finally {
threadPool3.shutdown();
}
}
}
4、线程池底层原理
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
ExecutorService threadPool3 = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
上面三个多线程底层创建都是new ThreadPoolExecutor();
5、线程池的七个参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
int corePoolSize:常驻线程数量(核心)
int maximumPoolSize:最大线程数量
long keepAliveTime:线程存活时间
TimeUnit unit:线程存活时间
BlockingQueue<Runnable> workQueue:阻塞队列
ThreadFactory threadFactory:线程工厂
RejectedExecutionHandler handler:拒绝策略
6、线程池底层工作流程
如上图所示:
线程池工作流程:线程池执行execute()方法,才开始创建线程。线程池先执行①corePool(常驻线程)中的任务,其次把任务放到②阻塞队列中,再其次任务放到③最大线程池剩余空间中(并新创建线程执行③中的任务,而②中的阻塞任务会继续等待),还超过上述所有个数之和的情况下,会启动④也就是拒绝策略生效了。
7、拒绝策略
AbortPolic(默认):直接抛出RejectedExecutionException异常阻止系统正常运行。
CallerRunsPolicy:"调用者运行"一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中,尝试再次提交当前任务。
DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常,如果允许任务丢失,这是最好的策略。
8、自定义线程池
阿里巴巴Java开发手册
[强制]线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池对象的弊端如下:
1)FixedThreadPool和SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2)CachedThreadPool和ScheduledThreadPool:
允许的创建线程数量为Integer.MAX_VALUE,可能会堆积大量的线程,从而导致OOM。
=============================================================
自定义线程池示例:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo2 {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(2,5,2L,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3)
,Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 1; i <=10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" 办理业务");
});
}
}catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
二、Fork/Join分支合并框架
1、概述
Fork/Join它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join框架要完成两件事情:
Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并
2、Fork/Join分支合并框架
RecursiveTask 递归任务:继承后可以实现递归(自己调自己)调用的任务。
@since 1.7
public abstract class RecursiveTask<V> extends ForkJoinTask<V>
RecursiveTask构造方法:
1.RecursiveTask()
RecursiveTask方法:
1.protected abstract V compute():此任务执行的主要计算。
2.protected boolean exec():实现递归任务的执行约定。
3.V getRawResult():返回ForkJoinTask将返回的结果。join(),即使此任务异常完成,
或者如果未知此任务已完成,则为null。
4.protected void setRawResult(V value):强制返回给定的值作为结果。
-----------------------------------------------------------------------
ForkJoinTask<V>
@since 1.7
public abstract class ForkJoinTask<V> extends Object
implements Future<V>, Serializable
ForkJoinTask<V>构造方法:
1.ForkJoinTask()
ForkJoinTask<V>常用方法:
1.ForkJoinTask<V> fork():安排异步执行此任务。
2.V get():如有必要,等待计算完成,然后检索其结果。
3.V join():完成计算后返回计算结果。
-----------------------------------------------------------------------
ForkJoinPool
public class ForkJoinPool extends AbstractExecutorService
ForkJoinPool构造方法:
1.ForkJoinPool()
ForkJoinPool常用方法:
1.void execute(ForkJoinTask<?> task):安排(异步)执行给定任务。
2.void execute(Runnable task):在将来的某个时间执行给定的命令。
3.<T> ForkJoinTask<T> submit(Callable<T> task):提交一个返回值的任务以供执行,
并返回一个表示任务挂起结果的未来。
4.<T> ForkJoinTask<T> submit(ForkJoinTask<T> task):提交要执行的ForkJoinTask。
5.ForkJoinTask<?> submit(Runnable task):提交可运行任务以执行,并返回表示该任务的未来。
6.<T> ForkJoinTask<T> submit(Runnable task, T result):提交可运行任务以执行,
并返回表示该任务的未来。
7.void shutdown():启动有序关机,执行以前提交的任务,但不接受新任务。
Fork/Join示例:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
//求1+2+3+....100。需要拆分任务不超过10个数进行相加拆分
public class ForkJoinDemo {
public static void main(String[] args) throws Exception {
MyTask myTask = new MyTask(0,100);
//创建分支合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
//获取最终合并之后结果
Integer result = forkJoinTask.get();
System.out.println(result);
//关闭池对象
forkJoinPool.shutdown();
}
}
@SuppressWarnings("serial")
class MyTask extends RecursiveTask<Integer>{
private static final Integer VALUE = 10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
//判断
if ((end-begin) <= VALUE) {
for (int i = begin; i <= end; i++) {
result = result + i;
}
}else {
int middle = (begin+end)/2;
//拆分左边
MyTask task01 = new MyTask(begin, middle);
//拆分右边
MyTask task02 = new MyTask(middle+1, end);
//调用方法拆分
task01.fork();
task02.fork();
//合并结果
result = task01.join() +task02.join();
}
return result;
}
}
三、CompletableFuture异步回调
同步:
异步:
CompletableFuture异步回调示例:
import java.util.concurrent.CompletableFuture;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
//同步调用
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName() + " :completableFuture1");
});
completableFuture1.get();
//异步调用
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName() + " :completableFuture2");
return 1024;
});
completableFuture2.whenComplete((t,u)->{
System.out.println("--t--"+t);//t:表示成功的返回值
System.out.println("--u--"+u);//u:表示失败异常的信息
}).get();
//异步调用
CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName() + " :completableFuture3");
//模拟异常
int i = 10/0;
return 1024;
});
completableFuture3.whenComplete((t,u)->{
System.out.println("--t--"+t);//t:表示成功的返回值
System.out.println("--u--"+u);//u:表示失败异常的信息
}).get();
}
}
实际开发中一般不会用CompletableFuture异步回调。都会使用MQ消息队列的方式。
Java JUC高并发编程(一)