标签:目标 declared ref 数据库 原子性 共享资源 private main version
public class Test { //value1:线程不安全 private static int value1 = 0; //value2:使用乐观锁 private static AtomicInteger value2 = new AtomicInteger(0); //value3:使用悲观锁 private static int value3 = 0; private static synchronized void increaseValue3(){ value3++; } public static void main(String[] args) throws Exception { //开启1000个线程,并执行自增操作 for(int i = 0; i < 1000; ++i){ new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } value1++; value2.getAndIncrement(); increaseValue3(); } }).start(); } //打印结果 Thread.sleep(1000); System.out.println("线程不安全:" + value1); System.out.println("乐观锁(AtomicInteger):" + value2); System.out.println("悲观锁(synchronized):" + value3); } }
AtomicInteger是java.util.concurrent.atomic包提供的原子类,利用CPU提供的CAS操作来保证原子性;除了AtomicInteger外,还有AtomicBoolean、AtomicLong、AtomicReference等众多原子类。
public class AtomicInteger extends Number implements java.io.Serializable { //存储整数值,volatile保证可视性 private volatile int value; //Unsafe用于实现对底层资源的访问 private static final Unsafe unsafe = Unsafe.getUnsafe(); //valueOffset是value在内存中的偏移量 private static final long valueOffset; //通过Unsafe获得valueOffset static { try { valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } public final int getAndIncrement() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; } } }
public final int addAndGet(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta) + delta; } public final int getAndAddInt(Object var1, long var2, int var4){ int var5; do{ var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
内部调用unsafe的getAndAddInt方法,在getAndAddInt方法中主要是看compareAndSwapInt方法:
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
在unsafe.cpp找到方法CompareAndSwapInt,可以依次看到变量obj、offset、e和x,其中addr就是当前内存位置指针,最终再调用Atomic类的cmpxchg方法。
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UnsafeWrapper("Unsafe_CompareAndSwapInt"); oop p = JNIHandles::resolve(obj); jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); return (jint)(Atomic::cmpxchg(x, addr, e)) == e; UNSAFE_END
static jint cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value);
和volatile类型,CAS也是依赖不同的CPU会有不同的实现,在src/os_cpu目录下可以看到不同的实现,以atomic_linux_x86.inline.hpp为例,是这么实现的:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { int mp = os::is_MP(); __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" : "=a" (exchange_value) : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) : "cc", "memory"); return exchange_value; }
可以看到底层是通过指令cmpxchgl来实现,如果程序是多核环境下,还会先在cmpxchgl前生成lock指令前缀,反之如果是在单核环境下就不需要生成lock指令前缀。为什么多核要生成lock指令前缀?因为CAS是一个原子操作,原子操作隐射到计算机的实现,多核CPU的时候,如果这个操作给到了多个CPU,就破坏了原子性,所以多核环境肯定得先加一个lock指令,不管这个它是以总线锁还是以缓存锁来实现的,单核就不存在这样的问题了。
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); }
compareAndSet有四个参数,分别表示:预期引用、更新后的引用、预期标志、更新后的标志。源码部分很好理解预期的引用 == 当前引用,预期的标识 == 当前标识,如果更新后的引用和标志和当前的引用和标志相等则直接返回true,否则通过Pair生成一个新的pair对象与当前pair CAS替换。Pair为AtomicStampedReference的内部类,主要用于记录引用和版本戳信息(标识),定义如下:
private static class Pair<T> { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } } private volatile Pair<V> pair;
Pair记录着对象的引用和版本戳,版本戳为int型,保持自增。同时Pair是一个不可变对象,其所有属性全部定义为final,对外提供一个of方法,该方法返回一个新建的Pari对象。pair对象定义为volatile,保证多线程环境下的可见性。在AtomicStampedReference中,大多方法都是通过调用Pair的of方法来产生一个新的Pair对象,然后赋值给变量pair。如set方法:
public void set(V newReference, int newStamp){ Pair<V> current = pair; if (newReference != current.reference || newStamp != current.stamp) this.pair = Pair.of(newReference, newStamp); }
下面我们将通过一个例子可以可以看到AtomicStampedReference和AtomicInteger的区别。我们定义两个线程,线程1负责将100 —> 110 —> 100,线程2执行 100 —>120,看两者之间的区别。
public class Test{ private static AtomicInteger atomicInteger = new AtomicInteger(100); private static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100,1); public static void main(String[] args) throws InterruptedException { //AtomicInteger Thread at1 = new Thread(new Runnable(){ @Override public void run(){ atomicInteger.compareAndSet(100,110); atomicInteger.compareAndSet(110,100); } }); Thread at2 = new Thread(new Runnable(){ @Override public void run(){ try { TimeUnit.SECONDS.sleep(2); // at1,执行完 } catch (InterruptedException e){ e.printStackTrace(); } System.out.println("AtomicInteger:" + atomicInteger.compareAndSet(100,120)); } }); at1.start(); at2.start(); at1.join(); at2.join(); //AtomicStampedReference Thread tsf1 = new Thread(new Runnable(){ @Override public void run(){ try{ //让 tsf2先获取stamp,导致预期时间戳不一致 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e){ e.printStackTrace(); } // 预期引用:100,更新后的引用:110,预期标识getStamp() 更新后的标识getStamp() + 1 atomicStampedReference.compareAndSet(100,110,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1); atomicStampedReference.compareAndSet(110,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1); } }); Thread tsf2 = new Thread(new Runnable(){ @Override public void run(){ int stamp = atomicStampedReference.getStamp(); try{ TimeUnit.SECONDS.sleep(2); //线程tsf1执行完 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("AtomicStampedReference:" +atomicStampedReference.compareAndSet(100,120,stamp,stamp + 1)); } }); tsf1.start(); tsf2.start(); } }
运行结果充分展示了AtomicInteger的ABA问题和AtomicStampedReference解决ABA问题。
@Transactional public void updateCoins(Integer playerId){ //根据player_id查询玩家信息 Player player = query("select coins, level from player where player_id = {0}", playerId); //根据玩家当前信息及其他信息,计算新的金币数 Long newCoins = ……; //更新金币数 update("update player set coins = {0} where player_id = {1}", newCoins, playerId); }
为了避免这个问题,悲观锁通过加锁解决这个问题,代码如下所示。在查询玩家信息时,使用select …… for update进行查询;该查询语句会为该玩家数据加上排它锁,直到事务提交或回滚时才会释放排它锁;在此期间,如果其他线程试图更新该玩家信息或者执行select for update,会被阻塞。
@Transactional public void updateCoins(Integer playerId){ //根据player_id查询玩家信息(加排它锁) Player player = queryForUpdate("select coins, level from player where player_id = {0} for update", playerId); //根据玩家当前信息及其他信息,计算新的金币数 Long newCoins = ……; //更新金币数 update("update player set coins = {0} where player_id = {1}", newCoins, playerId); }
版本号机制则是另一种思路,它为玩家信息增加一个字段:version。在初次查询玩家信息时,同时查询出version信息;在执行update操作时,校验version是否发生了变化,如果version变化,则不进行更新。
@Transactional public void updateCoins(Integer playerId){ //根据player_id查询玩家信息,包含version信息 Player player = query("select coins, level, version from player where player_id = {0}", playerId); //根据玩家当前信息及其他信息,计算新的金币数 Long newCoins = ……; //更新金币数,条件中增加对version的校验 update("update player set coins = {0}, version = version + 1 where player_id = {1} and version = {2}", newCoins, playerId, player.version);
标签:目标 declared ref 数据库 原子性 共享资源 private main version
原文地址:https://www.cnblogs.com/liujiarui/p/13174088.html