这篇文章主要介绍了Java并发编程之原子性-Atomic的使用方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
线程安全
当多个线程访问一个类时,该类被认为是线程安全的,无论运行时环境采用的调度方法或这些进程将如何交替执行,并且该类不需要主代码中任何额外的同步或协调。
线程安全主要体现在以下三个方面原子性:提供了互斥访问,同一时刻只能有一个线程对它进行操作可见性:一个线程对主内存的修改可以及时的被其他线程观察到有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序
JUC中的Atomic包详解
原子包中提供了许多Atomicxxx类:
都是ca(compareAndSwap)实现原子性。
写一个简单的例子如下:
@Slf4jpublic class AtomicExample1 { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; public static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i lt; clientTotal ; i++) { executorService.execute(() -gt; { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count.get()); } private static void add() { count.incrementAndGet(); }}
你每次都可以发出我们想要的5000的预期结果。这表明计数方法是线程安全的。
我们来看看count.incrementAndGet()方法。它的第一个参数是对象本身,第二个参数是valueOffset,用来记录值本身在内存中的编译地址。该记录主要用于在更新操作时查找值在内存中的位置,以便于比较。第三个参数是常数1。
public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; ... 此处省略多个方法... /** * Atomically increments by one the current value. * * @return the updated value */ public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; }}
AtomicInteger在其源代码中使用了一个不安全的类,该类提供了一个getAndAddInt方法。让我们继续检查它的源代码:
public final class Unsafe { private static final Unsafe theUnsafe; ....此处省略很多方法及成员变量.... public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public native int getIntVolatile(Object var1, long var2);}
您可以看到这里使用了一个do while语句来实现主题。在while语句中,其核心是调用compareAndSwapInt()的一个方法。它是一个原生方法,是一个底层方法,并且不是用Java实现的。
假设我们要执行0+1=0的运算。下面是单线程情况下每个参数的值:
更新后:
compareAndSwapInt()方法的第一个参数(var1)是当前对象,即代码示例中的计数。此时,其值为0(预期值)。第二个值(var2)是传递的valueOffset值,其值为12。第三个参数(var4)是常数1。方法中的变量参数(var5)是根据参数一和参数二valueOffset调用底层getIntVolatile方法得到的值,此时其值为0。CompareAndSwapInt()希望达到对象计数的目的。如果当前期望值var1中的值与底层(var5)返回的值相同,则将其更新为值var5+var4。否则,预期值(var5)将被循环,直到当前值与预期值相同。compareAndSwap方法的核心就是我们通常所说的CAS。
原子包下其他类的实现原理,比如AtomicLong,基本和上面一样。
这里,我们来介绍一下类LongAdder。通过上面的分析,我们已经知道AtomicLong使用CAS:它在无限循环中不断尝试修改目标值,直到修改成功。如果竞争不激烈,成功的概率很大。另一方面,如果在竞争激烈的情况下,修改失败的概率很高,就会进行反复的循环尝试,所以性能会受到影响。
对于常见类型的long和double变量,jvm允许将64位读或写操作分成两个32位操作。LongAdder的核心思想是分离热数据。它可以将AtomicLong的内部核心数据值分离到一个数组中。当每个线程访问它时,它通过哈希算法将其映射到其中一个数字进行计数。最终的计数结果就是这个数组的求和累加,其中热数据值会被分割成多个单元格,每个单元格维护自己的内部值,当前对象的实际值由所有单元格累加合成。这样就有效地分离了热点,提高了并行性。LongAdder相当于在AtomicLong的基础上把单点更新压力分散到各个节点,低并发时直接更新base可以保证Atomic的性能基本一致。在高并发时,通过去中心化来提高性能。但是,如果统计数据中有并发更新,可能会导致统计数据中的错误。
当并发计数实际上很高时,可以首先使用LongAdder。当并行度较低或需要精确值时,可以首先使用AtomicLong,这样效率更高。
下面是原子包下原子引用的简单用法的简单演示:
@Slf4jpublic class AtomicExample4 { private static AtomicReferencelt;Integergt; count = new AtomicReferencelt;gt;(0); public static void main(String[] args) { count.compareAndSet(0, 2); count.compareAndSet(0, 1); log.info("count:{}", count.get()); }}
CompareAndSet()分别传入预期值和更新值。只有当期望值等于当前值时,才会将值更新为更新值;
上面的第一个方法可以将值更新为2,但是第二个步骤不能将值更新为1。
下面简单介绍一下AtomicIntegerFieldUpdater的用法(使用原子性更新类的实例):
@Slf4jpublic class AtomicExample5 { private static AtomicIntegerFieldUpdaterlt;AtomicExample5gt; updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count"); @Getter private volatile int count = 100; public static void main(String[] args) { AtomicExample5 example5 = new AtomicExample5(); if (updater.compareAndSet(example5, 100, 120)) { log.info("update success 1, {}", example5.getCount()); } if (updater.compareAndSet(example5, 100, 120)) { log.info("update success 2, {}", example5.getCount()); } else { log.info("update failed, {}", example5.getCount()); } }}
它可以更新类中指定成员变量的值。
注意:修改后的成员变量需要用volatile关键字修饰,不能是static描述的字段。
这个类AtomicStampReference的核心是解决CAS的ABA问题(CAS运行时,其他线程将变量A的值改为B,再改回A,当线程将期望值A与当前变量进行比较时,发现变量A没有变化,于是CAS交换A的值。
实际上这个值已经被其他线程改变了)。
ABA问题的解决方法是,变量每改变一次,版本号就加一。
看看它的核心方法之一compareAndSet():
public class AtomicStampedReferencelt;Vgt; { private static class Pairlt;Tgt; { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static lt;Tgt; Pairlt;Tgt; of(T reference, int stamp) { return new Pairlt;Tgt;(reference, stamp); } } ... 此处省略多个方法 .... public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pairlt;Vgt; current = pair; return expectedReference == current.reference amp;amp; expectedStamp == current.stamp amp;amp; ((newReference == current.reference amp;amp; newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); }}
你可以看到它有一个stamp的比较,stamp的值在每次更新的时候都会被维护。
下面介绍AtomicLongArray,它维护一个数组。在这个数组下,我们可以通过原子操作有选择地更新一个索引的对应值。
public class AtomicLongArray implements java.io.Serializable { private static final long serialVersionUID = -2308431214976778248L; private static final Unsafe unsafe = Unsafe.getUnsafe(); ...此处省略.... /** * Atomically sets the element at position {@code i} to the given value * and returns the old value. * * @param i the index * @param newValue the new value * @return the previous value */ public final long getAndSet(int i, long newValue) { return unsafe.getAndSetLong(array, checkedByteOffset(i), newValue); } /** * Atomically sets the element at position {@code i} to the given * updated value if the current value {@code ==} the expected value. * * @param i the index * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */ public final boolean compareAndSet(int i, long expect, long update) { return compareAndSetRaw(checkedByteOffset(i), expect, update); }}
最后,编写一个简单的AtomcBoolean用法:
@Slf4jpublic class AtomicExample6 { private static AtomicBoolean isHappened = new AtomicBoolean(false); // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i lt; clientTotal ; i++) { executorService.execute(() -gt; { try { semaphore.acquire(); test(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("isHappened:{}", isHappened.get()); } private static void test() { if (isHappened.compareAndSet(false, true)) { log.info("execute"); } }}
总结
以上是原子包的基本原理和主要使用方法。它使用CAS来保证原子操作,从而达到线程安全的目的。
精彩评论