嘘~ 正在从服务器偷取页面 . . .

Java 并发编程


1. 基本概念

1.1 进程 & 线程

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。

线程(thread)操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

1.2 协程

当线程数量非常多的时候,依然会有问题。一是系统线程会占用非常多的内存空间,二是过多的线程切换会占用大量的系统时间。

协程运行在线程之上,当一个协程执行完成后,可以选择主动让出,让另一个协程运行在当前线程之上。协程并没有增加线程数量,只是在线程的基础之上通过分时复用的方式运行多个协程,而且协程的切换在用户态完成,切换的代价比线程从用户态到内核态的代价小很多。

协程切换

操作系统并不知道协程的存在,它只知道线程,因此在协程调用阻塞IO操作的时候,操作系统会让线程进入阻塞状态,当前的协程和其它绑定在该线程之上的协程都会陷入阻塞而得不到调度。

因此在协程中不能调用导致线程阻塞的操作。也就是说,协程只有和异步IO结合起来,才能发挥最大的威力。

一般有2种处理方式,处理在协程中调用阻塞IO的操作:

  1. 在调用阻塞IO操作的时候,重新启动一个线程去执行这个操作,等执行完成后,协程再去读取结果。这其实和多线程没有太大区别;
  2. 对系统的IO进行封装,改成异步调用的方式,这需要大量的工作,最好寄希望于编程语言原生支持。

1.3 唤醒

wait/notify/notifyAll 是最常见的线程等待和线程唤醒的办法。

1.3.1 wait

在使用 wait 方法时,必须把 wait 方法写在 synchronized 保护的 while 代码块中,并始终判断执行条件是否满足,如果满足就往下继续执行,如果不满足就执行 wait 方法,而在执行 wait 方法之前,必须先持有对象的 monitor 锁,也就是通常所说的 synchronized 锁。

public void give(String data) {
    synchronized (this) {
        buffer.add(data);
        notify();
    }
}

public String take() throws InterruptedException {
    synchronized (this) {
        while (buffer.isEmpty()) {
            wait();
        }
        return buffer.remove();
    }
}

如果没有同步代码块,在“判断-执行”过程中会产生线程安全问题。消费者可能会一直等待。另外,wait 方法会释放 monitor 锁,这也要求必须首先进入到 synchronized 内持有这把锁。

虚拟唤醒

唤醒-获取锁并不是原子性的,唤醒之后锁可能被其他线程获取。

  1. 线程获取不到锁被阻塞,会在 Contention List 上等待;
  2. 获取到锁的线程调用 wait 后,会主动放弃锁,并在 Wait Set 中等待唤醒;
  3. 线程调用 notify 后,在退出 Synchronized 块释放锁后才会执行唤醒操作。

针对于 wait 代码,需要更改 if 代码为 while。

1.3.2 sleep

为什么 wait/notify/notifyAll 方法被定义在 Object 类中?而 sleep 方法定义在 Thread 类中?

  1. Java 中每个对象都有一把称之为 monitor 监视器的锁,由于每个对象都可以上锁,这就要求在对象头中有一个用来保存锁信息的位置。这个锁是对象级别的,而非线程级别的,wait/notify/notifyAll 也都是锁级别的操作,它们的锁属于对象,所以把它们定义在 Object 类中是最合适,因为 Object 类是所有对象的父类;
  2. 如果把 wait/notify/notifyAll 方法定义在 Thread 类中,会带来很大的局限性,比如一个线程可能持有多把锁,以便实现相互配合的复杂逻辑,假设此时 wait 方法定义在 Thread 类中,如何实现让一个线程持有多把锁呢?又如何明确线程等待的是哪把锁呢?既然我们是让当前线程去等待某个对象的锁,自然应该通过操作对象来实现,而不是操作线程。

wait/notify & sleep

  1. wait 方法必须在 synchronized 保护的代码中使用,而 sleep 方法并没有这个要求;
  2. 在同步代码中执行 sleep 方法时,并不会释放 monitor 锁,但执行 wait 方法时会主动释放 monitor 锁;
  3. sleep 方法中会要求必须定义一个时间,时间到期后会主动恢复,而对于没有参数的 wait 方法而言,意味着永久等待,直到被中断或被唤醒才能恢复,它并不会主动恢复;
  4. wait/notify 是 Object 类的方法,而 sleep 是 Thread 类的方法。

1.4 并发 & 并行

串行表示所有任务都一一按先后顺序进行。

并行意味着可以同时取得多个任务,并同时去执行所取得的这些任务。

并发(concurrent)指的是多个程序可以同时运行的现象,更细化的是多进程可以同时运行或者多指令可以同时运行。通过线程间的快速切换,在宏观上同时进行,在微观上分布进行的情况。

1.5 管程

管程(monitor,也就是平时说的锁)是保证了同一时刻只有一个进程在管程内活动,即管程内定义的操作在同一时刻只被一个进程调用(由编译器实现)。但是这样并不能保证进程以设计的顺序执行。
JVM 中同步是基于进入和退出管程(monitor)对象实现的,每个对象都会有一个管程(monitor)对象,管程(monitor)会随着 Java 对象一同创建和销毁。执行线程首先要持有管程对象,然后才能执行方法,当方法完成之后会释放管程,方法在执行时候会持有管程,其他线程无法再获取同一个管程。

对共享数据的互斥访问,管程与临界区相比,最大的区别就是:一个线程进入临界区后,必须要一直执行临界区的代码直到结束,才能离开临界区;而管程中执行的线程可以临时放弃对管程的互斥访问,挂起后等待事件,之后再恢复。当某线程临时放弃互斥访问时,允许其它线程执行互斥访问。

1.6 用户/守护线程

Java 中线程可分为:用户线程(普通线程)、守护线程(后台线程)。

所谓守护线程,是指在程序运行的时候在后台提供一种通用服务的线程,比如垃圾回收就是一个很称职的守护者,并且这种线程并不属于程序中不可或缺的部分。因此,当所有的非守护线程结束时,程序也就终止了,同时会杀死进程中的所有守护线程。反过来说,只要任何非守护线程还在运行,程序就不会终止。

用户线程和守护线程两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果用户线程已经全部退出运行了,只剩下守护线程存在了,虚拟机也就退出了。因为没有了被守护者,守护线程也就没有工作可做了,也就没有继续运行程序的必要了。

public class Test1 {
    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() 
                               + "::" 
                               + Thread.currentThread().isDaemon());
            while (true) {

            }
        }, "aa");
        // 设置 thread1 为守护线程,需要在 start() 方法之前设置
        thread1.setDaemon(true);
        thread1.start();
        System.out.println(Thread.currentThread().getName() + " over");
        // 如果没有设置守护线程,用户线程没有结束,JVM 也没有结束
    }
}

1.7 JMM

JMM(Java 内存模型 Java Memory Model,简称 JMM)本身是一种抽象的概念 并不真实存在,它描述的是一组规则或规范通过规范定制了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。

JMM 模型

JMM 与处理器、缓存、并发、编译器有关。它解决了 CPU 多级缓存、处理器优化、指令重排等导致的结果不可预期的问题。JMM 是和多线程相关的一组规范,需要各个 JVM 的实现来遵守 JMM 规范,即便同一个程序在不同的虚拟机上运行,得到的程序结果也是一致的。

1.7.1 原子性操作

  1. lock(锁定):作用于主内存的变量,把一个变量标记为一条线程独占状态;
  2. unlock(解锁):作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定;
  3. read(读取):作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的 load 动作使用;
  4. load(载入):作用于工作内存的变量,它把 read 操作从主内存中得到的变量值放入工作内存的变量副本中;
  5. use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎;
  6. assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋给工作内存的变量;
  7. store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的 write 的操作;
  8. write(写入):作用于工作内存的变量,它把 store 操作从工作内存中的一个变量的值传送到主内存的变量中。

Java 内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行

1.7.2 JMM 规范

在并发编程中,我们通常会遇到以下三个问题:原子性问题,可见性问题,有序性问题。

原子性

即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行.

可见性

可见性与 Java 的内存模型有关,模型采用缓存与主存的方式对变量进行操作。每个线程都有自己的缓存空间,对变量的操作都是在缓存中进行的,之后再将修改后的值返回到主存中。

指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。

//线程1执行的代码
int i = 0;
i = 10;
 
//线程2执行的代码
j = i;

假若执行线程 1 的是 CPU1,执行线程 2 的是 CPU2。当线程 1 执行 i = 10,会先把 i 的初始值加载到 CPU1 的高速缓存中,然后赋值为 10,那么在 CPU1 的高速缓存当中 i 的值变为 10 了,却没有立即写入到主存当中。此时线程 2 执行 j = i,它会先去主存读取 i 的值并加载到 CPU2 的缓存当中,注意此时内存当中 i 的值还是 0,那么就会使得 j 的值为 0,而不是 10。

有序性

即程序执行的顺序按照代码的先后顺序执行。

处理器为了提高程序运行效率,可能会对输入代码进行优化,它不保证程序中各个语句的执行先后顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行的结果是一致的。

在多线程的操作中,重排序可能会导致有条件判断的值提前/过晚结束。

1.7.3 变量读取

由于 JVM 运行程序的实体是线程,而每个线程创建时 JVM 都会为其创建一个工作内存(有些地方称为栈空间),工作内存是每个线程的私有数据区域,而 Java 内存模型中规定所有的变量都存储在主内存(从硬件的角度来说,就是内存条),主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行。

首先要将变量从主内存拷贝到线程自己的工作内存空间(从硬件角度来说就是 CPU 的缓存,,比如寄存器、L1、L2、L3 缓存等),然后对变量进行操作,操作完成后将变量写回主内存,不能直接操作主内存中的变量,各个线程的工作内存中存储着主内存中的变量副本拷贝,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成。

主存和本地内存调用

1.7.4 重排序

编译器、JVM 或者 CPU 都有可能出于优化等目的,对于实际指令执行的顺序进行调整。

  • 编译器优化:编译器(包括 JVM、JIT 编译器等)出于优化的目的,例如当前有了数据 a,把对 a 的操作放到一起效率会更高,避免读取 b 后又返回来重新读取 a 的时间开销,此时在编译的过程中会进行一定程度的重排;

  • CPU 优化:CPU 同样会有优化行为,这里的优化和编译器优化类似,都是通过乱序执行的技术来提高整体的执行效率;

  • 内存的“重排序”:内存系统内不存在真正的重排序,但是内存会带来看上去和重排序一样的效果,所以这里的“重排序”打了双引号。由于内存有缓存的存在,在 JMM 里表现为主存和本地内存,而主存和本地内存的内容可能不一致,所以这也会导致程序表现出乱序的行为。

1.8 锁

据分类标准把锁分为以下 7 大类别,分别是:

  • 偏向锁/轻量级锁/重量级锁;
  • 可重入锁/非可重入锁;
  • 共享锁/独占锁;
  • 公平锁/非公平锁;
    公平锁的公平的含义在于如果线程现在拿不到这把锁,那么线程就都会进入等待,开始排队,在等待队列里等待时间长的线程会优先拿到这把锁,有先来先得的意思。而非公平锁就不那么“完美”了,它会在一定情况下,忽略掉已经在排队的线程,发生插队现象;
  • 悲观锁/乐观锁;
  • 自旋锁/非自旋锁:
    自旋锁的理念是如果线程现在拿不到锁,并不直接陷入阻塞或者释放 CPU 资源,而是开始利用循环,不停地尝试获取锁,这个循环过程被形象地比喻为“自旋”,就像是线程在“自我旋转”。相反,非自旋锁的理念就是没有自旋的过程,如果拿不到锁就直接放弃,或者进行其他的处理逻辑,例如去排队、陷入阻塞等;
  • 可中断锁/不可中断锁。

2. 线程基础

线程是调度中的最小单位,也是 Java 中的重点关注对象。

2.1 线程实现

本质上只有一种实现线程的方式,在笼统的回答中,我们可能会获得两种,甚至是四种的答案。

2.1.1 基本方法

// 最经典的是通过调用 Runnable 或者继承 Thread 实现
class RunnableThread implements Runnable {
    @Override
    public void run() {
        System.out.println("Hello, World");
    }
}

class ThreadExtends extends Thread {
    @Override
    public void run() {
        System.out.println("Hello, World");
    }
}

2.1.2 线程池和 Callable

第 3 种方法是通过线程创建线程,比如设置线程池为 10,就会有 10 个子线程。本质上是通过线程工厂创建线程的,默认采用 DefaultThreadFactory,它会给线程池创建的线程设置一些默认值,比如:线程的名字、是否是守护线程,以及线程的优先级等。

private static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
        Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
            poolNumber.getAndIncrement() +
            "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

但是无论怎么设置这些属性,最终它还是通过 new Thread() 创建线程的 ,只不过这里的构造函数传入的参数要多一些,由此可以看出通过线程池创建线程并没有脱离最开始的那两种基本的创建方式,因为本质上还是通过 new Thread() 实现的。

第 4 种线程创建方式是通过有返回值的 Callable 创建线程。Runnable 创建线程是无返回值的,而 Callable 和与之相关的 Future、FutureTask,它们可以把线程执行的结果作为返回值返回。

class CallableTask implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        return new Random().nextInt();
    }
}

// 创建对应线程池
ExecutorService service = Executors.newFixedThreadPool(10);
// 提交任务,并用 Future 提交返回结果
Future<Integer> future = service.submit(new CallableTask());

源码中的示例代码

无论是 Callable 还是 FutureTask,它们首先和 Runnable 一样,都是一个任务,是需要被执行的,而不是说它们本身就是线程。它们可以放到线程池中执行,如代码所示,submit() 方法把任务放到线程池中,并由线程池创建线程,不管用什么方法,最终都是靠线程来执行的,而子线程的创建方式仍脱离不了最开始讲的两种基本方式,也就是实现 Runnable 接口和继承 Thread 类。

Runnable & Callable

在实际情况下,多线程会需要返回一个确切值(比如:请求网络、查询数据库)。这一点使用 Runnable 无法实现。

implement Runnable 也无法在内部直接抛出 checked Exception。

// Runnable 接口定义
public interface Runnable {
   public abstract void run();
}

// Callable 接口定义
public interface Callable<V> {
     V call() throws Exception;
}

Future

在出现耗时运算时,把运算的过程放到子线程去执行,再通过 Future 去控制子线程执行的计算过程,最后获取到计算结果。这样一来就可以把整个程序的运行效率提高,是一种异步的思想。

Callable 通过 Future 类的各种方法实现返回值等更加强大的功能。

// Future 接口五种方法
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutExceptio
}
常用方法
get()

5 种可能情况

图中 submit 往线程池中提交了一个 Task,这个 Task 实现了 Callable 接口。

// 示例代码,通过 Callable 返回随机值
public class FutureDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        Future<Integer> future = executorService.submit(new CallableClass());
        try {
            System.out.println(future.get());
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            executorService.shutdown();
        }
    }

    static class CallableClass implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(3000);
            return new Random().nextInt();
        }
    }
}
isDone()

即便任务抛出异常,isDone 方法依然会返回 true(异常和执行成功都会返回 true,只表示执行结束)。虽然抛出的异常是 IllegalArgumentException,但是对于 get 而言,它抛出的异常依然是 ExecutionException。虽然在任务执行一开始时就抛出了异常,但是真正要等到我们执行 get 的时候,才看到了异常。

cancel()

调用 cancel 时,会有三种情况:

  • 这个任务就会被正常取消,未来也不会被执行,cancel 返回 true;
  • 如果任务已经完成,或者之前已经被取消过了,那么执行 cancel 方法则代表取消失败,返回 false;
  • 这个任务正在执行,cancel 方法是必须传入一个参数。如果传入的参数是 true,执行任务的线程就会收到一个中断的信号;如果传入的是 false 则就代表不中断正在运行的任务,也就是没有任何作用。
FutureTask

除了使用线程池 submit 返回 future 对象之外,FutureTask 内部 implement RunnableFuture。

RunnableFuture 继承 Runnable 和 Future

// 没有使用线程池,不需要内部类静态类共享
public class FutureTaskDemo {
    public static void main(String[] args) {
        FutureTask<Integer> futureTask = new FutureTask<>(new SumTask());
        new Thread(futureTask).start();
        try {
            System.out.println(futureTask.get());
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

class SumTask implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 1; i <= 100; i++) {
            sum += i;
        }
        return sum;
    }
}
注意事项

批量循环容易产生 block:

当 for 循环批量获取 Future 的结果时容易 block,get 方法调用时应使用 timeout 限制。

如果第一个任务没有设置 timeout,之后的任务就会被卡住,Future 没法像 CPU 那样,通过某种算法决定哪个线程先执行。可以用 Future 的带超时参数的 get(long timeout, TimeUnit unit) 方法来解决这个问题。如果在限定的时间内没能返回结果的话,那么便会抛出一个 TimeoutException 异常。

Future 生命周期不能后退:

Future 的生命周期不能后退,一旦完成了任务,它就永久停在了“已完成”的状态。

CompletableFuture

在 Java 8 中,CompletableFuture 提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。

  • CompletableFuture 创建异步任务,一般有 supplyAsync 和 runAsync 两个方法:

    • supplyAsync 执行 CompletableFuture 任务,支持返回值;

    • runAsync 执行 CompletableFuture 任务,没有返回值。

  • CompletableFuture 的 thenRun 方法,在某个任务执行完成后,执行回调方法:

    • 调用 thenRun 方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池

    • 调用 thenRunAsync 执行第二个任务时,则第一个任务使用的是传入的线程池,第二个任务使用的是 ForkJoin 线程池

  • CompletableFuture 的 thenAccept/thenAcceptAsync 方法,将 supplyAsync 的返回值作为参数传递到回调函数中;

  • CompletableFuture 的 thenApply/thenApplyAsync 方法,使用场景同上述,但是具有返回值;
  • CompletableFuture 的 exceptionally,某个任务执行异常时,执行的回调方法;并且有抛出异常作为参数,传递到回调方法;
  • CompletableFuture 的 whenComplete,某个任务执行完成后,执行的回调方法,无返回值;并且 whenComplete 方法返回的 CompletableFuture 的 result 是上个任务的结果

这些只是 CompletableFuture 一部分方法,还有很多比如用于多任务调度的 AND、OR 等等。

public class CompletableFutureDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(
            () -> System.out.println("runAsync ok!"), executor);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("supplyAsync ok!");
            return "cxy621";
        }, executor);
        System.out.println(future1.join());
        System.out.println(future2.join());

        CompletableFuture<Void> thenFuture = future1.thenRun(
            () -> System.out.println("future1 after!"));
        System.out.println(thenFuture.join());

        CompletableFuture<Void> thenAcceptFuture = future2.thenAcceptAsync((a) -> {
            if ("cxy621".equals(a)) {
                System.out.println("right answer");
            }
        }, executor);
        CompletableFuture<String> thenApplyFuture = future2.thenApplyAsync((a) -> {
            if ("cxy621".equals(a)) {
                System.out.println("nice answer");
            }
            return "get cxy621";
        });

        try {
            System.out.println(thenAcceptFuture.get());
            System.out.println(thenApplyFuture.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }
}

/*
runAsync ok!
supplyAsync ok!
null
cxy621
future1 after!
null
right answer
null
nice answer
get cxy621
*/
旅游平台问题

对于旅游平台而言,需要从不同公司获取对应票价以帮助用户比对。串行的效果肯定不能支持高效率查看,所以采取并行的方式。

之后就要考虑如果某个 website 加载缓慢,会拖慢整个功能的进度。

public class CompletableFutureDemo {
    public static void main(String[] args) {
        CompletableFutureDemo demo = new CompletableFutureDemo();
        System.out.println(demo.getPrices());
    }

    private Set<Integer> getPrices() {
        // 保证线程安全
        Set<Integer> prices = Collections.synchronizedSet(new HashSet<>());
        CompletableFuture<Void> task1 = CompletableFuture.runAsync(new Task(123, prices));
        CompletableFuture<Void> task2 = CompletableFuture.runAsync(new Task(456, prices));
        CompletableFuture<Void> task3 = CompletableFuture.runAsync(new Task(789, prices));
        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);

        try {
            allTasks.get(3, TimeUnit.SECONDS);
        } catch (ExecutionException | InterruptedException | TimeoutException e) {
            throw new RuntimeException(e);
        }
        return prices;
    }

    private class Task implements Runnable {
        Integer productId;
        Set<Integer> prices;

        public Task(Integer productId, Set<Integer> prices) {
            this.productId = productId;
            this.prices = prices;
        }

        @Override
        public void run() {
            int price = 0;
            try {
                Thread.sleep((long) (Math.random() * 4000));
                price = (int) (Math.random() * 4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            prices.add(price);
        }
    }
}

通过 CompletableTask 解决多个线程的问题,通过子线程的异步处理。如果出现有任一线程时间超过 3s 的情况,就抛出异常。这样也防止串行中一个个等待的问题,提高效率。

2.1.3 一种方式

/**
* 1. 继承线程类方式
*/
public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("通过继承 Thread 类实现线程");
    }
 
}
// 匿名线程调用启动方法
new MyThread().start()
 
// Runnable 本质也是创建 Thread
/**
*  2. 实现Runnable方式
*/
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("通过实现 Runnable 方式实现线程");
    }
 
}
// 创建 MyRunnable 实例作为 Thread 构造方法的参数,所以还是在 new Thread
MyRunnable runnable = new MyRunnable();
Thread thread = new Thread(runnable);
thread.start();

源码对于 Run() 的解释

启动线程需要调用 start() 方法,而 start() 方法最终还会调用 run() 方法。继承 Thread 类之后,会把上述的 run() 方法重写,重写后 run() 方法里就是所需要执行的任务,但它最终还是需要调用 thread.start() 方法来启动线程,而 start() 方法最终也会调用这个已经被重写的 run() 方法来执行它的任务。

事实上创建线程只有一种方式,就是构造一个 Thread 类,这是创建线程的唯一方式。不同点仅仅在于实现线程运行内容的不同

本质上,实现线程只有一种方式,而要想实现线程执行的内容,却有两种方式,也就是可以通过 实现 Runnable 接口的方式,或是继承 Thread 类重写 run() 方法的方式,把我们想要执行的代码传入,让线程去执行。

2.1.4 优劣

实现 Runnable 接口比继承 Thread 类实现线程要好。

  • 从代码的架构考虑。实际上,Runnable 里只有一个 run() 方法,它定义了需要执行的内容,在这种情况下,实现了 Runnable 与 Thread 类的解耦,Thread 类负责线程启动和属性设置等内容,权责分明;
  • 在某些情况下可以提高性能。使用继承 Thread 类方式,每次执行一次任务,都需要新建一个独立的线程,执行完任务后线程走到生命周期的尽头被销毁,如果还想执行任务,就必须再新建一个继承了 Thread 类的类。
    如果此时执行的内容比较少,比如只是在 run() 方法里简单打印一行文字,那么它所带来的开销并不大,相比于整个线程从开始创建到执行完毕被销毁,这一系列的操作比 run() 方法打印文字本身带来的开销要大得多。
    如果使用实现 Runnable 接口的方式,就可以把任务直接传入线程池,使用一些固定的线程来完成任务,不需要每次新建销毁线程,大大降低了性能开销;
  • Java 语言不支持双继承。如果我们的类一旦继承了 Thread 类,那么它后续就没有办法再继承其他的类,这样一来,如果未来这个类需要继承其他类实现一些功能上的拓展,它就没有办法做到了,相当于限制了代码未来的可拓展性。

2.2 停止线程

对于 Java 而言,最正确的停止线程的方式是使用 interrupt。但 interrupt 仅仅起到通知被停止线程的作用。而对于被停止的线程而言,它拥有完全的自主权,它既可以选择立即停止,也可以选择一段时间后停止,也可以选择压根不停止。

对于强制停止可能会造成一些安全问题。比如:线程正在写入一个文件,这时收到终止信号,它就需要根据自身业务判断,是选择立即停止,还是将整个文件写入成功后停止,而如果选择立即停止就可能造成数据不完整。

调用某个线程的 interrupt() 之后,这个线程的中断标记位就会被设置成 true。每个线程都有这样的标记位,在线程执行中需要定期检查标记位。

while (!Thread.currentThread().isInterrupted() && more work to do) {
    do more work
}

2.2.1 sleep 期间感受中断

如果 sleep、wait 等可以让线程进入阻塞的方法使线程休眠了,而处于休眠中的线程被中断,那么线程是可以感受到中断信号的,并且会抛出一个 InterruptedException 异常,同时清除中断信号,将中断标记位设置成 false。

在团队开发过程中,如果内部调用 sleep/wait,可以在方法中使用 try/catch 或在方法签名中声明 throws InterruptedException。

void subTas() {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // 在这里不处理该异常是非常不好的
    }
}

2.2.2 处理异常

上述的 try/catch 并没有在 catch 中进行处理,在触发 interrupt 后中并没有针对终端信号的内容。对于 run() 方法而言,它本身没有抛出 checkedException 的能力,只能通过 try/catch 来处理异常。

private void reInterrupt() {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}

在 catch 语句块中调用 Thread.currentThread().interrupt() 函数。因为如果线程在休眠期间被中断,那么会自动清除中断信号。如果这时手动添加中断信号,中断信号依然可以被捕捉到。这样后续执行的方法依然可以检测到这里发生过中断,可以做出相应的处理,整个线程可以正常退出。

2.3 volatile

volatile 是一个稍弱的同步机制,一般情况下开销比非 volatile 变量稍大一些。

一个共享变量(类的成员变量、类的静态成员变量)被 volatile 修饰之后,那么就具备了两层语义:

  1. 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的;
  2. 禁止进行指令重排序。

一个共享变量被 volatile 修饰时,它会保证修改的值会立即被更新到主存,当有其他线程需要读取时,它会去内存中读取新值,保证了可见性。

volatile 经典案例:

//线程1
boolean stop = false;
while(!stop){
    doSomething();
}
 
//线程2
stop = true;
// 可以针对单个赋值和标志判断,但是符合运算比如 i++ 不行

第一:使用 volatile 关键字会强制将修改的值立即写入主存;

第二:使用 volatile 关键字的话,当线程 2 进行修改时,会导致线程 1 的工作内存中缓存变量 stop 的缓存行无效(反映到硬件层的话,就是 CPU 的 L1 或者 L2 缓存中对应的缓存行无效);

第三:由于线程1 的工作内存中缓存变量 stop 的缓存行无效,所以线程1 再次读取变量 stop 的值时会去主存读取。

Java 内存模型具备一些先天的“有序性”,即不需要通过任何手段就能够得到保证的有序性,这个通常也称为 happens-before 原则。如果两个操作的执行次序无法从 happens-before 原则推导出来,那么它们就不能保证它们的有序性,虚拟机可以随意地对它们进行重排序。

volatile 只能保证可见性,但并不能保证原子性。volatile 可以用来修饰 boolean 类型的标记位,对于标记位来讲,直接的赋值操作本身就是具备原子性。

2.3.1 happens-before

如果仅靠 volatile 和 synchronized 保证有序性,很多操作很变得很啰嗦。

两个操作之间存在 happens-before 关系并不意味着一定会遵守对应的原则,如果重排序的执行结果和原执行结果一致,这种重排序就不会违法。

Happens-before 关系用来描述和可见性相关的问题。

  • 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作;
  • 锁定规则:一个 unLock 操作先行发生于后面对同一个锁额 lock 操作。线程 A 在解锁之前的所有操作,对于线程 B 的对同一个锁的加锁之后的所有操作而言,都是可见的;
  • volatile 变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作;
  • 传递规则:如果操作 A 先行发生于操作 B,而操作 B 又先行发生于操作 C,则可以得出操作 A 先行发生于操作 C;
  • 线程启动规则:Thread 对象的 start() 方法先行发生于此线程的每个一个动作。子线程在执行 run 方法时,一定能看到父线程在执行 threaB.start() 前的所有操作的结果;
  • 线程中断规则:对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生;
  • 线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过 Thread.join() 方法结束、 Thread.isAlive() 的返回值手段检测到线程已经终止执行;
  • 对象终结规则:一个对象的初始化完成先行发生于他的 finalize() 方法的开始。

2.3.2 内存屏障

内存屏障是 CPU 或编译器在对内存随机访问的操作中的一个同步点,使得此点之前的所有读写操作都执行后才可以开始执行此点之后的操作。JMM 的重排序规则就是在生成 JVM 指令时插入特定的内存屏障指令。

变量被 volatile 修饰之后,会添加一个 ACC_VOLATI LE,生成字节码时,JVM 会识别出来。

四大内存屏障

变量规则

  1. 当第一个操作为 volatile 读时,不论第二个操作是什么,都不能重排序。这个操作保证了 volatile 读之后的操作不会被重排到 volatile 读之前;
  2. 当第二个操作为 volatile 写时,不论第一个操作是什么,都不能重排序。这个操作保证了 volatile 写之前的操作不会被重排到 volatile 写之后;
  3. 当第一个操作为 volatile 写时,第二个操作为 volatile 读时,不能重排。

变量规则

插入规则

  • 写:
    • 在每个 volatile 写操作的前⾯插⼊⼀个 StoreStore 屏障;
    • 在每个 volatile 写操作的后⾯插⼊⼀个 StoreLoad 屏障。

volatile 写顺序

  • 读:
    • 在每个 volatile 读操作的后⾯插⼊⼀个 LoadLoad 屏障;
    • 在每个 volatile 读操作的后⾯插⼊⼀个 LoadStore 屏障。

volatile 读顺序

禁止指令重排序

重排序的分类和执行流程:

  1. 编译器优化的重排序:编译器在不改变单线程串行语义的前提下,可以重新调整指令的执行顺序;
  2. 指令级并行的重排序:处理器使用指令级并行技术来讲多条指令重叠执行,若不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序;
  3. 内存系统的重排序:由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是乱序执行。

三类重排序

2.3.3 MESI 协议

MESI(M:modified E:exclusive S:shared I:invalid)在硬件约定了这样一种机制,会采用一种监听模式,一直去监听总线里面消息的传递。通过总线在内存获取数据只要被 lock 前缀修饰,就可以被感知。

2.3.4 双重检查单例模式

class Singleton {
    private static volatile Singleton singleton;

    private Singleton() {
    }

    public static Singleton getInstance() {
        // 在同步代码中进一步 check
        if (singleton == null) {
            synchronized (Singleton.class) {
                if (singleton == null) {
                    singleton = new Singleton();
                }
            }
        }
        return singleton;
    }
}

如果有两个线程同时调用 getInstance 方法,由于 singleton 为 null,两个线程都会进入第一个 if。由于锁的存在,其中一个进入等待状态。如果没有 double-check,在第二重中就会破坏单例。对于第一个 check 而言,如果去掉它,那么所有线程都会串行执行,效率低下。

而对于 singleton = new singleton() 并非是原子操作,在 JVM 中至少做了三件事情。

img

  • 第一步给 singleton 分配内存空间;
  • 第二步开始调用 Singleton 的构造函数等,初始化 singleton;
  • 第三步,将 singleton 对象指向分配的内存空间。

通过 volatile 保证字段更新时的可见性。

2.4 线程状态

Java 中线程的生命周期中一共有 6 种状态。

  1. New(新创建);
  2. Runnable(可运行);
  3. Blocked(被阻塞);
  4. Waiting(等待);
  5. Timed Waiting(计时等待);
  6. Terminated(被终止);

线程六个生命周期

  • New 表示线程被创建但尚未启动的状态:用 new Thread() 新建一个线程时,如果线程没有开始运行 start() 方法,所以也没有开始执行 run() 方法里面的代码,那么此时它的状态就是 New。而一旦线程调用了 start(),它的状态就会从 New 变成 Runnable;
  • Runable 状态对应操作系统线程状态中的两种状态,分别是 Running 和 Ready,也就是说,Java 中处于 Runnable 状态的线程有可能正在执行,也有可能没有正在执行,正在等待被分配 CPU 资源;
  • 在 Java 中 Blocked(被阻塞)、Waiting(等待)、Timed Waiting(计时等待),这三种状态统称为阻塞状态:

    • 进入 synchronized 保护的代码时没有抢到 monitor 锁,从 Runnable 状态进入 Blocked 状态,抢到 monitor 锁,从 Blocked 状态回到Runnable 状态;
      线程进入 Waiting 状态有三种可能性:
      • 没有设置 Timeout 参数的 Object.wait() 方法;
      • 没有设置 Timeout 参数的 Thread.join() 方法;
      • LockSupport.park() 方法。Blocked 仅仅针对 synchronized monitor 锁,在 Java 中还有很多其他的锁,比如 ReentrantLock,如果线程在获取这种锁时没有抢到该锁就会进入 Waiting 状态,因为本质上它执行了 LockSupport.park() 方法,所以会进入 Waiting 状态。同样,Object.wait() 和 Thread.join() 也会让线程进入 Waiting 状态。
        Blocked 在等待其他线程释放 monitor 锁,而 Waiting 则是在等待某个条件,比如 join 的线程执行完毕,或者是 notify()/notifyAll() 。
    • Waiting 和 Timed Waiting 非常相似,区别仅在于有没有时间限制。Timed Waiting 会等待超时,由系统自动唤醒,或者在超时前被唤醒信号唤醒。
      以下情况会让线程进入 Timed Waiting 状态:
      • 设置了时间参数的 Thread.sleep(long millis) 方法;
      • 设置了时间参数的 Object.wait(long timeout) 方法;
      • 设置了时间参数的 Thread.join(long millis) 方法;
    • 设置了时间参数的 LockSupport.parkNanos(long nanos) 方法和 LockSupport.parkUntil(long deadline) 方法。
  • Terminated 终止状态,要想进入这个状态有两种可能:
    • run() 方法执行完毕,线程正常退出;
    • 出现一个没有捕获的异常,终止了 run() 方法,最终导致意外终止。

2.5 线程控制

  • static void sleep(long millis):使当前正在执行的线程停留(暂停执行)指定的毫秒数 (休眠线程);
  • void join():当前线程暂停,等待指定的线程执行结束后,当前线程再继续 (相当于插队加入);
  • void join(int millis):可以等待指定的毫秒之后继续 (相当于插队,有固定的时间);
  • void yield():让出 CPU 的执行权(礼让线程);
  • void setDaemon(boolean on):将此线程标记为守护线程,当运行的线程都是守护线程时,Java 虚拟机将退出(守护线程)。

2.6 线程池

线程池做的工作主要是控制运行的线程的数量,处理过程中将任务加入队列,然后在线程创建后启动这些任务,如果显示超过了最大数量,超出的数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

如果线程数过多(Java 程序中的线程与操作系统中的线程是一一对应的),创建线程时会产生系统开销,并且每个线程还会占用一定的内存等资源,更重要的是我们创建如此多的线程也会给稳定性带来危害,因为每个系统中,可创建线程的数量是有一个上限的,不可能无限的创建。

  1. 反复创建线程系统开销比较大,每个线程创建和销毁都需要时间,如果任务比较简单,那么就有可能导致创建和销毁线程消耗的资源比线程执行任务本身消耗的资源还要大;
  2. 过多的线程会占用过多的内存等资源,还会带来过多的上下文切换,同时还会导致系统不稳定。

2.6.1 使用线程池

// 固定 n 个线程放入一个线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

// 一线程一池
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

// 带有缓存的线程池,可以实现线程池线程的动态分配
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

线程池参数

线程池核心参数

  • corePoolSize:线程池中的常驻核心线程数;
  • maximumPoolSize:线程池能够容纳同时执行的最大线程数,此值大于等于 1;
  • keepAliveTime:多余的空闲线程存活时间,当空间时间达到 keepAliveTime 值时,多余的线程会被销毁直到只剩下 corePoolSize 个线程为止(非核心线程);
  • unit:keepAliveTime 的单位;
  • workQueue:任务队列,被提交但尚未被执行的任务;
  • threadFactory:表示生成线程池中工作线程的线程工厂,用户创建新线程,一般用默认即可;
  • handler:拒绝策略,表示当线程队列满了并且工作线程大于等于线程池的最大显示数(maxnumPoolSize)时如何来拒绝。

线程创建

线程创建过程

当提交任务后,线程池首先会检查当前线程数,如果此时线程数小于核心线程数,比如最开始线程数量为 0,则新建线程并执行任务,随着任务的不断增加,线程数会逐渐增加并达到核心线程数,此时如果仍有任务被不断提交,就会被放入 workQueue 任务队列中,等待核心线程执行完当前任务后重新从 workQueue 中提取正在等待被执行的任务。

假设任务已经达到了 workQueue 的容量上限,线程池会在 corePoolSize 核心线程数的基础上继续创建线程来执行任务,假设任务被不断提交,线程池会持续创建线程直到线程数达到 maximumPoolSize 最大线程数,如果依然有任务被提交,这就超过了线程池的最大处理能力,这个时候线程池就会拒绝这些任务,我们可以看到实际上任务进来之后,线程池会逐一判断 corePoolSize、workQueue、maximumPoolSize,如果依然不能满足需求,则会拒绝任务。

corePoolSize & maximumPoolSize

corePoolSize & maximumPoolSize

  • 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加线程;
  • 线程池只有在任务队列填满时才创建多于 corePoolSize 的线程,如果使用的是无界队列(例如 LinkedBlockingQueue),那么由于队列不会满,所以线程数不会超过 corePoolSize;
  • 通过设置 corePoolSize 和 maximumPoolSize 为相同的值,就可以创建固定大小的线程池;
  • 通过设置 maximumPoolSize 为很高的值,例如 Integer.MAX_VALUE,就可以允许线程池创建任意多的线程。

2.6.2 拒绝策略

线程池会在以下两种情况下会拒绝新提交的任务。

  1. 当调用 shutdown 等方法关闭线程池后,即便此时可能线程池内部依然有没执行完的任务正在执行,但是由于线程池已经关闭,此时如果再向线程池内提交任务,就会遭到拒绝;
  2. 线程池没有能力继续处理新提交的任务,也就是工作已经非常饱和的时候。

第二种情况针对于 workQueue 和 maximumPoolSize 都满的情况。

  • 第一种拒绝策略是 AbortPolicy,这种拒绝策略在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException;
  • 第二种拒绝策略是 DiscardPolicy,当新任务被提交后直接被丢弃掉,也不会给任何的通知,相对而言存在一定的风险,因为提交的时候不知道这个任务会被丢弃,可能造成数据丢失;
  • 第三种拒绝策略是 DiscardOldestPolicy,如果线程池没被关闭且没有能力执行,则会丢弃任务队列中的头结点(通常是存活时间最长的任务)。这种策略与第二种不同之处在于它丢弃的不是最新提交的,而是队列中存活时间最长的,这样就可以腾出空间给新提交的任务,但同理也存在一定的数据丢失风险;
  • 第四种拒绝策略是 CallerRunsPolicy,当有新任务提交后,如果线程池没被关闭且没有能力执行,则把返回给调用者进行处理。这样做主要有两点好处:
    • 新提交的任务不会被丢弃,这样也就不会造成业务损失;
    • 由于返回给调用者进行处理务,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时的,在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度,相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定的空间,相当于是给了线程池一定的缓冲期。

2.6.3 常见线程池

6 种常见的线程池如下(在 JDK 1.8 之后更新了 ForkJoinPool):

  • FixedThreadPool;
  • CachedThreadPool;
  • ScheduledThreadPool;
  • SingleThreadExecutor;
  • SingleThreadScheduledExecutor;
  • ForkJoinPool。

CachedThreadPool

CachedThreadPool 可以称作可缓存线程池。特点在于线程数是几乎可以无限增加(实际最大可以达到 Integer.MAX_VALUE),当线程闲置时还可以对线程进行回收。

也有一个用于存储提交任务的队列,SynchronousQueue,队列的容量为0,实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。

ScheduledThreadPool

ScheduledThreadPool,支持定时或周期性执行任务。比如每隔 10 秒钟执行一次任务。

ScheduledExecutorService service = Executors.newScheduledThreadPool(10);

service.schedule(new Task(), 10, TimeUnit.SECONDS);

service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);

service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);
  • 第一种方法 schedule 比较简单,表示延迟指定时间后执行一次任务,如果代码中设置参数为 10 秒,也就是 10 秒后执行一次任务后就结束(在 SpringBoot 中可以应用的定时任务);
  • 第二种方法 scheduleAtFixedRate 表示以固定的频率执行任务,第二个参数 initialDelay 表示第一次延时时间,第三个参数 period 表示周期,也就是第一次延时后每次延时多长时间执行一次任务;
  • 第三种方法 scheduleWithFixedDelay 与第二种方法类似,也是周期执行任务,区别在于对周期的定义,之前的 scheduleAtFixedRate 是以任务开始的时间为时间起点开始计时,时间到就开始执行第二次任务,而不管任务需要花多久执行;而 scheduleWithFixedDelay 方法以任务结束的时间为下一次循环的时间起点开始计时。

ForkJoinPool

ForkJoinPool 运行示例图

  • fork() : 异步执行一个子任务;
  • join() : 阻塞当前线程等待子任务的执行结果。

ForkJoinPool 的核心方法 fork() 和 join()

ForkJoinPool 线程池有多种方法可以实现任务的分裂和汇总,其中一种用法如下方代码所示:

class Fibonacci extends RecursiveTask<Integer> { 
    int n;

    public Fibonacci(int n) { 
        this.n = n;
    } 

    @Override
    public Integer compute() { 
        if (n <= 1) { 
            return n;
        } 
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        f2.fork();
        return f1.join() + f2.join();
    }

	// 输出从 0 到 9 的斐波那契列数据
    public static void main(String[] args) throws ExecutionException, InterruptedException { 
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        for (int i = 0; i < 10; i++) { 
            // 提交 ForkJoinTask 任务
            ForkJoinTask task = forkJoinPool.submit(new Fibonacci(i));
            // 使用 get() 等待计算完成
            System.out.println(task.get());
        } 
    }
}

RecursiveTask 类是对ForkJoinTask 的一个简单的包装。

ForkJoinPool 线程池中每个线程都有自己独立的任务队列。除了有一个共用的任务队列之外,每个线程还有一个对应的双端队列 deque,这时一旦线程中的任务被 Fork 分裂了,分裂出来的子任务放入线程自己的 deque 里,而不是放入公共的任务队列中。

如果此时有三个子任务放入线程 t1 的 deque 队列中,对于线程 t1 而言获取任务的成本就降低了,可以直接在自己的任务队列中获取而不必去公共队列中争抢也不会发生阻塞,减少了线程间的竞争和切换。

Thread0 承担 Thread1 的任务

2.6.4 阻塞队列

作为一种线程池中的缓冲机制,WorkQueue 需要满足多线程安全问题。线程池中任务队列采用 BlockingQueue 来保障线程安全。

BlockingQueue 继承链

每个线程池和对应 Queue 对应

LinkedBlockingQueue 容量为 Integer.MAX_VALUE,因为两个线程池的核心线程数是固定的。由于线程池的任务队列永远不会放满,所以线程池只会创建核心线程数量的线程;

SynchronousQueue 是不存储元素的阻塞队列,也即是单个元素的队列。CachedThreadPool 线程池并不需要一个任务队列来存储任务;

DelayedWorkQueue 内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构。

阻塞队列方法

  • 抛异常:如果试图的操作无法立即执行,抛一个异常;
  • 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false);
  • 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行;
  • 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

ArrayBlockingQueue 和 LinkedBlockingQueue 都是遵循 FIFO 原则。自定义排序需要使用 PriorityBlockingQueue。通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。同时,插入队列的对象必须是可比较大小的,也就是 Comparable 的,否则会抛出 ClassCastException 异常。是无界队列,而且会自动扩容。

2.6.5 自定义线程池

在正常的开发中,不会使用 JDK 内置的线程。

  • FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM;
  • CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

阿里巴巴 Java 开发手册

2.6.6 “线程复用”

在线程池中,同一个线程可以从 BlockingQueue 中不断提取新任务来执行,其核心原理在于线程池对 Thread 进行了封装,并不是每次执行任务都会调用 Thread.start() 来创建新线程,而是让每个线程去执行一个“循环任务”。

在这个“循环任务”中,不停地检查是否还有任务等待被执行,如果有则直接去执行这个任务,也就是调用任务的 run 方法,把 run 方法当作和普通方法一样的地位去调用,相当于把每个任务的 run() 方法串联了起来,所以线程数量并不增加。

public void execute(Runnable command) { 
    if (command == null) 
        throw new NullPointerException();
    int c = ctl.get();
    // 如果当前线程数小于核心线程数,调用 addWorker
    if (workerCountOf(c) < corePoolSize) {
        /*
        addWorker 方法的主要作用是在线程池中创建一个线程并执行第一个参数传入的任务,
        它的第二个参数是个布尔值,如果布尔值传入 true 
        代表增加线程时判断当前线程是否少于 corePoolSize,小于则增加新线程,大于等于则不增加;
        同理,如果传入 false 代表增加线程时判断当前线程是否少于 maxPoolSize,
        小于则增加新线程,大于等于则不增加。
        */
        if (addWorker(command, true)) 
            return;
        c = ctl.get();
    } 
    if (isRunning(c) && workQueue.offer(command)) { 
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command)) 
            reject(command);
        else if (workerCountOf(recheck) == 0) 
            addWorker(null, false);
    } 
    // 如果超出 maximumPoolSize 就执行拒绝策略
    else if (!addWorker(command, false)) 
        reject(command);
}

// 简化后的 runWorker 方法代码如下所示
runWorker(Worker w) {
    Runnable task = w.firstTask;
    while (task != null || (task = getTask()) != null) {
        try {
            task.run();
        } finally {
            task = null;
        }
    }
}
/*
通过取 Worker 的 firstTask 或者通过 getTask 方法从 workQueue 中获取待执行的任务。
直接调用 task 的 run 方法来执行具体的任务(而不是新建线程)。
*/

2.6.7 关闭线程池

ThreadPoolExecutor 中涉及关闭线程池的方法,主要有 5 种:

  • void shutdown();
  • boolean isShutdown();
  • boolean isTerminated();
  • boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
  • List shutdownNow()。
  • shutdown():可以安全地关闭一个线程池,调用 shutdown() 方法之后线程池并不是立刻就被关闭,因为这时线程池中可能还有很多任务正在被执行,或是任务队列中有大量正在等待被执行的任务,调用 shutdown() 方法后线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭;
  • isShutdown():返回 true 或者 false 来判断线程池是否已经开始了关闭工作。isShutdown() 方法的返回的结果为 true 并不代表线程池此时已经彻底关闭了,这仅仅代表线程池开始了关闭的流程;
  • isTerminated():不仅代表线程池已关闭,同时代表线程池中的所有任务都已经都执行完毕了;
  • awaitTermination():本身并不是用来关闭线程池的,而是主要用来判断线程池状态的:
    • 等待期间(包括进入等待状态之前)线程池已关闭并且所有已提交的任务(包括正在执行的和队列中等待的)都执行完毕,相当于线程池已经“终结”了,方法便会返回 true;
    • 等待超时时间到后,第一种线程池“终结”的情况始终未发生,方法返回 false;
    • 等待期间线程被中断,方法会抛出 InterruptedException 异常。
  • shutdownNow():在执行 shutdownNow 方法之后,首先会给所有线程池中的线程发送 interrupt 中断信号,尝试中断这些任务的执行,然后会将任务队列中正在等待的所有任务转移到一个 List 中并返回,可以根据返回的任务 List 进行操作。

当调用 shutdownNow 或者 shutdown 方法去尝试关闭 Java 线程池,都只是异步的通知线程池,此时线程池不会立即停止。如果要同步等待线程池彻底关闭后才继续往下执行,需要调用 awaitTermination 方法进行同步等待。

2.7 非阻塞队列

当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。当阻塞队列是满时,往队列中添加元素的操作将会被阻塞。

针对 BlockingQueue,主要有 ArrayBlockingQueue 和 LinkedBlockingQueue 两种实现方式。主要是数组和链表的区别, ArrayBlockingQueue 生产与消费之间共用一把锁,而 LinkedBlockingQueue 生产与消费时用不同的锁竞争。

  • 生产者与消费者之间没有太大竞争,倾向于单消费者,单生产者,且两者之间冲突较小,这种情况下数组寻址是明显要比链表去指向 next 的操作要更快的;

  • 基本可以确定队列大小,且队列大小稳定在一定的数量,这个时候数组占用内存是比链表小的。

ConcurrentLinkedQueue 相对阻塞队列来说,采用的是 CAS 无锁操作,没有 take 和 put 方法,主用 poll 与 offer,无界。但并不是任何情况下,非阻塞队列的效率都大于阻塞队列。

  • 数据入队速度过快,出队速度过慢,这个时候 ConcurrentLinkedQueue 如果不借助其他限制手段,随着时间的推移,JVM 必然会进行频繁的 FULL GC,严重的情况下甚至会发生 OOM。使用 BlockingQueue 可以更好的控制内存的状况;
  • 数据入队速度过慢,出队速度过快,这个时候消费者线程如果一定想要拿到数据而不进行阻塞,将进入大量时间的自旋状态,浪费 CPU 资源;
  • 入队与出队速度相仿。这时候要考虑速度,有多少个线程在同时做操作,线程操作的频率如何? 在大部分场景下,ConcurrentLinkedQueue 的性能是要比 BlockingQueue 要好。如果线程之间的竞争足够又高又快,CAS 操作的 CPU 消耗以及线程操作的成功率是极低的,反而不如用锁竞争控制效率来的高。

3. JUC

JUC 是 java.util.concurrent 工具包的简称,是一个处理线程的工具包。JDK 1.5 开始出现。

3.1 Atomic

Atomic 类有很多种,它们都在 java.util.concurrent.atomic 包中。基本都是通过 CAS(CompareAndSwap) 实现,而 CAS 的具体实现依赖于体系结构提供的指令。

Atomic 底层是基于无锁 unsafe-CAS 算法,保证线程安全的同时,效率比 synchronized 的效率要高。因为 synchronized 会存在锁竞争,直到升级到重量级锁。

在 Atomic 包里一共有 12个类,四种原子更新方式:

  • 原子更新基本类型;
  • 原子更新数组;
  • 原子更新引用;
  • 原子更新字段。

3.1.1 CAS

CAS,即 Compare and Swap,中文翻译为“比较并交换”。

对于 JUC 包中,CAS 理论是实现整个 Java 并发包的基石。从整体来看,concurrent 包的实现示意图如下:

实现意识图

CAS,即一种对内存中的共享数据进行操作的指令,而且该操作是原子的读写操作。其过程如下:

首先 CPU 将内存中的将要被修改的数据与预期的值进行比较,如果这两个值相等,CPU 则会将内存中数值替换为新值,否则不做操作。最后,CPU 会将旧值返回。在 Java 中,CAS 的含义就是“我认为的原本的值是什么,如果你是,则更换为新值,否则不做修改同时麻烦告诉我该值时多少”。

在 CAS 中,总共存在三个操作数:预期值 A、内存中的 V、修改的值 B。当且仅当预期值 A 和内存中的值 V 相同,则将内存V 值修改为 B,否则返回 V。使用这种机制编写的算法也叫作非阻塞算法,标准定义了一个线程的失败或者挂起是不会影响其他线程的失败或者挂起。

public final int getAndAdd(int delta) {
    for (;;) {
        int current = get();
        int next = current + delta;
        if (compareAndSet(current, next))
            return current;
    }
}
// 通过 current 和本地数据进行比较,如果符合条件返回 next
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

尽管 CAS 机制可以使我们不依赖与同步,不影响和挂起其他线程,它大大提升了运行的效率,但是它会导致一个 ABA 的问题,如下:

假如有两个线程 A、B,他们都读取内存中的数据 V,假如这个时候线程 A,先将 V 修改为 V1,然后又修改为 V,这个时候线程 B 的 compareAndSet 仍然能成功,对于线程 B 而言该值 V 并没有发生任何变化,而实际上它已经变化了,只不过最后又还原了而已。

为了解决这类问题,可以使用 AtomicStampedReference 。通过一个版本号来控制数据的变化,如果遵循使用规范,即每次进行修改时都将版本号加一,那么就可以杜绝 ABA 问题。

class LockTest {
    private AtomicStampedReference<LockTest> reference = new AtomicStampedReference<>(null, 0);
    private int testNumber;

    // stamp 表示对应版本号,每个版本号对应一个 Object
    public void changeObject() {
        LockTest newObject = new LockTest();
        while (true) {
            int previousStamp = reference.getStamp();
            LockTest previousObject = reference.getReference();
            if (reference.compareAndSet(previousObject, newObject, previousStamp, previousStamp + 1)) {
                break;
            }
        }
    }
}

内部 pair 结构

CAS 自旋时间过长不成功,会给 CPU 带来较大的开销

自旋

如果线程同步代码块内容不多,可能来回切换线程的开销比实际业务代码执行开销还要大。

自旋锁并不会放弃 CPU 时间片,而是通过自旋等待锁的释放,底层是一个无限的 while 循环。

// 通过 AtomicLong 的 getAndLong 方法为例
public final long getAndAddLong (Object var1,long var2, long var4){
    long var6;
    do {
        var6 = this.getLongVolatile(var1, var2);
    } while (!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
    return var6;
}

3.1.2 原子更新基本类型

用于通过原子的方式更新基本类型,Atomic 包提供了以下三个类:

  • AtomicBoolean:原子更新布尔类型;
  • AtomicInteger:原子更新整型;
  • AtomicLong:原子更新长整型。

AtomicInteger 的常用方法如下:

  • int addAndGet(int delta):以原子方式将输入的数值与实例中的值(AtomicInteger 里的 value)相加,并返回结果;
  • boolean compareAndSet(int expect, int update):如果输入的数值等于预期值,则以原子方式将该值设置为输入的值;
  • int getAndIncrement():以原子方式将当前值加 1(注意:这里返回的是自增前的值);
  • void lazySet(int newValue):最终会设置成 newValue。使用 lazySet 设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值;
  • int getAndSet(int newValue):以原子方式设置为 newValue 的值,并返回旧值。
public class Test1 {
    public static void main(String[] args) {
        // 使用 AtomicInteger 避免统计时的线程问题                  
        AtomicInteger sum = new AtomicInteger(0);
        for (int i = 0; i < 10000; i++) {
            sum.addAndGet(1);
        }
        System.out.println(sum);
    }
}

Atomic 包提供了三种基本类型的原子更新,但是 Java 的基本类型里还有 char,float 和 double 等。Atomic 包里的类基本都是使用 Unsafe 实现的,Unsafe 只提供了三种 CAS 方法,compareAndSwapObject,compareAndSwapInt 和 compareAndSwapLong。

通过 AtomicBoolean 源码,发现其是先把 Boolean 转换成整型,再使用 compareAndSwapInt 进行 CAS,所以原子更新 double 也可以用类似的思路来实现。

将 boolean 转为 int

性能问题

针对于业务中并发量很大的情况,Atomic 类也会存在性能问题。

以 AtomicLong 为例,内部的 value 被 volatile 修饰,可以保证自身的可见性。但是 volatile 会让线程的共享内存不断地 flush 和 refresh,这会浪费大量资源。

flush 和 reflush

LongAdder

LongAdder 引入分段累加的概念,内部共有两个参数参与计数:变量 base,数组 Cell[]。

base 用在竞争不激烈的情况下,直接把累加结果改到 base 变量上;当竞争激烈时,各个线程会分散累加到自己所对应的那个 Cell[] 数组的某一个对象中,不会共用一个。

// 在 16 个线程中使用 LongAdder
public class LongAdderDemo {
    public static void main(String[] args) throws InterruptedException {
        LongAdder counter = new LongAdder();
        ExecutorService service = Executors.newFixedThreadPool(16);
        try {
            for (int i = 0; i < 100; i++) {
                service.submit(new Task(counter));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // 关闭线程池
            service.shutdown();
        }
        Thread.sleep(2000);
        System.out.println(counter.sum());
    }

    static class Task implements Runnable {
        private final LongAdder counter;
        public Task(LongAdder counter) {
            this.counter = counter;
        }
        @Override
        public void run() {
            counter.increment();
        }
    }
}

竞争激烈的时候,LongAdder 会通过计算出每个线程的 hash 值来给线程分配到不同的 Cell 上去,每个 Cell 相当于是一个独立的计数器,这样一来就不会和其他的计数器干扰。Cell 之间并不存在竞争关系,在自加的过程中,就大大减少了刚才的 flush 和 refresh,以及降低了冲突的概率。

LongAdder 在保证高效的同时,也需要消耗更多的空间。

Accumulator

Accumulator 就是一个更通用版本的 Adder,LongAdder 的 API 只有对数值的加减,而 LongAccumulator 提供了自定义的函数操作。

LongAccumulator counter = new LongAccumulator((x, y) -> x + y, 0);
LongAccumulator result = new LongAccumulator((x, y) -> x * y, 0);
LongAccumulator min = new LongAccumulator((x, y) -> Math.min(x, y), 0);
LongAccumulator max = new LongAccumulator((x, y) -> Math.max(x, y), 0);
// 第二个参数作为第一次运算的 x,执行自定义运算
// 之后会把上一次的结果赋值给 x,传递的数据给 y

使用了线程池,多个线程之间可以并行计算,效率比之前的串行高得多。

// 计算 5!
LongAccumulator accumulator = new LongAccumulator((x, y) -> x * y, 1);
ExecutorService executorService = Executors.newFixedThreadPool(8);
IntStream.range(2, 6).forEach(i -> executorService.submit(() -> accumulator.accumulate(i)));
Thread.sleep(1000);
System.out.println(accumulator.getThenReset());

Accumulator 不会要求计算顺序,在使用这个类时,需要采用拥有交换律的运算规则。

3.1.3 原子更新数组类

通过原子的方式更新数组里的某个元素,Atomic 包提供了以下三个类:

  • AtomicIntegerArray:原子更新整型数组里的元素;
  • AtomicLongArray:原子更新长整型数组里的元素;
  • AtomicReferenceArray:原子更新引用类型数组里的元素。

AtomicIntegerArray 类主要是提供原子的方式更新数组里的整型,其常用方法如下:

  • int addAndGet(int i, int delta):以原子方式将输入值与数组中索引 i 的元素相加;
  • boolean compareAndSet(int i, int expect, int update):如果当前值等于预期值,则以原子方式将数组位置i的元素设置成 update 值。
//addAndGet(int i, int delta):以原子更新的方式将数组中索引为i的元素与输入值相加;
public final int addAndGet(int i, int delta) {
    return getAndAdd(i, delta) + delta;
}
 
//getAndIncrement(int i):以原子更新的方式将数组中索引为i的元素自增加1;
public final int getAndIncrement(int i) {
    return getAndAdd(i, 1);
}
 
//compareAndSet(int i, int expect, int update):将数组中索引为i的位置的元素进行更新
public final boolean compareAndSet(int i, int expect, int update) {
    return compareAndSetRaw(checkedByteOffset(i), expect, update);
}

3.1.4 原子更新引用类型

原子更新基本类型的 AtomicInteger,只能更新一个变量,如果要原子的更新多个变量,就需要使用这个原子更新引用类型提供的类。Atomic 包提供了以下三个类:

  • AtomicReference:原子更新引用类型;
  • AtomicReferenceFieldUpdater:原子更新引用类型里的字段;
  • AtomicMarkableReference:原子更新带有标记位的引用类型。可以原子的更新一个布尔类型的标记位和引用类型。构造方法是 AtomicMarkableReference(V initialRef, boolean initialMark)。
public class Main {
    private static AtomicReference<User> reference = new AtomicReference<>();

    public static void main(String[] args) {
        User user1 = new User("hello", 20);
        reference.set(user1);
        User user2 = new User("world",30);
        User user = reference.getAndSet(user2);
        System.out.println(user);
        System.out.println(reference.get());
    }

    static class User {
        private String name;
        private int age;

        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }
}

3.1.5 原子更新字段类

如果只需要某个类里的某个字段,那么就需要使用原子更新字段类,Atomic 包提供了以下三个类:

  • AtomicIntegerFieldUpdater:原子更新整型的字段的更新器;
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器;
  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更数据和数据的版本号,可以解决使用 CAS 进行原子更新时,可能出现的 ABA 问题。

原子更新字段类都是抽象类,每次使用都时候必须使用静态方法 newUpdater 创建一个更新器。原子更新类的字段的必须使用 public volatile 修饰符。

3.2 Unsafe

Unsafe 是位于 sun.misc 包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等,这些方法在提升 Java 运行效率、增强 Java 语言底层资源操作能力方面起到了很大的作用。

由于 Unsafe 类使 Java 语言拥有了类似 C 语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。 在程序中过度、不正确使用 Unsafe 类会使得程序出错的概率变大,使得 Java 这种安全的语言变得不再“安全”,因此对 Unsafe 的使用一定要慎重。UnSafe 根据内存偏移地址获取数据。

Unsafe 还可以申请堆外内存,但是堆外内存不受 GC 管理,内存用完后,一定要手动释放,因为GC不会释放。通过 allocateMemory(size) 和 freeMemory(size) 进行申请和释放。

Unsafe 的典型应用:

  1. 堆外内存操作。DirectByteBuffer 是 Java 用于实现堆外内存的一个重要类,通常用在通信过程中做缓冲池,如在 Netty、MINA 等 NIO 框架中应用广泛。DirectByteBuffer 对于堆外内存的创建、使用、销毁等逻辑均由 Unsafe 提供的堆外内存 API 来实现;
  2. ReentrantLock、Atomic 等 API 通过 CAS 修改 state 等等,底层用的也是 Unsafe;
  3. 线程调度。如 LockSupport.park() 和 LockSupport.unpark() 实现线程的阻塞和唤醒。而 LockSupport 的 park、unpark 方法实际是调用 Unsafe 的 park、unpark 方式来实现;
  4. 内存屏障,通过 Unsafe 的 loadFence 方法加入一个内存屏障,目的是避免指令重排。

3.2.1 变量句柄

在 JDK 9 之后,引入变量句柄的概念。变量句柄是一个变量或一组变量的引用,包括静态域,非静态域,数组元素和堆外数据结构中的组成部分等。变量句柄的含义类似于已有的方法句柄。

变量句柄由 Java 类 java.lang.invoke.VarHandle 来表示。可以使用类 java.lang.invoke.MethodHandles.Lookup 中的静态工厂方法来创建 VarHandle 对象。通过变量句柄,可以在变量上进行各种操作。这些操作称为访问模式。不同的访问模式尤其在内存排序上的不同语义。目前一共有 31 种访问模式,而每种访问模式都在 VarHandle 中有对应的方法。这些方法可以对变量进行读取、写入、原子更新、数值原子更新和比特位原子操作等。VarHandle 还可以用来访问数组中的单个元素,以及把 byte[] 数组 和 ByteBuffer 当成是不同原始类型的数组来访问。

因为可以进行原子操作等等,所以 VarHandle 已经逐渐替代 Unsafe 的使用(因为使用 Unsafe 是不安全操作,直接对内存进行操作)。

public class HandleTarget {
    public int count = 1;
}

public class VarHandleTest {
    private HandleTarget handleTarget = new HandleTarget();
    private VarHandle varHandle;
    @Before
    public void setUp() throws Exception {
        this.handleTarget = new HandleTarget();
        this.varHandle = MethodHandles
                .lookup()
                .findVarHandle(HandleTarget.class, "count", int.class);
    }

    @Test
    public void testGet() throws Exception {
        // 通过 junit 测试,获取值为 1
        Assert.assertEquals(1, this.varHandle.get(this.handleTarget));
    }
}

4. 同步

4.1 synchronized

虽然可以使用 synchronized 来定义方法,但 synchronized 并不属于方法定义的一部分,因此,synchronized 关键字不能被继承。如果在父类中的某个方法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上 synchronized 关键字才可以。当然,还可以在子类方法中调用父类中相应的方法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此,子类的方法也就相当于同步了。

synchronized 保证可见性

synchronized 作用不同地方效果不一样:

  • synchronized 关键字加到方法上,同步方法的锁对象是 this;
  • synchronized 关键字加到静态方法上,同步静态方法的锁对象是 类名.class。

monitorentermonitorexit 这两个 jvm 指令,主要是基于 Mark WordObject monitor 来实现的。

4.1.1 原理

synchronized 同步语句块的实现使用的是 monitorentermonitorexit 指令,其中 monitorenter 指令指向同步代码块的开始位置,monitorexit 指令则指明同步代码块的结束位置。

synchronized 修饰的方法并没有 monitorenter 指令和 monitorexit 指令,取得代之的确实是 ACC_SYNCHRONIZED 标识,该标识指明了该方法是一个同步方法。

jvm 对象内存分配

每个 Java 对象都可以用作一个实现同步的锁,这个锁也被称为内置锁或 monitor 锁,获得 monitor 锁的唯一途径就是进入由这个锁保护的同步代码块或同步方法,线程在进入被 synchronized 保护的代码块之前,会自动获取锁,并且无论是正常路径退出,还是通过抛出异常退出,在退出的时候都会自动释放锁。

在 HotSpot 虚拟机中,monitor 采用 ObjectMonitor 实现。

ObjectMapper 底层源码

同步代码块

  • monitorenter:

执行 monitorenter 的线程尝试获得 monitor 的所有权,会发生以下这三种情况之一:

  1. 如果该 monitor 的计数为 0,则线程获得该 monitor 并将其计数设置为 1。然后,该线程就是这个 monitor 的所有者;
  2. 如果线程已经拥有了这个 monitor ,则它将重新进入,并且累加计数;
  3. 如果其他线程已经拥有了这个 monitor,那个这个线程就会被阻塞,直到这个 monitor 的计数变成为 0,代表这个 monitor 已经被释放了,于是当前这个线程就会再次尝试获取这个 monitor。
  • monitorexit:

monitorexit 的作用是将 monitor 的计数器减 1,直到减为 0 为止。代表这个 monitor 已经被释放了,已经没有任何线程拥有它了,也就代表着解锁,所以,其他正在等待这个 monitor 的线程,此时便可以再次尝试获取这个 monitor 的所有权

同步方法

对于 synchronized 方法,会有一个叫作 ACC_SYNCHRONIZED 的 flag 修饰符,来表明它是同步方法。

当某个线程要访问某个方法的时候,会首先检查方法是否有 ACC_SYNCHRONIZED 标志,如果有则需要先获得 monitor 锁,然后才能开始执行方法,方法执行之后再释放 monitor 锁。

4.1.2 优化

synchronized 核心优化方案主要包含以下 4 个:

  1. 锁膨胀;
  2. 锁消除;
  3. 锁粗化;
  4. 自适应自旋锁。

锁膨胀

JDK 1.6 对锁的实现引入了大量的优化,如偏向锁、轻量级锁、自旋锁、适应性自旋锁、锁消除、锁粗化等技术来减少锁操作的开销。

锁主要存在四种状态,依次是:无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态,他们会随着竞争的激烈而逐渐升级。锁可以升级不可降级,为了提高获得锁和释放锁的效率。

锁膨胀是指 synchronized 从无锁升级到偏向锁,再到轻量级锁,最后到重量级锁的过程,它叫做锁膨胀也叫做锁升级。

锁膨胀流程

JDK 1.6 之前,synchronized 是重量级锁,也就是说 synchronized 在释放和获取锁时都会从用户态转换成内核态(在操作系统中有说明,这两个状态切换代价很大),而转换的效率是比较低的。但有了锁膨胀机制之后,synchronized 的状态就多了无锁、偏向锁以及轻量级锁了,这时候在进行并发操作时,大部分的场景都不需要用户态到内核态的转换了,这样就大幅的提升了 synchronized 的性能。

偏向锁

HotSpot 作者经过研究实践发现,在大多数情况下,锁不存在多线程竞争,总是由同一线程多次获得的,为了让线程获得锁的代价更低,于是就引进了偏向锁。

偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行,轻量级锁的获取及释放依赖多次 CAS 原子指令,而偏向锁只需要在置换 ThreadID 的时候依赖一次 CAS 原子指令(由于一旦出现多线程竞争的情况就必须撤销偏向锁,所以偏向锁的撤销操作的性能损耗必须小于节省下来的 CAS 原子指令的性能消耗)。

获取锁的过程
  1. 访问 Mark Word(JVM 中对象 Header 部分的字段,偏向锁在这里存储)中偏向锁的标识(锁标志位为“01”状态,是否为偏向锁为“0”)是否设置成 1,锁标志位是否为 01 —— 确认为可偏向状态;
  2. 如果为可偏向状态,则测试线程 ID 是否指向当前线程,如果是,进入步骤(5),否则进入步骤(3);
  3. 如果线程 ID 并未指向当前线程,则通过 CAS 操作竞争锁。如果竞争成功,则将 Mark Word 中线程 ID 设置为当前线程 ID,然后执行(5);如果竞争失败,执行(4);
  4. 如果 CAS 获取偏向锁失败,则表示有竞争。当到达全局安全点(safepoint)时获得偏向锁的线程被挂起,偏向锁升级为轻量级锁,然后被阻塞在安全点的线程继续往下执行同步代码;
  5. 执行同步代码。
解锁过程

偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动去释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行)。

三者转换图示

轻量级锁

引入轻量级锁的目的是在没有多线程竞争的前提下,减少传统的重量级锁使用操作系统 Mutex Lock(互斥锁)产生的性能消耗。如果使用 Mutex Lock 每次获取锁和释放锁的操作都会带来用户态和内核态的切换,这样系统的性能开销是很大的。如果存在同一时间访问同一锁的情况,就会导致轻量级锁膨胀为重量级锁。

当关闭偏向锁或者多个线程竞争偏向锁时就会导致偏向锁升级为轻量级锁,轻量级锁的获取和释放都通过 CAS 完成的,其中锁获取可能会通过一定次数的自旋来完成。

需要强调一点:轻量级锁并不是用来代替重量级锁的,它的本意是在没有多线程竞争的前提下,减少传统的重量级锁使用产生的性能消耗。轻量级锁所适应的场景是线程交替执行同步块的情况,如果同一时间多个线程同时访问时,就会导致轻量级锁膨胀为重量级锁。

加锁过程
  1. 在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的 Mark Word 的拷贝,官方称之为 Displaced Mark Word;
  2. 拷贝对象头中的 Mark Word 复制到锁记录中;
  3. 拷贝成功后,虚拟机将使用 CAS 操作尝试将对象的 Mark Word 更新为指向 Lock Record 的指针,并将 Lock record 里的 owner 指针指向 object mark word。如果更新成功,则执行步骤(3),否则执行步骤(4);
  4. 如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,即表示此对象处于轻量级锁定状态;
  5. 如果这个更新操作失败了,虚拟机首先会检查对象的 Mark Word 是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行。否则说明多个线程竞争锁,轻量级锁就要膨胀为重量级锁,锁标志的状态值变为“10”,Mark Word中存储的就是指向重量级锁(互斥量)的指针,后面等待锁的线程也要进入阻塞状态。 而当前线程便尝试使用自旋来获取锁,自旋就是为了不让线程阻塞,而采用循环去获取锁的过程。

轻量级锁 CAS 操作之前堆栈与对象的状态

轻量级锁 CAS 操作之后堆栈与对象的状态

解锁过程
  1. 通过 CAS 操作尝试把线程中复制的 Displaced Mark Word 对象替换当前的 Mark Word;
  2. 如果替换成功,整个同步过程就完成了;
  3. 如果替换失败,说明有其他线程尝试过获取该锁(此时锁已膨胀),那就要在释放锁的同时,唤醒被挂起的线程。
重量级锁

synchronized 是依赖监视器 Monitor 实现方法同步或代码块同步的,代码块同步使用的是 monitorenter 和 monitorexit 指令来实现的,monitorenter 指令是在编译后插入到同步代码块的开始位置,而 monitorexit 是插入到方法结束处和异常处的,任何对象都有一个 Monitor 与之关联,当且一个 Monitor 被持有后,它将处于锁定状态。

锁消除

锁消除指的是在某些情况下,JVM 虚拟机如果检测不到某段代码被共享和竞争的可能性,就会将这段代码所属的同步锁消除掉,从而到底提高程序性能的目的。

锁消除的依据是逃逸分析的数据支持,如 StringBuffer 的 append() 方法,或 Vector 的 add() 方法,在很多情况下是可以进行锁消除的,比如以下这段代码:

public String method() {
    StringBuffer sb = new StringBuffer();
    for (int i = 0; i < 10; i++) {
        sb.append("i:" + i);
    }
    return sb.toString();
}

以上代码经过编译之后的字节码如下:

StringBuffer 被转化为 StringBuilder

锁粗化

锁粗化是指,将多个连续的加锁、解锁操作连接在一起,扩展成一个范围更大的锁

一系列连续加锁和解锁的操作,也会导致不必要的性能开销,从而影响程序的执行效率,比如这段代码:

public String method() {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 10; i++) {
        // 伪代码:加锁操作
        sb.append("i:" + i);
        // 伪代码:解锁操作
    }
    return sb.toString();
}

如果在 for 循环中定义锁,那么锁的范围很小,但每次 for 循环都需要进行加锁和释放锁的操作,性能是很低的;但如果我们直接在 for 循环的外层加一把锁,那么对于同一个对象操作这段代码的性能就会提高很多,如下伪代码所示:

public String method() {
    StringBuilder sb = new StringBuilder();
    // 伪代码:加锁操作
    for (int i = 0; i < 10; i++) {
        sb.append("i:" + i);
    }
    // 伪代码:解锁操作
    return sb.toString();
}

锁粗化的作用:如果检测到同一个对象执行了连续的加锁和解锁的操作,则会将这一系列操作合并成一个更大的锁,从而提升程序的执行效率

自适应自旋锁

自旋锁是指通过自身循环,尝试获取锁的一种方式,伪代码实现如下:

// 尝试获取锁
while(!isLock()){
}

自旋锁优点在于它避免一些线程的挂起和恢复操作,因为挂起线程和恢复线程都需要从用户态转入内核态,这个过程是比较慢的,所以通过自旋的方式可以一定程度上避免线程挂起和恢复所造成的性能开销

自适应自旋锁是指,线程自旋的次数不再是固定的值,而是一个动态改变的值,这个值会根据前一次自旋获取锁的状态来决定此次自旋的次数如果线程自旋成功了,则下次自旋的次数会增多,如果失败,下次自旋的次数会减少。

4.2 Lock

4.2.1 ReentrantLock

synchronized 加锁很重,必需一直等待,没有其他机制。ReentrantLock 是 Lock 接口的一个最主要的实现类,在对比 synchronized 和 Lock 的时候,也会选择 Lock 的主要实现类来进行对比。

// 传统 synchronized 做法
public class Counter {
    private int count;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }
}

// 使用 ReentrantLock 改写
public class Counter {
    private final Lock lock = new ReentrantLock();
    private int count;

    public void add(int n) {
        lock.lock();
        try {
            count += n;
        } finally {
            lock.unlock();
        }
    }
}

因为synchronized是Java语言层面提供的语法,所以我们不需要考虑异常,而ReentrantLock是Java代码实现的锁,我们就必须先获取锁,然后在finally中正确释放锁。

ReentrantLock是可重入锁,它和synchronized一样,一个线程可以多次获取同一个锁。和synchronized不同的是,ReentrantLock可以尝试获取锁:

if (lock.tryLock(1, TimeUnit.SECONDS)) {
    try {
        ...
    } finally {
        lock.unlock();
    }
}

在获取锁失败之后可以进行 false 之后的代码块,防止一直尝试获取 lock。

4.2.2 Lock & synchronized

在 JDK 1.5 里,ReentrantLock 的性能是明显优于 synchronized 。在 JDK 1.6 里,synchronized 做了优化,性能差别已经不明显了。

性能之差不再明显

用法

Lock 不是 Java 语言内置的,synchronized 是 Java 语言的关键字,因此是内置特性。Lock 是一个类,通过这个类可以实现同步访问。

synchronized 关键字可以加在方法上,不需要指定锁对象(此时的锁对象为 this),也可以新建一个同步代码块并且自定义 monitor 锁对象;而 Lock 接口必须显示用 Lock 锁对象开始加锁 lock() 和解锁 unlock(),并且一般会在 finally 块中确保用 unlock() 来解锁,以防发生死锁。

与 Lock 显式的加锁和解锁不同的是 synchronized 的加解锁是隐式的,尤其是抛异常的时候也能保证释放锁,但是 Java 代码中并没有相关的体现;

加锁解锁顺序

对于 Lock 而言如果有多把 Lock 锁,Lock 可以不完全按照加锁的反序解锁,比如我们可以先获取 Lock1 锁,再获取 Lock2 锁,解锁时则先解锁 Lock1,再解锁 Lock2,加解锁有一定的灵活度:

lock1.lock();
lock2.lock();
...
lock1.unlock();
lock2.unlock();

synchronized 解锁的顺序和加锁的顺序必须完全相反。

灵活性

一旦 synchronized 锁已经被某个线程获得了,此时其他线程如果还想获得,那它只能被阻塞,直到持有锁的线程运行完毕或者发生异常从而释放这个锁。如果持有锁的线程持有很长时间才释放,那么整个程序的运行效率就会降低,而且如果持有锁的线程永远不释放锁,那么尝试获取锁的线程只能永远等下去。

Lock 类在等锁的过程中,如果使用的是 lockInterruptibly 方法,那么如果觉得等待的时间太长了不想再继续等待,可以中断退出,也可以用 tryLock() 等方法尝试获取锁,如果获取不到锁也可以做别的事,更加灵活。

拥有限制

synchronized 锁只能同时被一个线程拥有,但是 Lock 锁没有这个限制。在读写锁中的读锁,是可以同时被多个线程持有的。

synchronized 是内置锁

synchronized 是内置锁,由 JVM 实现获取锁和释放锁的原理,还分为偏向锁、轻量级锁、重量级锁(在 JDK 1.6 之后升级)。

Lock 根据实现不同,有不同的原理,例如 ReentrantLock 内部是通过 AQS 来获取和释放锁的。

设置公平/非公平

公平锁是指多个线程在等待同一个锁时,根据先来后到的原则依次获得锁。ReentrantLock 等 Lock 实现类可以根据自己的需要来设置公平或非公平,synchronized 则不能设置。

性能区别

在 Java 5 以及之前,synchronized 的性能比较低,但是到了 Java 6 以后,发生了变化,因为 JDK 对 synchronized 进行了很多优化,比如自适应自旋、锁消除、锁粗化、轻量级锁、偏向锁等,所以后期的 Java 版本里的 synchronized 的性能并不比 Lock 差。

4.2.3 加锁解锁

与 Lock 接口加解锁相关的主要有 5 个方法,分别是 lock()、tryLock()、tryLock(long time, TimeUnit unit) 和 lockInterruptibly()、unlock()。

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

lock()

lock() 是最基础的获取锁的方法。在线程获取锁时如果锁已被其他线程获取,则进行等待,是最初级的获取锁的方法。

最佳实践是执行 lock() 后,首先在 try{} 中操作同步资源,如果有必要就用 catch{} 块捕获异常,然后在 finally{} 中释放锁,以保证发生异常时锁一定被释放。

Lock lock = ...;
lock.lock();
try{
    //获取到了被本锁保护的资源,处理任务
    //捕获异常
}finally{
    lock.unlock();   //释放锁
}

lock() 方法不能被中断,这会带来很大的隐患:一旦陷入死锁,lock() 就会陷入永久等待。

tryLock()

tryLock() 用来尝试获取锁,如果当前锁没有被其他线程占用,则获取成功,返回 true,否则返回 false,代表获取锁失败。

创建 lock() 方法之后使用 tryLock() 方法并用 if 语句判断它的结果,如果 if 语句返回 true,就使用 try finally 完成相关业务逻辑的处理,如果 if 语句返回 false 就会进入 else 语句,代表它暂时不能获取到锁,可以先去做一些其他事情。

public void tryLock(Lock lock1, Lock lock2) throws InterruptedException {
    while (true) {
        if (lock1.tryLock()) {
            try {
                if (lock2.tryLock()) {
                    try {
                        System.out.println("获取到了两把锁,完成业务逻辑");
                        return;
                    } finally {
                        lock2.unlock();
                    }
                }
            } finally {
                lock1.unlock();
            }
        } else {
            Thread.sleep(new Random().nextInt(1000));
        }
    }
}

tryLock() 的重载方法是 tryLock(long time, TimeUnit unit)。在等待了一段指定的超时时间后,线程会主动放弃这把锁的获取,避免永久等待;在等待的期间,也可以随时中断线程,这就避免了死锁的发生。

lockInterruptibly()

这个方法的作用就是去获取锁,如果这个锁当前是可以获得的,那么这个方法会立刻返回,但是如果这个锁当前是不能获得的(被其他线程持有),那么当前线程便会开始等待,除非它等到了这把锁或者是在等待的过程中被中断了,否则这个线程便会一直在这里执行这行代码。

lockInterruptibly() 是可以响应中断的。相比于不能响应中断的 synchronized 锁,lockInterruptibly() 可以让程序更灵活,可以在获取锁的同时,保持对中断的响应。我们可以把这个方法理解为超时时间是无穷长的 tryLock(long time, TimeUnit unit),因为 tryLock(long time, TimeUnit unit) 和 lockInterruptibly() 都能响应中断,只不过 lockInterruptibly() 永远不会超时。

public void lockInterruptibly() {
    try {
        lock.lockInterruptibly();
        try {
            System.out.println("操作资源");
        } finally {
            lock.unlock();
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

4.2.4 ReadWriteLock

整体思路是两把锁,第 1 把锁是写锁,获得写锁之后,既可以读数据又可以修改数据,而第 2 把锁是读锁,获得读锁之后,只能查看数据,不能修改数据。读锁可以被多个线程同时持有,所以多个线程可以同时查看数据。

要么是一个或多个线程同时有读锁,要么是一个线程有写锁,但是两者不会同时出现。

比于 ReentrantLock 适用于一般场合,ReadWriteLock 适用于读多写少的情况。

读锁插队策略

ReentrantReadWriteLock 可以设置为公平或者非公平:

ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(true);

ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(false);

公平锁两个方法实现:

final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}

final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}
// 当调用 hasQueuePredecessors 为 true 时,读和写都会 block

非公平锁两个方法实现:

final boolean writerShouldBlock() {
    return false; // writers can always barge
}
// false 表示写锁可以随时插队

final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}

Thread5 试图插队

第一种策略:允许插队

由于现在有线程在读,而线程 5 又不会特别增加它们读的负担,因为线程们可以共用这把锁,所以第一种策略就是让线程 5 直接加入到线程 2 和线程 4 一起去读取。

但是如果想要读取的线程不停地增加,导致读锁长时间内不会被释放,导致线程 3 长时间内拿不到写锁,写锁的线程会陷入“饥饿”状态。

第二种策略:不允许插队

按照这种策略线程 5 会被放入等待队列中,并且排在线程 3 的后面,让线程 3 优先于线程 5 执行,这样可以避免“饥饿”状态。

即便是非公平锁,只要等待队列的头结点是尝试获取写锁的线程,那么读锁依然是不能插队的,目的是避免“饥饿”。

锁的升降级

锁的降级取决于不能一直使用最高级的写锁,如果写的情况很少,写锁的存在会大幅度降低效率。

public class CachedData {
    Object data;
    volatile boolean cacheValid;

    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    void processCachedData() {
        rwl.readLock().lock();
        if (!cacheValid) {
            //在获取写锁之前,必须首先释放读锁。
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                //这里需要再次判断数据的有效性,因为在我们释放读锁和获取写锁的空隙之内,可能有其他线程修改了数据。
                if (!cacheValid) {
                    data = new Object();
                    cacheValid = true;
                }
                //在不释放写锁的情况下,直接获取读锁,这就是读写锁的降级。
                rwl.readLock().lock();
            } finally {
                //释放了写锁,但是依然持有读锁
                rwl.writeLock().unlock();
            }
        }
        try {
            System.out.println(data);
        } finally {
            //释放读锁
            rwl.readLock().unlock();
        }
    }
}

读写锁的特点是如果线程都申请读锁,是可以多个线程同时持有的,可是如果是写锁,只能有一个线程持有,并且不可能存在读锁和写锁同时持有的情况。

所以一般不支持锁的升级,如果这样做,必须每次保证只有一个线程可以升级。否则就会造成多个线程都会试图升级写锁的死锁问题。

4.3 AQS

AQS 是一个用于构建锁、同步器等线程协作工具类的框架。在 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch、ThreadPoolExcutor 的 Worker 中都有运用(JDK 1.8)。

AQS 内部结构

4.3.1 AQS 组成

AQS 最核心的三大部分就是状态队列期望协作工具类去实现的获取/释放等重要方法

state 状态

AQS 通过 state 表示状态,在不同类中表示不同的意思。private volatile int state; volatile 保证可见性,提供线程安全。

FIFO 队列

这个队列最主要的作用是存储等待的线程,AQS 的一大功能就是充当线程的“排队管理器”。

获取/释放方法

获取操作通常会依赖 state 变量的值,根据 state 值不同,协作工具类也会有不同的逻辑,并且在获取的时候也经常会阻塞。

释放方法是站在获取方法的对立面的,通常和刚才的获取方法配合使用。

5. 线程问题

5.1 线程安全

在实际开发中经常会遇到线程不安全的情况,一共有 3 种典型的线程安全问题:

  1. 运行结果错误;
  2. 发布和初始化导致线程安全问题;
  3. 活跃性问题。

5.1.1 运行结果错误

多线程同时操作一个变量导致的运行结果错误:

public class WrongResult {
    volatile static int i;
    public static void main(String[] args) throws InterruptedException {
        Runnable r = new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < 10000; j++) {
                    i++;
                }
            }
        };
        Thread thread1 = new Thread(r);
        thread1.start();
        Thread thread2 = new Thread(r);
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(i);
    }
}

由于 i++ 并非是原子操作,它分为 读取 +1 赋值 三步。这样导致 thread1 中修改的 i 有时会不能被 thread2 读取。

两次操作,但 i 的最终值可能只 +1

5.1.2 发布和初始化导致的问题

发布和初始化供其他类或对象使用是常见的操作,但如果操作的时间或地点不对,就可能导致线程安全问题:

public class WrongInit {
    private Map<Integer, String> students;
    public WrongInit() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                students = new HashMap<>();
                students.put(1, "王小美");
                students.put(2, "钱二宝");
                students.put(3, "周三");
                students.put(4, "赵四");
            }
        }).start();
    }
    public Map<Integer, String> getStudents() {
        return students;
    }
    public static void main(String[] args) throws InterruptedException {
        WrongInit multiThreadsError6 = new WrongInit();
        System.out.println(multiThreadsError6.getStudents().get(1));
    }
}

在构造方法中用线程初始化数据可能会空指针异常,因为线程启动需要一定的时间。

5.1.3 活跃性问题

活跃性问题就是程序始终得不到运行的最终结果,最典型的有三种,分别为死锁、活锁和饥饿。

活锁与死锁非常相似,也是程序一直等不到结果。正在运行的线程并没有阻塞,它始终在运行中,却一直得不到结果。

5.2 性能问题

5.2.1 调度开销

上下文切换

上下文切换会挂起当前正在执行的线程并保存当前的状态,然后寻找下一处即将恢复执行的代码,唤醒下一个线程。

缓存失效

一旦进行了线程调度,切换到其他线程,CPU就会去执行不同的代码,原有的缓存就很可能失效了,需要重新缓存新的数据,这也会造成一定的开销。线程调度器为了避免频繁地发生上下文切换,通常会给被调度到的线程设置最小的执行时间。

5.2.2 协作开销

除了线程调度之外,线程协作同样也有可能带来性能问题。线程之间如果有共享数据,为了避免数据错乱,为了保证线程安全,就有可能禁止编译器和 CPU 对其进行重排序等优化,也可能出于同步的目的,反复把线程工作内存的数据 flush 到主存中,然后再从主内存 refresh 到其他线程的工作内存中等等。

这些问题在单线程中并不存在,但在多线程中为了确保数据的正确性,就不得不采取上述方法,因为线程安全的优先级要比性能优先级更高,这也间接降低了性能。

5.3 CPU 核心 & 线程数

《Java 并发编程实战》的作者 Brain Goetz 推荐的计算方法:线程数 = CPU 核心数 *(1+平均等待时间/平均工作时间)

CPU 密集型任务:任务会长时间占用 CPU 资源;

耗时 IO 型任务:CPU 资源不会太多,但是 IO 操作耗时。

  • 线程的平均工作时间所占比例越高,就需要越少的线程;
  • 线程的平均等待时间所占比例越高,就需要越多的线程;
  • 针对不同的程序,进行对应的实际测试就可以得到最合适的选择。

5.4 ThreadLocal

一个 Thread 里面只有一个ThreadLocalMap ,而在一个 ThreadLocalMap 里面却可以有很多的 ThreadLocal,每一个 ThreadLocal 都对应一个 value。一个 Thread 是可以调用多个 ThreadLocal,Thread 内部就采用了 ThreadLocalMap 这样 Map 的数据结构来存放 ThreadLocal 和 value。

Thread、ThreadLocal 及 ThreadLocalMap

5.4.1 使用场景

ThreadLocal 有两种典型的使用场景:

  1. ThreadLocal 用作保存每个线程独享的对象,为每个线程都创建一个副本,这样每个线程都可以修改自己所拥有的副本, 而不会影响其他线程的副本,确保了线程安全;
  2. ThreadLocal 用作每个线程内需要独立保存信息,以便供其他方法更方便地获取该信息的场景。每个线程获取到的信息可能都是不一样的,前面执行的方法保存了信息后,后续方法可以通过 ThreadLocal 直接获取到,避免了传参,类似于全局变量的概念。

场景一

对于海量请求的情况,如果每次都进行创建会导致创建销毁的损耗过大。但是如果简单地使用 static,会有安全性问题,此时可以用 ThreadLocal。

public class ThreadLocalDemo {
    public static ExecutorService service = Executors.newFixedThreadPool(16);

    public static void main(String[] args) {
        try {
            for (int i = 1; i <= 1000; i++) {
                int finalI = i;
                service.submit(() -> {
                    String date = new ThreadLocalDemo().date(finalI);
                    System.out.println(date);
                });
            }
        } finally {
            service.shutdown();
        }
    }

    public String date(int second) {
        Date date = new Date(1000L * second);
        // 获取 ThreadLocal 中的 SimpleDateFormat
        // 防止每次在 date 方法中创建 simpleDateFormat 销毁
        SimpleDateFormat simpleDateFormat = ThreadSafeFormatter.local.get();
        return simpleDateFormat.format(date);
    }
}

class ThreadSafeFormatter {
    // ThreadLocal 中做初始化
    public static ThreadLocal<SimpleDateFormat> local = 
            ThreadLocal.withInitial(() -> new SimpleDateFormat("mm:ss"));
}

在 Java 开发手册中也明确 SimpleDateFormat 是线程不安全类

场景二

场景二在 Web 开发中经常使用,通过 ThreadLocal 存储我们在拦截器中获得的数据(比如 Token)。

每个线程拥有自己的数据

5.4.2 ThreadLocal & synchronized

ThreadLocal 并不是用来解决共享资源问题的。虽然 ThreadLocal 确实可以用于解决多线程情况下的线程安全问题,但其资源并不是共享的,而是每个线程独享的。相比于使用“锁”而言,把资源变成了各线程独享的资源,避免了同步操作。

当 ThreadLocal 用于解决线程安全问题的时候,也就是把一个对象给每个线程都生成一份独享的副本的,在这种场景下,ThreadLocal 和 synchronized 都可以理解为是用来保证线程安全的手段。

5.4.3 内存泄漏

ThreadLocal 内部使用弱引用防止因为 key 指向 ThreadLocal 对象不能被 gc 回收,造成内存泄漏。使用弱引用,就可以使 ThreadLocal 对象在方法执行完毕后顺利被回收且 Entry 的 key 引用指向为 null。

在执行 ThreadLocal 的 set、remove、rehash 等方法时,它都会扫描 key 为 null 的 Entry,如果发现某个 Entry 的 key 为 null,则代表它所对应的 value 也没有作用了,所以它就会把对应的 value 置为 null,这样,value 对象就可以被正常回收了。

所以在创建 ThreadLocal 时需要手动 remove,防止弱引用特殊情况导致的内存泄漏问题。

可能的内容泄露问题

// remove 方法源码
public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}

在 finally 后面调用 remove

5.5 控制并发

5.5.1 Semaphore

信号量的一个最主要的作用就是,来控制那些需要限制并发访问量的资源。信号量会维护“许可证”的计数,而线程去访问共享资源前,必须先拿到许可证。线程可以从信号量中去“获取”一个许可证。

在资源有限的情况下保证线程运行的稳定

在前面的线程使用 acquire() 获取 permit,任务执行结束后可以用 release() 归还。

除此之外,还有 tryAcquire(类似 trylock)尝试获取许可证;availablePermits 可以查看可用许可证数量。

// 执行一个耗时为 2s 的慢方法,通过 Semaphore 限制每次处理三个任务
public class SemaphoreDemo {
    // 限制允许线程数为 3
    static Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {
        ExecutorService executors = Executors.newFixedThreadPool(50);
        try {
            for (int i = 0; i < 100; i++) {
                executors.submit(new Task());
            }
        } finally {
            executors.shutdown();
        }
    }
    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName() + " 获得许可证,花费 2 秒执行任务");
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName() + " 执行完毕");
            semaphore.release();
        }
    }
}
  • 获取和释放的许可证数量尽量保持一致,否则比如每次都获取 2 个但只释放 1 个甚至不释放,那么信号量中的许可证就慢慢被消耗完了,最后导致里面没有许可证了,那其他的线程就再也没办法访问了;
  • 在初始化的时候可以设置公平性,如果设置为 true 则会让它更公平,但如果设置为 false 则会让总的吞吐量更高;
  • 信号量支持跨线程、跨线程池的,而且并不是哪个线程获得的许可证,就必须由这个线程去释放。事实上,对于获取和释放许可证的线程是没有要求的,比如线程 A 获取了然后由线程 B 释放,这完全是可以的,只要逻辑合理即可。

5.5.2 CountDownLatch

CountDownLatch 在 JDK 1.5 之后加入,是 JDK 提供的并发流程控制的工具类。

public CountDownLatch(int count){}; 构造函数传入一个参数,count 是需要倒数的数值。

await() 调用 await() 的线程开始等待,直到 count 为 0 时才继续执行;await(long timeout, TimeUnit unit),重载方法可以设置超时时间

countDown() 将 count 值减 1,直到减为 0 时,之前等待的线程会被唤起

CountDownLatch 无法被重用,在倒数完之后无法重新操作。

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch count = new CountDownLatch(5);
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            Runnable runnable = () -> {
                try {
                    Thread.sleep((long) (Math.random() * 10000));
                    System.out.println(Thread.currentThread().getName() + " 运动员跑步已经结束");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    count.countDown();
                }
            };
            executor.submit(runnable);
        }
        System.out.println("运动员正在跑步");
        count.await();
        System.out.println("结束");
        executor.shutdown();
    }
}

class CountDownLatchDemo1 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch count = new CountDownLatch(1);
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            Runnable runnable = () -> {
                System.out.println(Thread.currentThread().getName() + " 运动员准备就绪");
                try {
                    count.await();
                    System.out.println(Thread.currentThread().getName() + " 运动员跑步开始");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            };
            executor.submit(runnable);
        }

        try {
            Thread.sleep(2 * 1000);
            System.out.println("2s 结束,比赛准备开始");
            count.countDown();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
        Thread.sleep(2 * 1000);
        System.out.println("比赛结束");
    }
}

5.5.3 CyclicBarrier

CyclicBarrier(CyclicBarrier 和 CountDownLatch 功能相近,但前者更像是水坝,只有在流量达到一定情况才能够放任执行)可以构造出一个集结点,当某一个线程执行 await() 的时候,它就会到这个集结点开始等待,等待这个栅栏被撤销。直到预定数量的线程都到了这个集结点之后,这个栅栏就会被撤销,之前等待的线程就在此刻统一出发,继续去执行剩下的任务。

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> System.out.println("3 年之期已到"));
        // 设置栅格释放之后执行代码
        for (int i = 0; i < 6; i++) {
            new Thread(new Task(i + 1, cyclicBarrier)).start();
            // 注意这里是多线程
        }
    }

    static class Task implements Runnable {
        private final int id;

        private final CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println(MessageFormat.format("{0}同学准备出发", id));
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println(MessageFormat.format("{0}同学到达站点", id));
                cyclicBarrier.await();
                System.out.println(MessageFormat.format("{0}同学出发", id));
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new RuntimeException(e);
            }

        }
    }
}

/*
4 同学准备出发
5 同学准备出发
6 同学准备出发
3 同学准备出发
1 同学准备出发
2 同学准备出发
4 同学到达站点
6 同学到达站点
1 同学到达站点
3 年之期已到
1 同学出发
6 同学出发
4 同学出发
5 同学到达站点
2 同学到达站点
3 同学到达站点
3 年之期已到
3 同学出发
5 同学出发
2 同学出发
*/
  • CountDownLatch 作用于事件,但 CyclicBarrier 作用于线程。CyclicBarrier 要等固定数量的线程都到达了栅栏位置才能继续执行,而 CountDownLatch 只需等待数字倒数到 0;
  • CyclicBarrier 还可以随时调用 reset 方法进行重置,CountDownLatch 在倒数到 0 并且触发门闩打开后,就不能再次使用了;
  • CyclicBarrier 有执行动作 barrierAction,而 CountDownLatch 没这个功能。

5.5.4 Condition

通过 Condition 的 await 使得一个线程变成等待状态,在另一个线程调用 signal 时触发信息并反馈给第一个线程,执行之后的操作(需要等待第二个线程完全退出本身的锁)。

Condition 实现消息队列:

public class BlockingQueueDemo {
    private Queue<Object> queue;
    private int max = 16;
    private ReentrantLock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public BlockingQueueDemo() {
        queue = new LinkedList<>();
    }

    public BlockingQueueDemo(int max) {
        this.max = max;
        queue = new LinkedList<>();
    }

    public void put(Object o) {
        lock.lock();
        try {
            if (queue.size() == max) {
                notFull.await();
            }
            queue.add(o);
            notEmpty.signalAll();
            // 添加元素,满足 notEmpty 条件,释放
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    public Object take(Object o) {
        lock.lock();
        try {
            if (queue.size() == 0) {
                notEmpty.await();
            }
            Object item = queue.remove();
            notFull.signalAll();
            return item;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
}

通过 synchronized 配合 wait() 和 notify() 也能完成对应操作,但是 Condition 转化成了一种相应的对象,lock 将保证阻塞更加可控。

参考文章

  1. Java JUC 简介
  2. 多线程 - 廖雪峰的官方网站
  3. JUC 并发编程目录
  4. Java 并发编程学习宝典(漫画版)
  5. Java 并发编程 78 讲
  6. 什么是协程?
  7. 并发与并行的区别是什么?
  8. 应该如何理解管程?
  9. Java 用户线程 & 守护线程的区别
  10. 异步编程利器:CompletableFuture 详解
  11. Java 创建线程的方式本质只有一种 new Thread
  12. Java 并发编程:volatile 关键字解析
  13. volatile 为什么不能保证原子性
  14. Atomic 原子类和 Unsafe 魔法类 详解
  15. Atomic 原子操作类详解
  16. Java 并发工具类之 LongAdder 原理总结
  17. 【Java并发编程实战】“JUC”:CAS 操作
  18. Unsafe -> VarHandle
  19. Atomic 包的 4 种类型详解
  20. synchronized 中的 4 个优化
  21. Java 锁与线程的那些事
  22. Java 并发编程:Synchronized 底层优化(偏向锁、轻量级锁)
  23. synchronized 优化手段之锁膨胀机制
  24. synchronized 中的 4 个优化
  25. 虚拟唤醒
  26. ForkJoinPool 大型图文现场
  27. LinkedBlockingQueue 和 ConcurrentLinkedQueue 的选择问题
  28. Java 信号量 Semaphore 介绍
  29. Java 并发编程 CountDownLatch、CyclicBarrier 和 Semaphore

文章作者: 陈鑫扬
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 陈鑫扬 !
评论
  目录