什么是可见性?为什么会出现”不可见“
我们已经知道
counter.increment();
编译成字节码为
getfield #2 iconst_1iaddputfield #2
上一篇已经说过,这里的字节码的执行过程是在工作内存中,但是getField和putField这二条指令其实是跟主内存有交互的,这里还是以Counter类的increment方法为例。
getField指令会从主存中读取count的值,但是并不是每次都从主存中读,因为CPU高速cache的存在,我们count值有可能会从cache中读,导致读的并不是最新的
putField指令会将count新的值写入主内存,但是也不是立即生效,别的CPU的高速cache中的count不会立即更新,CPU会使用缓存一致性协议来做同步,这个对我们是透明的。
正是因为CPU高速cache的存在,在多核环境中会有可见性的问题。这里额外提一句 ,之所以有高速cache存在,是为提高运行效率,现代CPU的速度比我们的内存快很多,如果每次都锁总线写主存,会导致执行速度下降很多,这是不可以接受的,木桶理论我们都能理解。这里我也画了一张图,来帮助大家理解。
那有没有办法解决可见性带来的问题呢?当然是有的,对于Java,我们可以使用volatile关键字。
volatile
volatile修饰的变量有下面的特性
在写volatile的时候,有monitor release的语义,会刷新各个cpu中该变量的cache,存入最新的值
在读volatile的时候,有monitor acquire的语义,会使当前cpu的cache中该变量的cache失效,从主存中读取最新的值
volatile拥有禁止指令重排序的语义
其中monitor可以理解为锁,moniter release就是释放锁,monitor acquire就是获取锁,这样就是volatile变量的读写都是直接对主存操作的,相当于牺牲一部分性能来换取可见性,这一部分牺牲的性能一般是可以忽略不计的,只需要知道有这么回事就行。
volatile实现原理
给count加上volatile修饰符后,查看编译后的字节码后会发现,字节码层面唯一的变化是给count添加了ACC_VOLATILE标识flag,在运行时会根据这个flag会自动插入内存屏障,保证volatile可见性语义,内存屏障一共有四种,分别是:
LoadLoad
LoadStore
StoreStore
StoreLoad
这里有个文档,比较权威详细的说明了内存屏障的知识,这一块知识大家可以自己继续深入。这里给出文档中的一个实例,比较形象的说明了内存屏障是怎么插入的。
编辑
添加图片注释,不超过 140 字(可选)
再回到上面的例子,我们给count添加上volatile修饰符之后,是不是就能在多线程中得到正确的累加结果呢?我们试验一下,简单起见,我们只开2个线程,每个线程分配一半的计算量。
// Counter.javaprivate volatile int count;// main 方法 Counter counter = new Counter(); int loopCount = 100000000; int halfCount = loopCount / 2; Thread thread1 = new Thread(() -> { for (int i = 0; i < halfCount; i++) { counter.increment(); } }); Thread thread2 = new Thread(() -> { for (int i = halfCount; i < loopCount; i++) { counter.increment(); } }); thread1.start(); thread2.start(); try { thread1.join(); thread2.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("count:" + counter.getCount()); System.out.println("take time:" + (System.currentTimeMillis() - startTime) + "ms");// 运行结果count:51743664take time:2335ms
结果显然还是不对的,而且程序运行的时间长了好几倍了。这是因为volatile只保证了可见性,却没有原子性语义,比如下面这种情况
在T1-T6时间内,初始count=0,经过二次++操作,最后count的值还是1,在我们上面的例子中,5千万次的循环会出现大量类似的错误覆盖写入。根据我们上面分析的volatile的语义,在T5时刻,Thread1对count的修改对Thread2是可见的,这里的可见指的是,如果此时调用getfield指令,拿到的值会是Thread1修改的最新的1,但是遗憾的是,Thread2对此一无所知,只是按着自己的步骤将错误的1写入了count中。
那我们不妨设想下,如果在putfield之前,检查下当前栈中存储的count是不是最新的,如果不是最新的重新读取count,然后重试,如果是最新的,直接写入更新值,似乎这样就能解决我们上面出现的错误写入的问题。看起来似乎是一个不错的想法,但是一定要注意,整个检查过程要保证原子性,否则仍然会有并发问题。事实上JDK中Unsafe包里面的CAS方法就是这个思路,不断循环尝试,这个过程就是自旋,它的底层实现依赖cmpxchgl 和 cmpxchgq这二个汇编指令,不同平台的cpu有不同的实现,但是代码大同小异,我在这里以opekjdk8为例扒一扒CAS的源码,源码比较多我只会贴出关键代码块。
// Unsafe.class中的三个CAS方法,都是native的public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
它对应的native实现在hotspot/src/share/vm/prims/unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UnsafeWrapper("Unsafe_CompareAndSwapInt"); oop p = JNIHandles::resolve(obj); jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); return (jint)(Atomic::cmpxchg(x, addr, e)) == e;UNSAFE_END
篇幅关系,这里只贴上compareAndSwapInt的实现,可以看到又调用了Atomic::cmpxchg方法,继续跟进去
unsigned Atomic::cmpxchg(unsigned int exchange_value, volatile unsigned int* dest, unsigned int compare_value) { assert(sizeof(unsigned int) == sizeof(jint), "more work to do"); return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest, (jint)compare_value);}jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) { assert(sizeof(jbyte) == 1, "assumption."); uintptr_t dest_addr = (uintptr_t)dest; uintptr_t offset = dest_addr % sizeof(jint); volatile jint* dest_int = (volatile jint*)(dest_addr - offset); jint cur = *dest_int; jbyte* cur_as_bytes = (jbyte*)(&cur); jint new_val = cur; jbyte* new_val_as_bytes = (jbyte*)(&new_val); new_val_as_bytes[offset] = exchange_value; while (cur_as_bytes[offset] == compare_value) { jint res = cmpxchg(new_val, dest_int, cur); if (res == cur) break; cur = res; new_val = cur; new_val_as_bytes[offset] = exchange_value; } return cur_as_bytes[offset];}
我们跟踪到了调用了cmpxchg这个方法,这个方法不是在atomic.cpp中定义的,查看atomic.hpp,看到了cmpxchg对应的内联函数的定义
inline static jint cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value);// See comment above about using jlong atomics on 32-bit platformsinline static jlong cmpxchg (jlong exchange_value, volatile jlong* dest, jlong compare_value);
这里我们以solaris_x86平台为例,cmpxchg对应的内涵函数定义在hotspot/src/os_cpu/solaris_x86/vm/atomic_solaris_x86.inline.hpp
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "inline jint _Atomic_cmpxchg(jint exchange_value, volatile jint* dest, jint compare_value, int mp) { __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" : "=a" (exchange_value) : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) : "cc", "memory"); return exchange_value; }
这个是内嵌汇编代码,实话说,汇编这一块的知识我也还给老师了。根据LOCK_IF_MP这个宏定义判断是不是多核心,如果是多核心需要加锁,但是这个锁是cpu总线锁,它的代价比我们应用层中用的Lock代价小得多。同时我们看到cmpxchgl这个关键的指令。追到这一层,我想对于应用开发工程师已经足够了。了解了底层实现,我们来现学现卖实战一波。
使用CAS改造我们的加法器Counter,使其是线程安全的
要使用CAS,肯定要使用Unsafe类,我们还是通过反射来获取Unsafe对象,先看UnsafeUtil类的实现
// UnsafeUtil.java public static Unsafe getUnsafeObject() { Class clazz = AtomicInteger.class; try { Field uFiled = clazz.getDeclaredField("unsafe"); uFiled.setAccessible(true); return (Unsafe) uFiled.get(null); } catch (Exception e) { e.printStackTrace(); } return null; } public static long getVariableOffset(Object target, String variableName) { Object unsafeObject = getUnsafeObject(); if (unsafeObject != null) { try { Method method = unsafeObject.getClass().getDeclaredMethod("objectFieldOffset", Field.class); method.setAccessible(true); Field targetFiled = target.getClass().getDeclaredField(variableName); return (long) method.invoke(unsafeObject, targetFiled); } catch (Exception e) { e.printStackTrace(); } } return -1; }
再来看Counter类的实现
public class Counter { private volatile int count; private Unsafe mUnsafe; private long countOffset; public Counter() { mUnsafe = UnsafeUtil.getUnsafeObject(); countOffset = UnsafeUtil.getVariableOffset(this, "count"); } public void increment() { int cur = getCount(); while (!mUnsafe.compareAndSwapInt(this, countOffset, cur, cur+1)) { cur = getCount(); } } public int getCount() { return this.count; }}
再次开启二个线程,执行我们的累加程序
getfield #2 iconst_1iaddputfield #20
可以看到我们得到正确的累加结果,但是运行时长更长了,但是还好,时间复杂度还是在一个数量级上的。这里要注意一点的是,上述示例代码中,我给count变量增加了volatile关键字,其实就算不加volatile关键字,在这里CAS也是能够正确工作的,但是效率会低一点,我测试下来差不多性能会低5%左右,大家可以思考下为什么不加volatile效率会低?
volatile关键字还有一个禁止指令重排序的语义,一个经典的应用就是DCL单例模式。
到这里,关于可见性我们已经讨论的差不多了,下一篇我们来讨论”原子性“
Java学习视频
Java基础:
Java300集,Java必备优质视频_手把手图解学习Java,让学习成为一种享受
Java项目:
【Java游戏项目】1小时教你用Java语言做经典扫雷游戏_手把手教你开发游戏
【Java毕业设计】OA办公系统项目实战_OA员工管理系统项目_java开发
原文:https://juejin.cn/post/7106441447741587469