跳到主要内容

创建线程的几种方式?

  1. 继承 Thread 类创建线程;
  2. 实现 Runnable 接口创建线程;
  3. 通过 Callable 和 Future 创建线程;
  4. 通过线程池创建线程。

评论

  • 其实创建线程就只有两种方式,一种就是直接通过继承Thread类然后实现里面的run()方法来实现,还有一种就是通过实现Runnable接口,然后把这个实现类传进Thread构造函数里面。这个通过Callable来实现的方法是因为FutureTask实现了Runnable接口,FutureTask可以接收一个Collable参数,然后把这个FutureTask传进Thread里面,FutureTask调用run方法的时候,run方法会调用Callable的call方法然后把call方法的返回值赋给outcome属性。

我的学习

  1. 继承 Thread
class MyThread extends Thread {
public void run() {
System.out.println("通过继承Thread类的方式创建线程");
}
}

public class ThreadExample {
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start();
}
}

  1. 实现 Runnable 接口
class MyRunnable implements Runnable {
public void run() {
System.out.println("通过实现Runnable接口的方式创建线程");
}
}

public class RunnableExample {
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
  1. 使用 CallableFuture
import java.util.concurrent.*;

class MyCallable implements Callable<Integer> {
public Integer call() throws Exception {
// 执行任务,返回结果
return 123;
}
}

public class CallableExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<Integer> callable = new MyCallable();
FutureTask<Integer> futureTask = new FutureTask<>(callable);
Thread thread = new Thread(futureTask);
thread.start();

// 等待任务执行完毕,并获取其结果
Integer result = futureTask.get();
System.out.println("通过Callable和FutureTask方式创建的线程,结果为:" + result);
}
}
  1. 使用 ExceutotService
import java.util.concurrent.*;

public class ExecutorServiceExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);

executor.submit(new Runnable() {
public void run() {
System.out.println("通过ExecutorService方式创建的线程");
}
});

executor.shutdown(); // 关闭ExecutorService,不再接受新任务
}
}

为什么需要线程池呢?

  • 线程复用
  • 控制最大并发数
  • 管理线程的生命周期

Executors

创建一个线程池,重复使用固定数量的线程,这些线程在共享的无界队列上运行。 在任何时候,最多会有nThreads个线程在处理任务。如果在所有线程都处于活动 状态时提交了其他任务,它们将在队列中等待,直到有线程可用。如果任何线程在 关闭前的执行过程中因故障而终止,则会有新的线程代替它执行后续任务。线程池 中的线程将一直存在,直到明确关闭为止。

Executors.newFixedThreadPool(int nThreads)

创建一个线程池,根据需要创建新线程,但在有可用线程时会重复使用以前构建的 线程。这些线程池通常能提高执行大量短期异步任务的程序的性能。如果有可用线 程,执行调用将重用以前构建的线程。如果没有可用的线程,则会创建一个新线程 并添加到线程池中。六十秒内未使用的线程将被终止并从缓存中删除。因此,闲置 足够长时间的线程池不会消耗任何资源。请注意,可以使用ThreadPoolExecutor 构造函数创建具有类似属性但细节(例如超时参数)不同的线程池。

Executors.newCachedThreadPool()

创建一个 Executor,该 Executor 使用单个工作线程在无界队列中运行。(但 要注意的是,如果在关闭前的执行过程中出现故障,导致单线程终止,那么在需要 执行后续任务时,会有新的线程取而代之)。任务保证按顺序执行,任何时候都不 会有一个以上的任务处于活动状态。与newFixedThreadPool(1)不同的是,保 证返回的执行器不能重新配置以使用额外的线程

Executors.newSingleThreadExecutor()

创建一个线程池,可以安排命令在给定延迟后运行,或定期执行。

Executors.newScheduledThreadPool(int corePoolSize)

线程池的运行过程

  1. 刚开始运行时,线程池是空的。
  2. 一个任务进来,检查池中的线程数量,是否达到 corePoolSize, 如果没有达到,则创建线程,执行任务。
  3. 任务执行完成后,线程不会销毁,而是阻塞,等待下一个任务。
  4. 又进来一个任务,不是直接使用阻塞的线程,而是检查线程池中的线程数大小,是否达到 corePoolSize,如果没有达到,则继续创建新的线程,来执行新的任务,如此往复, 直到线程池中的线程数达到 corePoolSize ,此时停止创建新的线程。
  5. 此时,又来新的任务,会选择线程池中阻塞等待的线程来执行任务,有一个任务进来,唤醒一个线程来执行这个任务,处理完之后,再次阻塞,尝试在 workQueue 上获取 下一个任务,如果线程池中没有可唤醒的线程, 则任务进入 workQueue,排队等待。
  6. 如果队列是无界队列,比如 LinkedBlockingQueue,默认最大容量为 Integer.Max,接近于无界, 可用无限制的接收任务,如果任务是有界队列,比如 ArrayBlockingQueue,可限定队列大小,当线程池中的线程来不及处理, 然后,所有的任务都进入队列,队列的任务数也达到限定大小,此时,再来新的任务,就会入队失败,然后,就会再次尝试在线程池创建线程, 直到线程数达到 maximumPoolSize,停止创建线程。
  7. 此时,队列满了,新的任务无法入队。创建的线程数也达到了 maximumPoolSize,无法再创建新的线程,此时,就会 reject,使用拒绝策略 RejectedExecutionHandler,不让继续提交任务,默认的是 AbortPolicy 策略,拒绝,并抛出异常。
  8. 超出 corePoolSize 数创建的那部分线程,是跟空闲时间 keepAliveTime 相关的,如果超过 keepAliveTime 时间还获取不到 任务,线程会被销毁,自动释放掉。

源码跟踪

  1. 构造函数设置了一些核心参数
  • int corePoolSize 核心线程数:保留在线程池中的线程数,即使这些线程处于空闲状态,除非设置了 allowCoreThreadTimeout
  • int maximumPoolSize 最大线程数:允许在线程池中保留的最大线程数
  • long keepAliveTime 线程的存活时间:当线程数大于内核数是,这是多余的空闲线程在终止前等待新任务的最长时间。
  • TimeUnit unit keepAliveTime 参数的单位
  • BlockingQueue<Runnable> workQueue 工作队列:在执行任务前用于保存任务的队列。此队列将仅保留执行方法提交的 Runnable 任务。
    • LinkedBlockingQueue 一个由链表结构支持的可选有界阻塞队列,如果不指定容量,那么默认容量将等于 Integer.MAX_VALUE
    • ArrayBlockingQueue 一个由数组支持的有界阻塞队列,必须在创建时指定队列的大小。 不会自动扩容。
    • PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列
    • SynchronousQueue 一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。
    • DelayedWorkQueue 一个基于无界阻塞队列
  • ThreadFactory threadFactory 线程工厂:执行程序创建新线程时使用的工厂
  • RejectedExecutionHandler handler 拒绝策略:当执行因达到线程边界和队列容量而受阻时使用的处理程序。 下面四种拒绝策略可以看下源码,很简单。
    • CallerRunsPolicy 直接拒绝任务,在调用者线程中直接执行被拒绝执行任务的 run 方法,除非线程池已经 shutdown,则直接抛弃任务。
    • AbortPolicy 直接丢弃任务,并抛出异常。 抛出的异常是 RejectedExecutionException
    • DiscardPolicy 直接丢弃任务,什么都不做。
    • DiscardOlderPolicy 尝试添加新任务。 抛弃进入队列最早的那个任务,任何尝试把这次拒绝的任务放入队列。
  1. execute 源码

为什么不建议使用 Executors 创建线程,而使用 ThreadPoolExecute 实现类来创建线程?

Executors 创建的 FixedThreadPoolSingleThreadExecutor 使 用的是 LinkedBlockingQueue 阻塞队列,默认大小是 Integer.MAX_VALUE, 可以无限制的将任务添加到队列中。CachedThreadPool 允许创建线程的数量为 Integer.MAX_VALUE,可能会创建大量的线程。在极端的情况下会导致 JVM OOM,系统就挂了。

线程池调优

  • 高并发、任务执行时间短,此类任务可以充分利用 CPU,尽可能减少上下文切换,线程池的线程数可以设置为 CPU core + 1
  • 并发不高,任务执行时间长
    • IO 密集 2 * CPU core
    • CPU 密集 CPU core + 1
  • 高并发、业务执行时间长 考虑拆分、解耦、部分数据缓存,增加服务器。

tasks 每秒的任务数 taskCost 每个任务花费的时间 responseTime 系统容忍的最大响应时间

corePoolSize = tasks * taskCost / responseTime

实际场景

  1. 任务数多但资源占用不大: 电商平台的消息推送或短信通知
BlockingQueue queue = newArrayBlockingQueue<>(4096);
ThreadPoolExecutor executor = newThreadPoolExecutor(16, 16, 0, TimeUnit.SECONDS, queue);
  1. 任务数不多但资源占用大: 日志收集、图片流压缩或批量订单处理等场景
BlockingQueue queue = newArrayBlockingQueue<>(512);
ThreadPoolExecutor executor =newThreadPoolExecutor(16, 64, 30, TimeUnit.SECONDS, queue);
  1. 极端情况,任务多,资源占用大
BlockingQueue queue = newSynchronousQueue<>();
ThreadPoolExecutor executor =newThreadPoolExecutor(64, 64, 0, TimeUnit.SECONDS, queue);