创建线程的几种方式?
- 继承 Thread 类创建线程;
- 实现 Runnable 接口创建线程;
- 通过 Callable 和 Future 创建线程;
- 通过线程池创建线程。
评论
- 其实创建线程就只有两种方式,一种就是直接通过继承Thread类然后实现里面的run()方法来实现,还有一种就是通过实现Runnable接口,然后把这个实现类传进Thread构造函数里面。这个通过Callable来实现的方法是因为FutureTask实现了Runnable接口,FutureTask可以接收一个Collable参数,然后把这个FutureTask传进Thread里面,FutureTask调用run方法的时候,run方法会调用Callable的call方法然后把call方法的返回值赋给outcome属性。
我的学习
- 继承
Thread
类
class MyThread extends Thread {
public void run() {
System.out.println("通过继承Thread类的方式创建线程");
}
}
public class ThreadExample {
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start();
}
}
- 实现
Runnable
接口
class MyRunnable implements Runnable {
public void run() {
System.out.println("通过实现Runnable接口的方式创建线程");
}
}
public class RunnableExample {
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
- 使用
Callable
和Future
import java.util.concurrent.*;
class MyCallable implements Callable<Integer> {
public Integer call() throws Exception {
// 执行任务,返回结果
return 123;
}
}
public class CallableExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<Integer> callable = new MyCallable();
FutureTask<Integer> futureTask = new FutureTask<>(callable);
Thread thread = new Thread(futureTask);
thread.start();
// 等待任务执行完毕,并获取其结果
Integer result = futureTask.get();
System.out.println("通过Callable和FutureTask方式创建的线程,结果为:" + result);
}
}
- 使用
ExceutotService
import java.util.concurrent.*;
public class ExecutorServiceExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(new Runnable() {
public void run() {
System.out.println("通过ExecutorService方式创建的线程");
}
});
executor.shutdown(); // 关闭ExecutorService,不再接受新任务
}
}
为什么需要线程池呢?
- 线程复用
- 控制最大并发数
- 管理线程的生命周期
Executors
创建一个线程池,重复使用固定数量的线程,这些线程在共享的无界队列上运行。 在任何时候,最多会有nThreads个线程在处理任务。如果在所有线程都处于活动 状态时提交了其他任务,它们将在队列中等待,直到有线程可用。如果任何线程在 关闭前的执行过程中因故障而终止,则会有新的线程代替它执行后续任务。线程池 中的线程将一直存在,直到明确关闭为止。
Executors.newFixedThreadPool(int nThreads)
创建一个线程池,根据需要创建新线程,但在有可用线程时 会重复使用以前构建的 线程。这些线程池通常能提高执行大量短期异步任务的程序的性能。如果有可用线 程,执行调用将重用以前构建的线程。如果没有可用的线程,则会创建一个新线程 并添加到线程池中。六十秒内未使用的线程将被终止并从缓存中删除。因此,闲置 足够长时间的线程池不会消耗任何资源。请注意,可以使用ThreadPoolExecutor 构造函数创建具有类似属性但细节(例如超时参数)不同的线程池。
Executors.newCachedThreadPool()
创建一个 Executor,该 Executor 使用单个工作线程在无界队列中运行。(但 要注意的是,如果在关闭前的执行过程中出现故障,导致单线程终止,那么在需要 执行后续任务时,会有新的线程取而代之)。任务保证按顺序执行,任何时候都不 会有一个以上的任务处于活动状态。与newFixedThreadPool(1)不同的是,保 证返回的执行器不能重新配置以使用额外的线程
Executors.newSingleThreadExecutor()
创建一个线程池,可以安排命令在给定延迟后运行,或定期执行。
Executors.newScheduledThreadPool(int corePoolSize)
线程池的运行过程
- 刚开始运行时,线程池是空的。
- 一个任务进来,检查池中的线程数量,是否达到 corePoolSize, 如果没有达到,则创建线程,执行任务。
- 任务执行完成后,线程不会销毁,而是阻塞,等待下一个任务。
- 又进来一个任务,不是直接使用阻塞的线程,而是检查线程池中的线程数大小,是否达到 corePoolSize,如果没有达到, 则继续创建新的线程,来执行新的任务,如此往复, 直到线程池中的线程数达到 corePoolSize ,此时停止创建新的线程。
- 此时,又来新的任务,会选择线程池中阻塞等待的线程来执行任务,有一个任务进来,唤醒一个线程来执行这个任务,处理完之后,再次阻塞,尝试在 workQueue 上获取 下一个任务,如果线程池中没有可唤醒的线程, 则任务进入 workQueue,排队等待。
- 如果队列是无界队列,比如 LinkedBlockingQueue,默认最大容量为 Integer.Max,接近于无界, 可用无限制的接收任务,如果任务是有界队列,比如 ArrayBlockingQueue,可限定队列大小,当线程池中的线程来不及处理, 然后,所有的任务都进入队列,队列的任务数也达到限定大小,此时,再来新的任务,就会入队失败,然后,就会再次尝试在线程池创建线程, 直到线程数达到 maximumPoolSize,停止创建线程。
- 此时,队列满了,新的任务无法入队。创建的线程数也达到了 maximumPoolSize,无法再创建新的线程,此时,就会 reject,使用拒绝策略 RejectedExecutionHandler,不让继续提交任务,默认的是 AbortPolicy 策略,拒绝,并抛出异常。
- 超出 corePoolSize 数创建的那部分线程,是跟空闲时间 keepAliveTime 相关的,如果超过 keepAliveTime 时间还获取不到 任务,线程会被销毁,自动释放掉。
源码跟踪
- 构造函数设置了一些核心参数
int corePoolSize
核心线程数:保留在线程池中的线程数,即使这些线程处于空闲状态,除非设置了 allowCoreThreadTimeoutint maximumPoolSize
最大线程数:允许在线程池中保留的最大线程数long keepAliveTime
线程的存活时间:当线程数大于内核数是,这是多余的空闲线程在终止前等待新任务的最长时间。TimeUnit unit
keepAliveTime
参数的单位BlockingQueue<Runnable> workQueue
工作队列:在执行任务前用于保存任务的队列。此队列将仅保留执行方法提交的Runnable
任务。LinkedBlockingQueue
一个由链表结构支持的可选有界阻塞队列,如果不指定容量,那么默认容量将等于Integer.MAX_VALUE
。ArrayBlockingQueue
一个由数组支持的有界阻塞队列,必须在创建时指定队列的大小。 不会自动扩容。PriorityBlockingQueue
一个支持优先级排序的无界阻塞队列SynchronousQueue
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。DelayedWorkQueue
一个基于堆的无界阻塞队列
ThreadFactory
threadFactory
线程工厂:执行程序创建新线程时使用的工厂RejectedExecutionHandler
handler 拒绝策略:当执行因达到线程边界和队列容量而受阻时使用的处理程序。 下面四种拒绝策略可以看下源码,很简单。CallerRunsPolicy
直接拒绝任务,在调用者线程中直接执行被拒绝执行任务的 run 方法,除非线程池已经shutdown
,则直接抛弃任务。AbortPolicy
直接丢弃任务,并抛出异常。 抛出的异常是RejectedExecutionException
。DiscardPolicy
直接丢弃任务,什么都不做。DiscardOlderPolicy
尝试添加新任务。 抛弃进入队列最早的那个任务,任何尝试把这次拒绝的任务放入队列。
execute
源码
为什么不建议使用 Executors 创建线程,而使用 ThreadPoolExecute 实现类来创建线程?
Executors
创建的 FixedThreadPool
和 SingleThreadExecutor
使
用的是 LinkedBlockingQueue
阻塞队列,默认大小是 Integer.MAX_VALUE
,
可以无限制的将任务添加到队列中。CachedThreadPool
允许创建线程的数量为
Integer.MAX_VALUE
,可能会创建大量的线程。在极端的情况下会导致
JVM OOM
,系统就挂了。
线程池调优
- 高并发、任务执行时间短,此类任务可以充分利用 CPU,尽可能减少上下文切换,线程池的线程数可以设置为 CPU core + 1
- 并发不高,任务执行时间长
- IO 密集 2 * CPU core
- CPU 密集 CPU core + 1
- 高并发、业务执行时间长 考虑拆分、解耦、部分数据缓存,增加服务器。
tasks 每秒的任务数 taskCost 每个任务花费的时间 responseTime 系统容忍的最大响应时间
corePoolSize = tasks * taskCost / responseTime
实际场景
- 任务数多但资源占用不大: 电商平台的消息推送或短信通知
BlockingQueue queue = newArrayBlockingQueue<>(4096);
ThreadPoolExecutor executor = newThreadPoolExecutor(16, 16, 0, TimeUnit.SECONDS, queue);
- 任务数不多但资源占用大: 日志收集、图片流压缩或批量订单处理等场景
BlockingQueue queue = newArrayBlockingQueue<>(512);
ThreadPoolExecutor executor =newThreadPoolExecutor(16, 64, 30, TimeUnit.SECONDS, queue);
- 极端情况,任务多,资源占用大
BlockingQueue queue = newSynchronousQueue<>();
ThreadPoolExecutor executor =newThreadPoolExecutor(64, 64, 0, TimeUnit.SECONDS, queue);