CAS operation and use of atomic classes
1. CAS operation
In Java, locking can solve some concurrency problems, but a bad problem with locking is that when a thread does not obtain the lock, it will be blocked and suspended, which will lead to the switching of thread context and rescheduling overhead. Java provides a non blocking volatile keyword to solve the visibility problem of shared variables, which makes up for the overhead problem caused by locks to a certain extent, but volatile can not guarantee the atomicity problem.
The CAS concurrency primitive embodied in the Java language is sun misc. Each method of the UnSafe class. Call the CAS method in the UnSafe class, and the JVM will help us realize the CAS assembly instruction. This is a function that completely depends on the hardware. It realizes atomic operation through it. Again, CAS is a system primitive, which belongs to the category of operating system. It is a process composed of several instructions to complete a function, and the execution of the primitive must be continuous, and it is not allowed to be interrupted in the execution process, In other words, CAS is an atomic instruction of CPU, which will not cause the so-called data inconsistency problem, that is to say, CAS is thread safe.
2. Atomic operation and principle analysis
The JUC package provides a series of atomic operation classes. These classes are implemented using the non blocking algorithm CAS, which will greatly improve the performance compared with using locks to implement atomic operations. JUC concurrent package contains a series of atomic operation classes such as AtomicInteger, AtomicLong, AtomicBoolean and AtomicReference.
public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // setup to use Unsafe.compareAndSwapInt for updates // Gets an instance of the unsafe class private static final Unsafe unsafe = Unsafe.getUnsafe(); // Offset to store value private static final long valueOffset; static { try { // Gets the offset of value in AtomicInteger valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } // The actual variable value, declared volatile, is mainly used to ensure memory visibility under multithreading private volatile int value; /** * Creates a new AtomicInteger with the given initial value. * * @param initialValue the initial value */ public AtomicInteger(int initialValue) { value = initialValue; } /** * Creates a new AtomicInteger with initial value {@code 0}. */ public AtomicInteger() { } .... }
Important methods in Unsafe class
The Unsafe class in the rt.jar package of JDK provides atomic operations at the hardware level. The methods in the Unsafe class are all native methods. They use JNI to access the local C + + implementation library.
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5); public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6); public native Object getObjectVolatile(Object var1, long var2); public native void putObjectVolatile(Object var1, long var2, Object var4); public native int getIntVolatile(Object var1, long var2); public native void putIntVolatile(Object var1, long var2, int var4); public native boolean getBooleanVolatile(Object var1, long var2); public native void putBooleanVolatile(Object var1, long var2, boolean var4); public native byte getByteVolatile(Object var1, long var2); public native void putByteVolatile(Object var1, long var2, byte var4); public native short getShortVolatile(Object var1, long var2); public native void putShortVolatile(Object var1, long var2, short var4); public native char getCharVolatile(Object var1, long var2); public native void putCharVolatile(Object var1, long var2, char var4); public native long getLongVolatile(Object var1, long var2); public native void putLongVolatile(Object var1, long var2, long var4); public native float getFloatVolatile(Object var1, long var2); public native void putFloatVolatile(Object var1, long var2, float var4); public native double getDoubleVolatile(Object var1, long var2); public native void putDoubleVolatile(Object var1, long var2, double var4); public native void putOrderedObject(Object var1, long var2, Object var4); public native void putOrderedInt(Object var1, long var2, int var4); public native void putOrderedLong(Object var1, long var2, long var4); public native void unpark(Object var1); public native void park(boolean var1, long var2);
AtomicInteger increment and decrement operation codes
Call the unsafe method, set the atomicity value to the original value + 1, and the return value is the result of increment
public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; }
Call the unsafe method, set the atomicity value to the original value - 1, and the return value is the result of decrement
public final int decrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, -1) - 1; }
Call the unsafe method, set the atomicity value to the original value + 1, and the return value to the original value
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
Call the unsafe method, set the atomicity value to the original value - 1, and the return value to the original value
public final int getAndDecrement() { return unsafe.getAndAddInt(this, valueOffset, -1); }
The above four codes are implemented by calling getAndAddInt method of unsafe. This function is an atomic operation. The first parameter is the reference of AtomicInteger instance, the second parameter is the offset value of value variable in AtomicInteger, and the third parameter is the value of the second variable to be set.
Including unsafe The code of getandaddint is:
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
var5: the value we copy from the main memory to the working memory (each time we get the latest value from the main memory to our local memory, and then execute compareAndSwapInt() to compare it with the value of the main memory. Because threads cannot directly cross the cache and directly operate the main memory, it is necessary to compare the above methods before performing the plus 1 operation)
Assuming that the execution of compareAndSwapInt returns false, the while method is executed until the expected value is the same as the real value
-
val1: AtomicInteger object itself
-
var2: the address where the object is worth referencing
-
var4: value to be changed
-
var5: the real value in memory found with var1 and var2
- Compare the current value of this object with var5
- If the same, update var5 + var4 and return true
- If different, continue to take values and then compare them until the update is completed
Instead of synchronized, CAS is used to improve concurrency and achieve consistency. The reason is that after each thread comes in, it enters the do while loop, and then continuously obtains the value in memory to judge whether it is the latest, and then performs the update operation.
compareAndSet method in AtomicInteger
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
From this code, we can know that its internal call is unsafe Compareandswapint method: if the value in the atomic variable is equal to expect, update the value with the update value and return true; otherwise, return false.
import java.util.concurrent.atomic.AtomicInteger; /** * @author Wenbo * @version 1.0 * @program * @description * @date 2022/5/25 10:18 */ public class AtomicTest { public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(10); System.out.println(atomicInteger.compareAndSet(10,2019) + "\t Current value" + atomicInteger.get()); System.out.println(atomicInteger.compareAndSet(10,1024) + "\t Current value" + atomicInteger.get()); } }
This is because when we execute the first one, the expected value and the original value are met, so the modification is successful, but after the second time, the value of the main memory has been modified to 2019, which does not meet the expected value, so false is returned, and this write fails.
CAS disadvantages
CAS is unlocked and can be used once, but it needs to be compared many times
- The cycle time is long and the cost is high (because the execution is do while. If it is not successful, it will cycle all the time. The worst case is that the value obtained by a thread is different from the expected value, so it will cycle indefinitely)
- Only atomic operations of one shared variable can be guaranteed
- When operating on a shared variable, we can ensure atomic operation by looping CAS
- However, when multiple shared variables are operated, cyclic CAS cannot guarantee the atomicity of the operation. At this time, locks can be used to ensure the atomicity, or multiple variables can be packaged into a class and operated by AtomicReference.
- ABA problem
What is the ABA problem
The classic problem of CAS operation is that if thread 1 uses CAS to modify the initial value A to variable x, thread 1 will first obtain the value of current variable x (A), and then use CAS operation to try to modify the value of X to B. If CAS operation is successful, the program is not necessarily correct, because it is possible that thread 1 uses CAS to modify the value of variable x to B before executing CAS, Then use CAS to change the value of variable x to A. Therefore, although the value of X is A when thread 1 executes CAS, this A is not obtained by thread 1, which is the ABA problem.
The ABA problem arises because the state value of the variable produces A circular transformation, that is, the value of the variable can be from A to B, and then from B to A. If the value of A variable can only be converted in one direction, such as A - > b - > C, and does not form A ring, there will be no problem.
ABA problem based on atomic reference
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** * @author Wenbo * @version 1.0 * @program * @description * @date 2022/5/25 10:56 */ public class ABATest { /** * Ordinary atomic reference wrapper class */ static AtomicReference<Integer> atomicReference = new AtomicReference<>(100); public static void main(String[] args) { new Thread(() -> { // Change 100 to 101, and then change it to 100, that is, ABA atomicReference.compareAndSet(100, 101); atomicReference.compareAndSet(101, 100); }, "t1").start(); new Thread(() -> { try { // Sleep for one second to ensure t1 thread and complete ABA operation TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } // Change 100 to 101, and then change it to 100, that is, ABA System.out.println(atomicReference.compareAndSet(100, 2019) + "\t" + atomicReference.get()); }, "t2").start(); } }
Two threads are created, and then thread t1 performs ABA operations in turn. Thread t2 modifies the value of main memory after 1 second. As a result, it can be seen that it can be modified successfully. This is the ABA problem.
AtomicStampedReference solves ABA problems
AtomicStampedReference class assigns a time stamp to the status value of each variable. Each update needs to compare the expected value with the current value, as well as the expected version number and the current version number, so as to avoid ABA problems. The idea of optimistic lock in database.
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicStampedReference; /** * @author Wenbo * @version 1.0 * @program * @description * @date 2022/5/25 10:56 */ public class ABATest { /** * Ordinary atomic reference wrapper class */ static AtomicReference<Integer> atomicReference = new AtomicReference<>(100); // Pass two values, one is the initial value and the other is the initial version number static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1); public static void main(String[] args) throws InterruptedException { new Thread(() -> { // Get version number int stamp = atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName() + "\t First version number" + stamp); // Pause t3 for one second try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } // Enter 4 values: expected value, updated value, expected version number and updated version number atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp()+1); System.out.println(Thread.currentThread().getName() + "\t Second version number" + atomicStampedReference.getStamp()); atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp()+1); System.out.println(Thread.currentThread().getName() + "\t Third version number" + atomicStampedReference.getStamp()); }, "t3").start(); new Thread(() -> { // Get version number int stamp = atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName() + "\t First version number" + stamp); // Pause T4 for 3 seconds to ensure that the t3 thread also has an ABA problem try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp+1); System.out.println(Thread.currentThread().getName() + "\t Modified successfully:" + result + "\t Current latest actual version number:" + atomicStampedReference.getStamp()); System.out.println(Thread.currentThread().getName() + "\t Current actual latest value" + atomicStampedReference.getReference()); }, "t4").start(); } }