Skip to content

Latest commit

 

History

History
1852 lines (1528 loc) · 65.4 KB

05-无锁并发.md

File metadata and controls

1852 lines (1528 loc) · 65.4 KB

无锁并发

  • CAS 与 volatile (CAS + volatile 实现无锁并发)
  • 原子整数
  • 原子引用
  • 原子累加器
  • Unsafe:提供了 CAS 底层的访问接口

JUC 包提供了一系列的原子性操作类,这些类都是使用非阻塞算法 CAS 实现的,相比使用锁实现原子性操作,这在性能上有了很大提高。前提是,CPU 要多。

问题

有如下需求,保证 account.withdraw 取款方法的线程安全

package unlock;

import java.util.ArrayList;
import java.util.List;

public class TestAccount {
    public static void main(String[] args) {
        Account account = new AccountUnsafe(10000);
        Account.demo(account);
    }
}

class AccountUnsafe implements Account {
    private Integer balance;

    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }

    @Override
    public Integer getBalance() {
        return balance;
    }

    @Override
    public void withdraw(Integer amount) {
        this.balance -= amount;
    }
}

interface Account {
    // 获取余额
    Integer getBalance();
    // 取款
    void withdraw(Integer amount);
    /**
     * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
     * 如果初始余额为 10000 那么正确的结果应当是 0
     */
    static void demo(Account account) {
        List<Thread> ts = new ArrayList<>();
        long start = System.nanoTime();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> { account.withdraw(10); }));
        }
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance() + " cost: " + (end - start) / 1000_000 + " ms");
    }
}
// 某次执行结果
// 330 cost: 306 ms 

为什么不安全

withdraw 方法

@Override
public void withdraw(Integer amount) {
    this.balance -= amount;
}
ALOAD 0 // <- this
ALOAD 0
GETFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer; // <- this.balance
INVOKEVIRTUAL java/lang/Integer.intValue ()I // 拆箱
ALOAD 1 // <- amount
INVOKEVIRTUAL java/lang/Integer.intValue ()I // 拆箱
ISUB // 减法
INVOKESTATIC java/lang/Integer.valueOf (I)Ljava/lang/Integer; // 结果装箱
PUTFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer; // -> this.balance

多线程执行流程

ALOAD 0 // thread-0 <- this 
ALOAD 0 
GETFIELD cn/itcast/AccountUnsafe.balance // thread-0 <- this.balance 
INVOKEVIRTUAL java/lang/Integer.intValue // thread-0 拆箱
ALOAD 1 // thread-0 <- amount 
INVOKEVIRTUAL java/lang/Integer.intValue // thread-0 拆箱
ISUB // thread-0 减法
INVOKESTATIC java/lang/Integer.valueOf // thread-0 结果装箱
PUTFIELD cn/itcast/AccountUnsafe.balance // thread-0 -> this.balance 

ALOAD 0 // thread-1 <- this 
ALOAD 0 
GETFIELD cn/itcast/AccountUnsafe.balance // thread-1 <- this.balance 
INVOKEVIRTUAL java/lang/Integer.intValue // thread-1 拆箱
ALOAD 1 // thread-1 <- amount 
INVOKEVIRTUAL java/lang/Integer.intValue // thread-1 拆箱
ISUB // thread-1 减法
INVOKESTATIC java/lang/Integer.valueOf // thread-1 结果装箱
PUTFIELD cn/itcast/AccountUnsafe.balance // thread-1 -> this.balance 

单核的指令交错;多核的指令交错

解决

  • 给对象加锁

  • 无锁:AtomicInteger

package unlock;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class TestAccount {
    public static void main(String[] args) {
        Account account = new AccountSafe(10000);
        Account.demo(account);
    }
}

class AccountSafe implements Account {
    private AtomicInteger balance;

    public AccountSafe(Integer balance) {
        this.balance = new AtomicInteger(balance);
    }

    @Override
    public Integer getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(Integer amount) {
        while (true) {
            // 获取余额的最新值
            int prev = balance.get();
            // 把 prev 修改为 next  线程内完成的
            int next = prev - amount;
            // 把修改的余额,同步到主存中去。
            if (balance.compareAndSet(prev, next)) {
                // 失败就 break 进行下一次的尝试
                break;
            }
        }
        // 可以简化为下面的方法
        // balance.addAndGet(-1 * amount);
    }
}

interface Account {
    // 获取余额
    Integer getBalance();
    // 取款
    void withdraw(Integer amount);
    /**
     * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
     * 如果初始余额为 10000 那么正确的结果应当是 0
     */
    static void demo(Account account) {
        List<Thread> ts = new ArrayList<>();
        long start = System.nanoTime();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> { account.withdraw(10); }));
        }
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance() + " cost: " + (end - start) / 1000_000 + " ms");
    }
}

执行测试代码

public static void main(String[] args) {
    Account.demo(new AccountSafe(10000));
}
// 某次的执行结果 0 cost: 302 ms 

CAS与volatile

CAS 介绍

CAS 即 Compare and Swap,其是 JDK 提供的非阻塞原子性操作,它通过硬件保证了比较---更新操作的原子性。JDK 里面的 Unsafe 类提供了一系列的 compareAndSwap/conpareAdnSet 方法,下面以 compareAndSwapLong 方法为例进行简单介绍

boolean compareAndSwapLong(Object obj, long valueOffset,long expect, long update)方法:其中 compareAndSwap 的意思是比较并交换。CAS 有四个操作数,分别为:对象内存位置、对象中的变量的偏移量、变量预期值和新的值。其操作含义是,如果对象 obj 中内存偏移量为 valueOffset 的变量值为 expect,则使用新的值 update 替换旧的值 expect。这是处理器提供的一个原子性指令。

前面看到的 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?

public void withdraw(Integer amount) {
    while (true) {
        // 需要不断尝试,直到成功为止
        while (true) {
            // 比如拿到了旧值 1000
            int prev = balance.get();
            // 在这个基础上 1000-10 = 990
            int next = prev - amount;
            /*
			 compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值
             - 不一致了,next 作废,返回 false 表示失败。比如,别的线程已经做了减法,当前值已经被减成了 990
               那么本线程的这次 990 就作废了,进入 while 下次循环重试
             - 一致,以 next 设置为新值,返回 true 表示成功
            */
            if (balance.compareAndSet(prev, next)) {
                break;
            }
        }
    }
}

其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。

注意:其实 CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性。在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住(或者锁缓存),当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。

volatile

获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。

它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改对另一个线程可见。

注意:volatile 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原子性)。而 CAS 必须借助 volatile 才能读取到共享变量的最新值来实现比较并交换的效果。

AtomicInteger

部分源码如下

public class AtomicInteger extends Number implements java.io.Serializable {
    // 内部的值用 volatile 修饰的
    private volatile int value;
}

为什么有时候CAS效率高

无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻

线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火, 等被唤醒又得重新打火、启动、加速...恢复到高速运行,代价比较大

但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

CAS的特点

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  • CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
  • synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
  • CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
    • 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一 。
    • 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响。

Unsafe类

CAS详解

AtomicInteger 原子类中的 CAS 操作

public final boolean compareAndSet(int expectedValue, int newValue) {
    return U.compareAndSetInt(this, VALUE, expectedValue, newValue);
}

上面的 CAS 操作实际上调用的是 Unsafe 类中的 compareAndSetInt 方法。

public final native boolean compareAndSetInt(Object o, long offset,
                                             int expected,
                                             int x);

Unsafe 类是整个 concurrent 包的基础,里面所有的函数都是 native 的。compareAndSetInt 方法有四个参数

  • Object o:对象
  • long offset:对象的成员变量,意思是某个成员变量在对应的类中的内存偏移量(在内存中的位置)
  • int expected:期待的指
  • int x:修改为值 x。

在所有调用 CAS 前,都会通过 Unsafe#objectFieldOffset 这个函数把成员变量转换成一个 offset。

// JDK 11 的 source code
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");

后面执行 CAS 操作的时候,直接操作 offset 了。

Unsafe中的方法

JDK 的 rt.jar 包中的 Unsafe 类提供了硬件级别的原子性操作,Unsafe 类中的方法都是 native 方法,它们使用 JNI 的方式访问本地 C++ 实现库。

Unsafe 的重要方法

static {
    try {
        valueOffset = unsafe.objectFieldOffset // 获得变量 value 在 AtomicLong 对象中的内存偏移地址。
            (AtomicLong.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}
  • long objectFieldOffset(Field field) 方法:返回指定的变量在所属类中的内存偏移地址,该偏移地址仅仅在该 Unsafe 函数中访问指定字段时使用
  • int arrayBaseOffset(Class arrayClass) 方法:获取数组中第一个元素的地址。
  • int arrayIndexScale(Class arrayClass) 方法:获取数组中一个元素占用的字节。
  • boolean compareAndSwapLong(Object obj, long offset, longexpect, long update) 方法:比较对象 obj 中偏移量为 offset 的变量的值是否与 expect 相等,相等则使用 update 值更新,然后返回 true,否则返回 false。
  • public native long getLongvolatile(Object obj, long offset) 方法:获取对象 obj 中偏移量为 offset 的变量对应 volatile 语义的值
  • void putLongvolatile(Object obj, long offset, long value) 方法:设置 obj 对象中 offset 偏移的类型为 long 的 field 的值为 value,支持 volatile 语义。
  • void putOrderedLong(Object obj, long offset, long value) 方法:设置 obj 对象中 offset 偏移地址对应的 long 型 field 的值为 value。这是一个有延迟的 putLongvolatile 方法,并且不保证值修改对其他线程立刻可见。只有在变量使用 volatile 修饰并且预计会被意外修改时才使用该方法。
  • void park(boolean isAbsolute, long time) 方法:阻塞当前线程,其中参数 isAbsolute 等于 false 且 time 等于0表示一直阻塞。time 大于0表示等待指定的 time 后阻塞线程会被唤醒,这个 time 是个相对值,是个增量值,也就是相对当前时间累加 time 后当前线程就会被唤醒。如果 isAbsolute 等于 true,并且 time 大于0,则表示阻塞的线程到指定的时间点后会被唤醒,这里 time 是个绝对时间,是将某个时间点换算为 ms 后的值。另外,当其他线程调用了当前阻塞线程的 interrupt 方法而中断了当前线程时,当前线程也会返回,而当其他线程调用了 unPark 方法并且把当前线程作为参数时当前线程也会返回。
  • void unpark(Object thread) 方法:唤醒调用 park 后阻塞的线程。

Unsafe 的示例

import sun.misc.Unsafe;

public class TestUnsafe {
    static final Unsafe unsafe = Unsafe.getUnsafe();
    static final long stateOffset;
    private volatile long state = 0;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset(TestUnsafe.class.getDeclaredField("state"));
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
            throw new Error(e);
        }
    }

    public static void main(String[] args) {
        TestUnsafe safe = new TestUnsafe();
        boolean b = unsafe.compareAndSwapInt(safe, stateOffset, 0, 1);
        System.out.println(b);
    }
}

这段代码有问题:在获取 Unsafe 对象时,会判断是不是 Bootstrap 类加载器加载的 localClass,在这里就是看是不是 Bootstrap 加载器加载了 TestUnSafe.class。很明显由于 TestUnSafe.class 是使用 AppClassLoader 加载的,所以这里直接抛出了异常。要想访问,只能通过反射获得 Unsafe 对象了。

import sun.misc.Unsafe;

import java.lang.reflect.Field;

public class TestUnsafe {
    static final Unsafe unsafe;
    static final long stateOffset;
    private volatile long state = 0;

    static {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            unsafe = (Unsafe) field.get(null);
            stateOffset = unsafe.objectFieldOffset(TestUnsafe.class.getDeclaredField("state"));
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
            throw new Error(e);
        }
    }

    public static void main(String[] args) {
        TestUnsafe safe = new TestUnsafe();
        boolean success = unsafe.compareAndSwapInt(safe, stateOffset, 0, 1);
        System.out.println(success);
    }
}
// true

原子整数

用法

JUC 并发包提供了:

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

以 AtomicInteger 为例

public class AtomicNumber {
    public static void main(String[] args) {
        AtomicInteger number = new AtomicInteger(0);
        // 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
        System.out.println(number.getAndIncrement());
        // 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
        System.out.println(number.incrementAndGet());
        // 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
        System.out.println(number.decrementAndGet());
        // 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
        System.out.println(number.getAndDecrement());
        // 获取并加值(i = 0, 结果 i = 5, 返回 0)
        System.out.println(number.getAndAdd(5));
        // 加值并获取(i = 5, 结果 i = 0, 返回 0)
        System.out.println(number.addAndGet(-5));
        // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
        // lambda 表达式
        // 其中函数中的操作能保证原子,但函数需要无副作用
        System.out.println(number.getAndUpdate(p -> p - 2));
        // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
        // 其中函数中的操作能保证原子,但函数需要无副作用
        System.out.println(number.updateAndGet(p -> p + 2));
        // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
        // 其中函数中的操作能保证原子,但函数需要无副作用
        // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
        // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
        System.out.println(number.getAndAccumulate(10, (p, x) -> p + x));
        // 计算并获取(i = 10, p 为 i 的当前值, x 为参数10, 结果 i = 0, 返回 0)
        // 其中函数中的操作能保证原子,但函数需要无副作用
        System.out.println(number.accumulateAndGet(-10, (p, x) -> p + x));
    }
}

原理

基于 JDK8 的代码,高版本 JDK 的代码有所不同。

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;
    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // 保存 value 在 AtomicInteger 对象中的内存偏移地址
    private static final long valueOffset;
		
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
	
    private volatile int value;
}

JDK11 的代码

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    /*
     * This class intended to be implemented using VarHandles, but there
     * are unresolved cyclic startup dependencies.
     */
    private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
    private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");

    private volatile int value;
}

incrementAndGet 方法

实际上调用的还是 unsafe 的方法。

public final int incrementAndGet() {
    // valueOffset 地址偏移  1 在原有值的基础上+1。
    // incrementAndGet,先自增,自增结束后再获取。
    // getAndAdd 先获得再自增 所以 getAndAdd + 1
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

compareAndSet

  • expect,期待的初始值是 expect
  • 如果是,则把初始值更新为 update
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

更为具体的代码注释,参考 Unsafe.java 文件。

updateAndGet

CAS 实现 updateAndGet

public static void updateAndGet() {
    AtomicInteger atomicInteger = new AtomicInteger(5);
    while (true) {
        int pre = atomicInteger.get();
        int next = pre * 10;
        if (atomicInteger.compareAndSet(pre, next)) break;
    }
    System.out.println(atomicInteger.get());
}

原子类累加

多线程中使用 AtomicLong 累加

/**
 * 多线程 AtomicLong 累加
 */
public class AtomicLongAddDemo {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> list1 = Arrays.asList(0, 1, 20, 3, 12, 0, 0, 1, 1, 0, 2, 0, 2, 2, 0, 1, 0, 20);
        List<Integer> list2 = Arrays.asList(0, 0, 20, 3, 12, 0, 0, 1, 1, 0, 2, 0, 2, 2, 0, 1, 0, 20);
        AtomicLong sum = new AtomicLong(0);
        long result = list1.stream().filter(e -> e == 0).count() + list2.stream().filter(e -> e == 0).count();
        Thread th1 = new Thread(() -> {
            list1.stream()
                .filter(e -> e == 0)
                .forEach(e -> sum.getAndIncrement());
        });

        Thread th2 = new Thread(() -> {
            list2.stream()
                .filter(e -> e == 0)
                .forEach(e -> sum.getAndIncrement());
        });
        th1.start();
        th2.start();
        th1.join();
        th2.join();
        System.out.println(result == sum.get());
    }
}

在没有原子类的情况下,实现计数器需要使用一定的同步措施,比如 synchronized 关键字等,但是这些都是阻塞算法,对性能有一定损耗,而原子操作类都使用 CAS 非阻塞算法,性能更好。但是在高并发情况下 AtomicLong 还会存在性能问题。JDK8 提供了一个在高并发下性能更好的原子累加器 LongAdder 类。

其他原子整数

AtomicBoolean

对于 int 或者 long 型变量,需要进行加减操作,所以要加锁;但对于一个 boolean 类型来说,true 或 false 的赋值和取值操作,加上 volatile 关键字就够了,为什么还需要 AtomicBoolean 呢?这是因为往往要实现下面这种由多个操作组合在一起的功能:

if(flag == false){
    flag = true;
    ...
}

也就是要实现 compare 和 set 两个操作合在一起的原子性,而这也正是 CAS 提供的功能。上面的代码,就变成:

if(compareAndSet(false,true)){
    ...
}

AtomicReference

同样的,AtomicReference 也是因为要实现

if(obj == xxx){
    obj = kkk;
    ....
}

这种功能而存在的。

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceDemo {
    static class TestDemo {
        int idx = 123;

        public TestDemo setIdx(int idx) {
            this.idx = idx;
            return this;
        }
    }

    public static void main(String[] args) {
        TestDemo t1 = new TestDemo().setIdx(100);

        // 用 t1 初始化 AtomicReference
        AtomicReference<TestDemo> reference = new AtomicReference<>(t1);
        TestDemo t2 = new TestDemo().setIdx(200);
        // 如果 reference 中的对象是 t1,则将 reference 中的对象的引用设置为 t2
        if (reference.compareAndSet(t1, t2)) {
            // 如果设置成功则打印
            System.out.println(reference.get().idx);
        }
    }
}

如何支持Boolean和Double

较低版本的 JDK 中的 Unsafe 中只提供了三种类型 CAS 操作:int、long、Object。而高版本 JDK 则提供的更为全面。

AtomicStampedReference

CAS 都是基于“值”来做比较的。但如果另外一个线程把变量的值从 A 改为 B,再从 B 改回到 A,那么尽管修改过两次,可是在当前线程做 CAS 操作的时候,却会因为值没变而认为数据没有被其他线程修改过,这就是所谓的 ABA 问题。要解决 ABA 问题,不仅要比较“值”,还要比较“版本号”,而这正是 AtomicStampedReference 做的事情,其对应的 CAS 操作如下

public boolean compareAndSet(V   expectedReference,
                             V   newReference,
                             int expectedStamp,
                             int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}

带版本号的 CAS 代码示例

public class Demo {
    public static void main(String[] args) {
        String str1 = "A";
        String str2 = "B";
        AtomicStampedReference<String> reference = new AtomicStampedReference<>(str1, 1);
        // cas 的时候,也将版本号进行更新
        reference.compareAndSet(str1, str2, reference.getStamp(), reference.getStamp() + 1);
        System.out.println(reference.getReference()); // B CAS 成功
        System.out.println(reference.getStamp()); // 版本号更新为了 2

        // 设置新的版本号,设置为 3(学习 API 的使用)
        reference.attemptStamp(str2, reference.getStamp() + 1);
        System.out.println(reference.getStamp());

        // CAS 失败,期待的版本号和 reference 中的版本号不一样。
        reference.compareAndSet(str2, str1, 2, reference.getStamp());
        System.out.println(reference.getReference());

        // 不提供 happend-before,即不提供顺序的保证,可能会有
        // reference.weakCompareAndSet()
    }
}

weakCompareAndSet 的解释,可能会造成虚假的失败,不提供顺序保障。

AtomicStampedReference 采用的泛型,将需要 CAS 的变量和版本号封装为一个 Pair。因为要同时比较数据的【值】和【版本号】。而 Integer 或者 Long 的 CAS 没法同时比较两个变量,于是就将它们封装为一个对象,然后通过对象引用的 CAS 来实现。

public boolean compareAndSet(V   expectedReference,
                             V   newReference,
                             int expectedStamp,
                             int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}

// CAS
private boolean casPair(Pair<V> cmp, Pair<V> val) {
    return PAIR.compareAndSet(this, cmp, val);
}

AtomicMarkableReference

与 AtomicStampedReference 类似,只不过 Pair 里面的版本号是 boolean 类型的,而不是整型的累加计数。

public AtomicMarkableReference(V initialRef, boolean initialMark) {
    pair = Pair.of(initialRef, initialMark);
}

因为是 boolean 类型,只能有 true、false 两个版本号,所以并不能完全避免 ABA 问题,只是降低了 ABA 发生的概率。

原子累加器

原子累加器:LongAdder。

使用 AtomicLong 时,在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只有一个线程的 CAS 操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试 CAS 的操作,而这会白白浪费 CPU 资源。LongAdder 把一个变量分解为多个变量,让多个线程去竞争这多个变量,而非单个,在一定程度上可以增强并发度。在汇总获取值时,再进行一次累加即可。

原理概述

使用 LongAdder 时,其在内部会维护多个 Cell 变量,每个 Cell 里面有一个初始值为 0 的 long 型变量,这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相地减少了争夺共享资源的并发量。另外,多个线程在争夺同一个 Cell 原子变量时如果失败了,它并不是在当前 Cell 变量上一直自旋 CAS 重试,而是尝试在其他 Cell 的变量上进行 CAS 尝试,这个改变增加了当前线程重试 CAS 成功的可能性。最后,在获取 LongAdder 当前值时,是把所有 Cell 变量的 value 值累加后再加上 base 返回的。

LongAdder 维护了一个延迟初始化的原子性更新数组(默认情况下 Cell 数组是 null)和一个基值变量 base。由于 Cells 占用的内存是相对比较大的,所以一开始并不创建它,而是在需要时创建,也就是惰性加载。当一开始判断 Cell 数组是 null 并且并发线程较少时,所有的累加操作都是对 base 变量进行的。保持 Cell 数组的大小为 $2^n$,在初始化时 Cell 数组中的 Cell 元素个数为 2,数组里面的变量实体是 Cell 类型。Cell 类型是 AtomicLong 的一个改进,用来减少缓存的争用,也就是解决伪共享问题。

对于大多数孤立的多个原子操作进行字节填充是浪费的,因为原子性操作都是无规律地分散在内存中的(也就是说多个原子性变量的内存地址是不连续的),多个原子变量被放入同一个缓存行的可能性很小。但是原子性数组元素的内存地址是连续的,所以数组内的多个元素能经常共享缓存行,因此这里使用 @sun.misc.Contended 注解对 Cell 类进行字节填充,这防止了数组中多个元素共享一个缓存行,在性能上是一个提升。

小结

  • 使用多个累加单元,提高并发度,且累加单元是惰性初始化的,以便节省内存,必要时才使用更多的累加单元。
  • 累加的数据是通过 Cell 类型进行包裹的,是为了避免伪共享。

累加器性能比较

private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
    T adder = adderSupplier.get();
    long start = System.nanoTime();
    List<Thread> ts = new ArrayList<>();
    // 4 个线程,每人累加 50 万
    for (int i = 0; i < 40; i++) {
        ts.add(new Thread(() -> {
            for (int j = 0; j < 500000; j++) {
                action.accept(adder);
            }
        }));
    }
    ts.forEach(t -> t.start());
    ts.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    long end = System.nanoTime();
    System.out.println(adder + " cost:" + (end - start) / 1000_000);
}

比较 AtomicLong 与 LongAdder

for (int i = 0; i < 5; i++) {
     demo(() -> new LongAdder(), adder -> adder.increment());
}
for (int i = 0; i < 5; i++) {
     demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}

LongAdder 效率高出很多

性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]... 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

LongAdder

LongAdder 是并发大师 Doug Lea 的作品,设计的非常精巧。LongAdder 类有几个关键域

// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域,没必要用 cells 这个累加单元
transient volatile long base;
// 在 cells 数组创建或扩容时, 置为 1, 表示加锁。可能多个线程都要扩容,所以要加锁保护。
transient volatile int cellsBusy;

我们需要关注的问题是:

  • LongAdder 的结构是怎样的?
    • 由 Cell 数组,base 和 cellsBusy 组成
  • 当前线程应该访问 Cell 数组里面的哪一个 Cell 元素?
    • 通过 getProbe 方法算出一个随机值 h,然后 (n-1)&h 算出对应那个下标的元素,没有 Cell 元素则创建,有则尝试创建;有则通过 CAS 进行累加。
  • 如何初始化 Cell 数组?
    • 基础累加单元数组 Cells 未创建,且对 base 进行 CAS 失败,则尝试初始化 Cells。
  • Cell 数组如何扩容?
    • 当前 cells 的元素个数小于当前机器 CPU 个数并且当前多个线程访问了 cells 中同一个元素,从而导致冲突使其中一个线程 CAS 失败时才会进行扩容操作。
    • 只有当每个 CPU 都运行一个线程时才会使多线程的效果最佳,也就是当 cells 数组元素个数与 CPU 个数一致时,每个 Cell 都使用一个 CPU 进行处理,这时性能才是最佳的。
  • 线程访问分配的 Cell 元素有冲突后如何处理?
    • 对 CAS 失败的线程重新计算当前线程的随机值 threadLocalRandomProbe,以减少下次访问 cells 元素时的冲突机会。
  • 如何保证线程操作被分配的 Cell 元素的原子性?
    • 线程通过分配的 Cell 元素的 cas 函数来保证对 Cell 元素 value 值更新的原子性

CAS锁

package unlock;

import lombok.extern.slf4j.Slf4j;

import java.sql.Time;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j(topic = "c.LockCas")
public class LockCas {
    private static AtomicInteger state = new AtomicInteger(0);

    public void lock() {
        while (true) {
            if (state.compareAndSet(0, 1)) {
                break;
            }
        }
    }
	// 因为解锁的是拿到锁的线程,所以是安全的。
    public void unlock() {
        log.debug("unlock...");
        state.set(0);
    }
}

测试。发现确实只有一个线程可以拿到锁。

public static void main(String[] args) {
    LockCas lockCas = new LockCas();
    Thread th = new Thread(() -> {
        try {
            log.debug("begin...");
            lockCas.lock();
            log.debug("lock");
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lockCas.unlock();
        }
    }, "线程1");

    Thread th2 = new Thread(() -> {
        try {
            log.debug("begin...");
            lockCas.lock();
            log.debug("lock...");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lockCas.unlock();
        }
    });
    th.start();
    th2.start();
}
/*
13:52:35.165 c.LockCas [Thread-0] - begin...
13:52:35.165 c.LockCas [Thread-0] - lock...
13:52:35.165 c.LockCas [Thread-0] - unlock...
13:52:35.165 c.LockCas [线程1] - begin...
13:52:35.165 c.LockCas [线程1] - lock
13:52:36.180 c.LockCas [线程1] - unlock...
*/

原理-伪共享

一个缓存行,加入了多个 Cell 对象,称之为伪贡献。

其中 Cell 即为累加单元

// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {
     volatile long value;
     Cell(long x) { value = x; }
 
     // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
     final boolean cas(long prev, long next) {
     	return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
     }
     // 省略不重要代码
}

缓存的相关概念如下:CPU 与内存的速度不匹配问题,所以引入了 CPU cache。

从 cpu 到 大约需要的时钟周期
寄存器 1 cycle
L1 3~4 cycle
L2 10~20 cycle
L3 40~45 cycle
内存 120~240 cycle

因为 CPU 与内存的速度差异很大,需要靠预读数据至缓存来提升效率。 而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中。

CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效

因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象。这样问题来了:

  • Core-0 要修改 Cell[0]
  • Core-1 要修改 Cell[1]

无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效

@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效

CPU缓存结构

查看 CPU 缓存

payphone@Payphone:~$ lscpu
Architecture:        x86_64
CPU op-mode(s):      32-bit, 64-bit
Byte Order:          Little Endian
CPU(s):              8
On-line CPU(s) list: 0-7
Thread(s) per core:  2
Core(s) per socket:  4
Socket(s):           1
Vendor ID:           GenuineIntel
CPU family:          6
Model:               158
Model name:          Intel(R) Core(TM) i5-8300H CPU @ 2.30GHz
Stepping:            10
CPU MHz:             2304.000
CPU max MHz:         2304.0000
BogoMIPS:            4608.00
Virtualization:      VT-x
Hypervisor vendor:   Windows Subsystem for Linux
Virtualization type: container

速度比较

查看 CPU 缓存行

~ cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size

CPU 拿到的内存地址格式是这样的 [高位组标记][低位索引][偏移量]

CPU 缓存读

读取数据流程如下

  • 根据低位,计算在缓存中的索引
  • 判断是否有效
    • 0 去内存读取新数据更新缓存行
    • 1 再对比高位组标记是否一致
      • 一致,根据偏移量返回缓存数据
      • 不一致,去内存读取新数据更新缓存行

CPU 缓存一致性

MESI 协议

  • 1️⃣E、S、M 状态的缓存行都可以满足 CPU 的读请求
  • 2️⃣E 状态的缓存行,有写请求,会将状态改为 M,这时并不触发向主存的写
  • 3️⃣E 状态的缓存行,必须监听该缓存行的读操作,如果有,要变为 S 状态
  • 4️⃣M 状态的缓存行,必须监听该缓存行的读操作,如果有,先将其它缓存(S 状态)中该缓存行变成 I 状态(即 6. 的流程),写入主存,自己变为 S 状态
  • 5️⃣S 状态的缓存行,有写请求,走 4. 的流程
  • 6️⃣S 状态的缓存行,必须监听该缓存行的失效操作,如果有,自己变为 I 状态
  • 7️⃣I 状态的缓存行,有读请求,必须从主存读取

内存屏障

Memory Barrier(Memory Fence)

  • 可见性
    • 写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
    • 而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
  • 有序性
    • 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
    • 读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
sequenceDiagram
participant t1 as t1 线程
participant t2 as t2线程
participant i as static i
participant j as static j
participant k as volatile static k
t1->>i:putstatic 写入
t1->>j:putstatic 写入
t1-x k:putstatic 写入,带写屏障
t2-x k:getstatic 读取,带读屏障
t2->>i:putstatic 读取
t2->>j:putstatic 读取
Loading

源码-LongAdder

先看 increment 方法,发现它调用了 add 方法。

public void increment() {
    add(1L); // 调用的 add 方法
}

接下来细看下 add 方法。『以 cell 为空进行梳理;不为空的,自行梳理』

public void add(long x) {
    Cell[] cs; long b, v; int m; Cell c;
    // cells 是累加单元的数组 || 对基础的累加单元进行累加
    // 如果 cells 是 null,就在 base 
    if ((cs = cells) != null || !casBase(b = base, b + x)) {
        // 如果 cells 不是 null 或者线程执行代码
        boolean uncontended = true;
        if (cs == null || (m = cs.length - 1) < 0 ||
            (c = cs[getProbe() & m]) == null || 	//  (c = cs[getProbe() & m]) == null 判断当前线程,有没有对应的 cell,没创建就进入 longAccumulate 创建	 	
            !(uncontended = c.cas(v = c.value, v + x))) // 假如前面的判断都不成立,当前线程有累加单元,则执行累加单元的 cas。x是传过来的1
            // cs == null 成立,进入 longAccumulate,进行 cells 的创建
            longAccumulate(x, null, uncontended);
    }
}
graph LR
subgraph add
cur(当前线程)-->cells
cells-->|为空|cas_base(cas base 累加)
cas_base-->|成功|return
cas_base-->|失败|longAccumulate
cells-->|不为空|是否创建cell(当前线程 cell 是否创建)
是否创建cell-->|创建了|cas_cell(cas cell 累加)
cas_cell-->|成功|return
cas_cell-->|失败|longAccumulate
是否创建cell-->|没创建|longAccumulate
end
Loading

LongAdder-longAccumulate 源码,cells 数组被初始化和扩容的地方。

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    // 初始化当前线程的变量 threadLocalRandomProbe 的值
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // 1.cell 数组不为空。
        if ((as = cells) != null && (n = as.length) > 0) {
			// doing something
        }
        // 2.cell 数组还没创建。cellsBusy == 0 还没加锁;cells == as 还没有其他线程改变 cs 数组; casCellsBusy() 尝试把 cellsBusy 置为1 进行加锁
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) { // 2.1 看有没有其他线程把 cell 创建了
                    Cell[] rs = new Cell[2]; 
                    rs[h & 1] = new Cell(x); // 2.2 创建类加单元,并把初始的信息x,作为初始值赋值给类加单元
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // 3.加锁失败就进入这个 else if。
        // 加锁失败就在 base 上进行累加。累加失败的话,就回到循环
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}
graph LR
subgraph cells 创建
start(循环入口)-->cells不存在&未加锁&未新建
cells不存在&未加锁&未新建-->加锁

加锁-->|成功|创建cells并初始化一个cell
加锁-->|失败|cas_base(cas base 累加)
创建cells并初始化一个cell-->return
cas_base-->|成功|return
cas_base-->|失败|回到循环入口
end
Loading

接下来看 cell 数组存在 &cell 没创建的情况

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            // 获取当前线程,看有没有对应的累加单元,如果是null 说明还没创建。
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) { // 没上锁,那就自己尝试上锁,把自己创建的累加单元加上去。
                        boolean created = false;
                        try {               // Recheck under lock 
                            // 执行加锁后的操作。
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) { // 检测线程对应的数组中空的槽位是不是真的是 null。不是null 说明已经被其占据了
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
    }
}
graph LR
subgraph cell 创建
循环入口(循环入口)-->cells存在&cell没创建(cells存在&cell没创建)
cells存在&cell没创建-->|创建cell|加锁(加锁)
加锁-->|成功|槽位为空
槽位为空-->|成功|return(return)
加锁-->|失败|回到循环入口
槽位为空-->|失败|回到循环入口
end
Loading

接下来看 cell 数组存在和 cell 创建的情况

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
				// doing something
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // 累加单元就是a,cas 对原来的值进行累加。累加成功就返回
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // 累加失败:检测是否超过了 CPU 上限,那么扩容就没有意义了。
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale。那么下次循环就会进入 else if(!collide),就不会走扩容逻辑了!!
            else if (!collide)
                collide = true;
            // 没有超过 CPU 上限,就进行扩容。
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i]; // 拷贝到新数组
                        cells = rs; // 新数组替换掉旧数组
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false; // 这个啥意思?
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h); // 改变线程的 cell 对象。你在这个累加单元总是累加失败,那么我给你换一个累加单元
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
			// doing something
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}
graph LR
subgraph cas cell
循环入口(循环入口)-->cas_cell(cas cell 累加)
cas_cell-->|成功|return(return)
cas_cell-->|失败|是否超过CPU上限(是否超过 CPU 上限)
是否超过CPU上限-->|是|改变线程对应的cell(改变线程对应的 cell)
是否超过CPU上限-->|否|加锁(加锁)
加锁-->|失败|改变线程对应的cell
加锁-->|成功|扩容(扩容)
扩容-->回到循环入口(回到循环入口)
改变线程对应的cell-->回到循环入口
end
Loading

sum 方法

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

原子引用

为什么需要原子引用类型?因为要保护的数据不一定是数据类型,其他的可以用 AtomicReference 进行包裹。

  • AtomicReference
  • AtomicMarkableReference
  • AtomicStampedReference

有如下方法

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

interface DecimalAccount {
    // 获取余额
    BigDecimal getBalance();

    // 取款
    void withdraw(BigDecimal amount);

    /**
     * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
     * 如果初始余额为 10000 那么正确的结果应当是 0
     */
    static void demo(DecimalAccount account) {
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(BigDecimal.TEN);
            }));
        }
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(account.getBalance());
    }
}

试着提供不同的 DecimalAccount 实现,实现安全的取款操作

不安全实现

非原子性操作,不安全。可以用 AtomicReference 进行包裹,然后用 cas 操作进行数据的更新,从而达到线程安全的目的;也可以加 synchronized。

public class AtomicReferenceDemo {
    public static void main(String[] args) {
        DecimalAccount decimalAccountUnsafe = new DecimalAccountUnsafe(new BigDecimal(10000));
        DecimalAccount.demo(decimalAccountUnsafe);
    }
}

class DecimalAccountUnsafe implements DecimalAccount {
    BigDecimal balance;

    public DecimalAccountUnsafe(BigDecimal balance) {
        this.balance = balance;
    }

    @Override
    public BigDecimal getBalance() {
        return balance;
    }

    @Override
    public void withdraw(BigDecimal amount) {
        BigDecimal balance = this.getBalance();
        this.balance = balance.subtract(amount);
    }
}

安全实现-使用sync

public class AtomicReferenceDemo {
    public static void main(String[] args) {
        DecimalAccount account = new DecimalAccountSafeLock(new BigDecimal(10000));
        DecimalAccount.demo(account);
    }
}

class DecimalAccountSafeLock implements DecimalAccount {
    private final Object lock = new Object();
    BigDecimal balance;

    public DecimalAccountSafeLock(BigDecimal balance) {
        this.balance = balance;
    }

    @Override
    public BigDecimal getBalance() {
        return balance;
    }

    @Override
    public void withdraw(BigDecimal amount) {
        synchronized (lock) {
            BigDecimal balance = this.getBalance();
            this.balance = balance.subtract(amount);
        }
    }
}

安全实现-使用CAS

public class AtomicReferenceDemo {
    public static void main(String[] args) {
        DecimalAccount account = new DecimalAccountSafeCas(new BigDecimal(10000));
        DecimalAccount.demo(account);
    }
}

class DecimalAccountSafeCas implements DecimalAccount {
    AtomicReference<BigDecimal> ref;

    public DecimalAccountSafeCas(BigDecimal balance) {
        ref = new AtomicReference<>(balance);
    }

    @Override
    public BigDecimal getBalance() {
        return ref.get();
    }

    @Override
    public void withdraw(BigDecimal amount) {
        while (true) {
            BigDecimal prev = ref.get();
            BigDecimal next = prev.subtract(amount);
            if (ref.compareAndSet(prev, next)) {
                break;
            }
        }
    }
}

测试代码

DecimalAccount.demo(new DecimalAccountUnsafe(new BigDecimal("10000")));
DecimalAccount.demo(new DecimalAccountSafeLock(new BigDecimal("10000")));
DecimalAccount.demo(new DecimalAccountSafeCas(new BigDecimal("10000")));

// 运行结果
// 4310 cost: 425 ms 
// 0 cost: 285 ms 
// 0 cost: 274 ms

ABA问题

ABA问题

static AtomicReference<String> ref = new AtomicReference<>("A");

public static void main(String[] args) throws InterruptedException {
    log.debug("main start...");
    // 获取值 A
    // 这个共享变量被它线程修改过?
    String prev = ref.get();
    other();
    sleep(1);
    // 尝试改为 C
    log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
}

private static void other() {
    new Thread(() -> {
        log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
    }, "t1").start();
    sleep(0.5);
    new Thread(() -> {
        log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
    }, "t2").start();
}

输出

11:29:52.325 c.Test36 [main] - main start... 
11:29:52.379 c.Test36 [t1] - change A->B true 
11:29:52.879 c.Test36 [t2] - change B->A true 
11:29:53.880 c.Test36 [main] - change A->C true

主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又改回 A 的情况,如果主线程希望:

只要有其它线程『动过了』共享变量,那么自己的 CAS 就算失败,这时,仅比较值是不够的,需要再加一个版本号。

AtomicStampedReference P172

public class ABA {
    static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

    public static void main(String[] args) throws InterruptedException {
        log.debug("main start...");
        // 获取值 A
        String prev = ref.getReference();
        // 获取版本号
        int stamp = ref.getStamp();
        log.debug("版本 {}", stamp);
        // 如果中间有其它线程干扰,发生了 ABA 现象
        other();
        sleep(1);
        // 尝试改为 C
        log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
    }

    private static void other() {
        new Thread(() -> {
            log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",
                    ref.getStamp(), ref.getStamp() + 1));
            log.debug("更新版本为 {}", ref.getStamp());
        }, "t1").start();
        sleep(0.5);
        new Thread(() -> {
            log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",
                    ref.getStamp(), ref.getStamp() + 1));
            log.debug("更新版本为 {}", ref.getStamp());
        }, "t2").start();
    }
}

输出为

15:41:34.891 c.Test36 [main] - main start... 
15:41:34.894 c.Test36 [main] - 版本 0 
15:41:34.956 c.Test36 [t1] - change A->B true 
15:41:34.956 c.Test36 [t1] - 更新版本为 1 
15:41:35.457 c.Test36 [t2] - change B->A true 
15:41:35.457 c.Test36 [t2] - 更新版本为 2 
15:41:36.457 c.Test36 [main] - change A->C false 

AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:A -> B -> A -> C,通过 AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。

但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了 AtomicMarkableReference

graph TD
保洁阿姨(保洁阿姨)
主人(主人)
保洁阿姨-.->|倒空|垃圾袋(垃圾袋)
主人-->|检查|垃圾袋
垃圾袋-->|还空|垃圾袋
垃圾袋-->|已满|新垃圾袋
Loading

AtomicMarkableReference

public class GarbageBag {
    String desc;

    public GarbageBag(String desc) {
        this.desc = desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        return super.toString() + " " + desc;
    }
}
@Slf4j
public class TestABAAtomicMarkableReference {
    public static void main(String[] args) throws InterruptedException {
        GarbageBag bag = new GarbageBag("装满了垃圾");
        // 参数2 mark 可以看作一个标记,表示垃圾袋满了
        AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
        log.debug("主线程 start...");
        GarbageBag prev = ref.getReference();
        log.debug(prev.toString());
        new Thread(() -> {
            log.debug("打扫卫生的线程 start...");
            bag.setDesc("空垃圾袋");
            while (!ref.compareAndSet(bag, bag, true, false)) {
            }
            log.debug(bag.toString());
        }).start();
        Thread.sleep(1000);
        log.debug("主线程想换一只新垃圾袋?");
        boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
        log.debug("换了么?" + success);
        log.debug(ref.getReference().toString());
    }
}

输出

15:30:09.264 [main] 主线程 start... 
15:30:09.270 [main] cn.itcast.GarbageBag@5f0fd5a0 装满了垃圾
15:30:09.293 [Thread-1] 打扫卫生的线程 start... 
15:30:09.294 [Thread-1] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋
15:30:10.294 [main] 主线程想换一只新垃圾袋?
15:30:10.294 [main] 换了么?false 
15:30:10.294 [main] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋

可以注释掉打扫卫生线程代码,再观察输出

原子数组

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

函数式接口复习:

  • suppiler 提供者,没有参数,但是需要提供返回结果
  • function 函数,一个参数一个结果 (参数)---> 结果 ,BiFunction (参数1,参数2)---> 结果
  • consumer 消费者,一个参数,没有结果 (参数)---> void,BiConsumer(参数1,参数2)---> void
/**
     * 参数1,提供数组、可以是线程不安全数组或线程安全数组
     * 参数2,获取数组长度的方法
     * 参数3,自增方法,回传 array, index
     * 参数4,打印数组的方法
     */
// supplier 提供者 无中生有 ()->结果
// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->
private static <T> void demo(
    Supplier<T> arraySupplier,
    Function<T, Integer> lengthFun,
    BiConsumer<T, Integer> putConsumer,
    Consumer<T> printConsumer) {
    List<Thread> ts = new ArrayList<>();
    T array = arraySupplier.get();
    int length = lengthFun.apply(array);
    for (int i = 0; i < length; i++) {
        // 每个线程对数组作 10000 次操作
        ts.add(new Thread(() -> {
            for (int j = 0; j < 10000; j++) {
                putConsumer.accept(array, j % length);
            }
        }));
    }
    ts.forEach(t -> t.start()); // 启动所有线程
    ts.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }); // 等所有线程结束
    printConsumer.accept(array);
}

不安全数组

demo(
 ()->new int[10],
 (array)->array.length,
 (array, index) -> array[index]++,
 array-> System.out.println(Arrays.toString(array))
);
// 结果 [9870, 9862, 9774, 9697, 9683, 9678, 9679, 9668, 9680, 9698] 

安全数组

demo(
 ()-> new AtomicIntegerArray(10),
 (array) -> array.length(),
 (array, index) -> array.getAndIncrement(index),
 array -> System.out.println(array)
);
// 结果 [10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000] 

字段更新器

如果一个类是自己编写的,则可以在编写的时候把成员变量定义为 Atomic 类型。但如果是一个已经有的类,在不能更改其源代码的情况下,要想实现对其成员变量的原子操作,就需要 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater。

  • AtomicReferenceFieldUpdater // 域 字段
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater

利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常

AtomicIntegerFieldUpdater

AtomicIntegerFieldUpdater 的构造函数是 protected 修饰的,且 AtomicIntegerFieldUpdater 是个抽象类,不能直接 new 出来。必须通过它提供的一个静态函数来创建。

// tclass – the class of the objects holding the field
// fieldName – the name of the field to be updated
@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
                                                          String fieldName) {
    // AtomicIntegerFieldUpdater 的一个实现类
    return new AtomicIntegerFieldUpdaterImpl<U>
        (tclass, fieldName, Reflection.getCallerClass());
}
  • tclass:字节码
  • filedName:成员变量的名称
public class Test5 {
    // 这个变量必须被 volatile 修饰,不然会抛出异常。
    // Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
    private volatile int field;

    public static void main(String[] args) {
        AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field");
        Test5 test5 = new Test5();
        fieldUpdater.compareAndSet(test5, 0, 10);
        // 修改成功 field = 10
        System.out.println(test5.field);
        // 修改成功 field = 20
        fieldUpdater.compareAndSet(test5, 10, 20);
        System.out.println(test5.field);
        // 修改失败 field = 20
        fieldUpdater.compareAndSet(test5, 10, 30);
        System.out.println(test5.field);
    }
}
// 输出
// 10
// 20
// 30

Unsafe

概述

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得

package unlock;

import sun.misc.Unsafe;

import java.lang.reflect.Field;

public class UnsafeDemo {
    public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        Unsafe unsafe = (Unsafe) theUnsafe.get(null);
        System.out.println(unsafe);
    }
}

Unsafe#CAS操作

public class UnsafeDemo {
    public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        Unsafe unsafe = (Unsafe) theUnsafe.get(null);
        System.out.println(unsafe);
        // id 的偏移量地址
        long idOffset = unsafe.objectFieldOffset(Student.class.getDeclaredField("id"));
        long nameOffset = unsafe.objectFieldOffset(Student.class.getDeclaredField("name"));
        Student student = new Student();
        unsafe.compareAndSwapInt(student, idOffset, 0, 1);
        unsafe.compareAndSwapObject(student, nameOffset, null, "hello");
        System.out.println(student);
    }
}

@Data
class Student {
    volatile int id;
    volatile String name;
}

输出 Student(id=1, name=hello)

自定义一个 AtomicInteger。

class MyAtomicInteger {
    private volatile int value;
    private static long valueOffset;
    private static Unsafe UNSAFE = null;

    public MyAtomicInteger() {
        try {
            moField theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            UNSAFE = (Unsafe) theUnsafe.get(null);
            valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));

        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }
    }

    public int getValue() {
        return value;
    }

    public void decrement(int amount) {
        while (true) {
            int pre = this.value;
            int next = pre - amount;
            if (UNSAFE.compareAndSwapInt(this, valueOffset, pre, next)) {
                break;
            }
        }
    }
}

小结

  • CAS 与 volatile
  • API
    • 原子整数
    • 原子引用
    • 原子数组
    • 字段更新器
    • 原子累加器
  • Unsafe
  • 原理方面
    • LongAdder 源码
    • 伪共享