Preface
AQS is short for AbstractQueuedSynchronizer, which translates literally to "abstract queued synchronizer". It is a tool class for solving concurrency problems, introduced in JDK 1.5 and written by the famous Doug Lea. Unlike the synchronized keyword, AQS solves concurrency problems at the code level.
Review concurrency issues
Concurrency problems concern the safety of shared resources in a multithreaded environment.
A bank account can be operated through a bank card and through mobile banking. What happens if we operate on it with the bank card and the passbook at the same time?
```java
package demo.pattren.aqs;

public class Money {

    /**
     * Suppose the current account has 1000 yuan
     */
    private int money = 1000;

    /**
     * Withdraw money
     */
    public void drawMoney() {
        this.money--;
    }

    public static void main(String[] args) throws InterruptedException {
        Money money = new Money();
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                money.drawMoney();
            }, i + "").start();
        }
        Thread.sleep(2000);
        System.out.println("Current account balance:" + money.money);
    }
}
```
Do you think you can walk into the bank and pull this off right away? Ha ha, you think too much. If you could, the bank would have gone bankrupt long ago. Let's analyze the cause of the problem. The JVM follows the JMM (Java Memory Model): each thread operates on a copy of the data backed up from main memory, so multiple threads hold multiple copies. When a thread modifies its copy, the new value is flushed back to main memory. For example, suppose multiple threads read an account balance of 500 yuan at the same time. Thread A deposits 100 and flushes 600 back to main memory, but main memory does not actively notify the other threads that the value has changed, so their copies now differ from main memory. If another thread operates on the balance still assuming it is 500, the result is obviously not what we want.
Solving concurrency problems
The JDK provides several ways to ensure thread safety.
volatile keyword
volatile is a keyword provided by the JDK and is used to modify variables. A variable modified by volatile is guaranteed to be visible across threads. In the case above, once thread A modifies the account balance and flushes the latest value to main memory, the latest value is synchronized to the other threads.
volatile solves the problem of consistent reads under multithreading: it guarantees visibility, but it does not guarantee the atomicity of write operations. When multiple threads write at the same time, that is, when multiple threads flush their thread-local values to main memory at the same time, updates can still be lost. So modifying the money variable with the volatile keyword does not solve the thread-safety problem here.
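To make the lost-update problem concrete, here is a small sketch (the class name and the CountDownLatch choreography are mine, added for illustration): two threads each read the same volatile balance of 500, then both write back `read + 100`. One deposit vanishes even though the field is volatile, because read-compute-write is not atomic.

```java
import java.util.concurrent.CountDownLatch;

public class LostUpdateDemo {

    static volatile int balance;

    static int run() throws InterruptedException {
        balance = 500;
        // Forces both threads to finish their read before either writes,
        // making the lost update deterministic instead of a rare race.
        CountDownLatch bothRead = new CountDownLatch(2);

        Runnable deposit = () -> {
            int read = balance;       // step 1: read the shared value (500)
            bothRead.countDown();
            try {
                bothRead.await();     // wait until the other thread read too
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            balance = read + 100;     // step 3: write back a stale sum (600)
        };

        Thread a = new Thread(deposit);
        Thread b = new Thread(deposit);
        a.start(); b.start();
        a.join(); b.join();
        return balance;               // 600, not 700: one deposit is lost
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Final balance: " + run());
    }
}
```

The latch only makes the unlucky interleaving happen every time; in real code the same loss occurs occasionally and unpredictably.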
Atomic operation class
The atomic operation classes are a family of tool classes provided by the JDK that guarantee atomic operations. An atomic class ensures that its value can be operated on safely in a multithreaded environment.
```java
package demo.pattren.aqs;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicMoney {

    /**
     * Suppose the current account has 1000 yuan
     */
    private AtomicInteger money = new AtomicInteger(1000);

    /**
     * Withdraw money
     */
    public void drawMoney() {
        // Subtraction operation of AtomicInteger
        this.money.getAndDecrement();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicMoney money = new AtomicMoney();
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                money.drawMoney();
            }, i + "").start();
        }
        Thread.sleep(2000);
        System.out.println("Current account balance:" + money.money);
    }
}
```
Repeated test runs all print 0, which matches the expectation. The atomic operation classes use the CAS (compare-and-swap) mechanism to guarantee atomicity; compared with locking, CAS allows higher concurrency.
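Under the hood, getAndDecrement amounts to a retry loop over compareAndSet. Here is a hand-rolled sketch of that loop (the class name and the `drain` helper are illustrative, and `join()` replaces the fixed sleeps of the earlier examples):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasLoopDemo {

    // Hand-rolled equivalent of AtomicInteger.getAndDecrement():
    // read the current value, then atomically swap it for value - 1;
    // if another thread changed it in between, the CAS fails and we retry.
    static int getAndDecrement(AtomicInteger cell) {
        for (;;) {
            int current = cell.get();
            if (cell.compareAndSet(current, current - 1))
                return current;
        }
    }

    // Start one withdrawing thread per yuan and wait for all of them.
    static int drain(int initial) throws InterruptedException {
        AtomicInteger money = new AtomicInteger(initial);
        Thread[] threads = new Thread[initial];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> getAndDecrement(money));
            threads[i].start();
        }
        for (Thread t : threads)
            t.join();  // deterministic: no Thread.sleep guesswork needed
        return money.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Current account balance: " + drain(1000));
    }
}
```

No thread ever blocks: a losing thread simply retries, which is why CAS scales well when contention is moderate.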
synchronized keyword
The synchronized keyword ensures thread safety at the JVM level: for synchronized blocks, the compiler emits monitorenter and monitorexit instructions before and after the code. In JDK 1.6 synchronized was heavily optimized and its performance improved greatly. synchronized is guaranteed to provide thread safety, so it remains a good choice. Of course, the lock upgrading of synchronized (biased to lightweight to heavyweight) is irreversible, so its performance cannot be guaranteed under high contention.
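For completeness, the withdrawal example can be made safe with synchronized alone. This is my own sketch (class and method names assumed), again using `join()` rather than a fixed sleep:

```java
public class SyncMoney {

    private int money;

    SyncMoney(int initial) {
        this.money = initial;
    }

    // A synchronized method acquires the object's monitor on entry and
    // releases it on exit, so the read-decrement-write sequence is atomic.
    public synchronized void drawMoney() {
        this.money--;
    }

    public synchronized int balance() {
        return money;
    }

    static int withdrawAll(int initial) throws InterruptedException {
        SyncMoney account = new SyncMoney(initial);
        Thread[] threads = new Thread[initial];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(account::drawMoney);
            threads[i].start();
        }
        for (Thread t : threads)
            t.join();
        return account.balance();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Current account balance: " + withdrawAll(1000));
    }
}
```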
Lock lock
We finally arrive at the protagonist of this article. In fact, the content so far is not directly related to AQS, the real topic, so I only mentioned it briefly. Most of the approaches above ensure thread safety at the JVM level, whereas AQS handles thread safety entirely at the code level.
(PS: the section title clearly says Lock. Why is he writing about AQS? I've been tricked into reading on.)
The blogger, afraid of being beaten, hurries to explain~. Class diagram first!
As shown in the figure above, on the left is the abstract queued synchronizer, and on the right are the facilities built on top of it: locks (ReentrantLock), semaphores (Semaphore), countdown latches (CountDownLatch), and so on.
Let's not look at the source code yet. Think for ourselves first: what problems must we solve if we want to implement this purely in code?

- Thread mutual exclusion: use a state value to decide. If state == 0 the lock can be acquired; if state > 0 it cannot.
- Queuing: threads that fail to acquire the lock must be stored somewhere so they can try again after the lock is released.
- Thread wakeup: when the lock is released, a waiting thread should be woken up.
- Lock reentry: how does the same thread acquire the same lock multiple times? (If this is not solved, you get a deadlock in minutes.)
Points 1 and 2 should not be too difficult, but how do we design points 3 and 4? Let's rehearse the flow with pseudocode.
On the business side, the usage looks like this:
```
lock
{
    code that needs to be locked
}
release the lock
```
Lock and release logic
```
if (state == 0) {
    // acquire the lock
    set(state = 1)
} else {
    // keep waiting
    while (true) {
        if (state == 0)
            // try to acquire the lock again
    }
}
```
With this design, the whole flow becomes a serial operation again.
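The pseudocode above can be turned into a tiny spin lock. This is my own sketch, not JDK code: it shows the state idea working, but it is non-reentrant and busy-waits, which is exactly what AQS's queue-and-park design avoids.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SpinLockDemo {

    // state 0 = free, 1 = held, exactly as in the pseudocode.
    static class SpinLock {
        private final AtomicInteger state = new AtomicInteger(0);

        void lock() {
            // Keep retrying the CAS until the holder releases the lock.
            // This burns CPU, which AQS avoids by parking queued threads.
            while (!state.compareAndSet(0, 1))
                Thread.yield();
        }

        void unlock() {
            state.set(0);
        }
    }

    static int runDemo(int threadCount) throws InterruptedException {
        SpinLock lock = new SpinLock();
        int[] counter = {0};  // plain int, protected only by the lock
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    counter[0]++;
                } finally {
                    lock.unlock();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads)
            t.join();
        return counter[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(1000)); // 1000: no increment is lost
    }
}
```

The volatile semantics of the AtomicInteger state also give the unlock/lock pair a happens-before edge, so the plain counter is safely published between holders.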
It is just like queuing for food in the canteen. The canteen cannot open a window for every student, so many students compete for a limited number of windows. Without some control, the canteen would be chaos at every meal; imagine a crowd of students all pressing around one window at the same time. Hence the queuing mechanism: only one person at a window at a time, and when that person leaves, the next student in line steps up.
Source code interpretation
Now let's go deep into the JDK source code and appreciate the master's code design.
Business call code:
```java
package demo.aqs;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockMoney {

    Lock lock = new ReentrantLock();

    /**
     * Suppose the current account has 1000 yuan
     */
    private int money = 1000;

    /**
     * Withdraw money
     */
    public void drawMoney() {
        lock.lock();
        try {
            this.money--;
        } finally {
            // Release in finally so an exception cannot leak the lock
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockMoney money = new LockMoney();
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                money.drawMoney();
            }, i + "").start();
        }
        Thread.sleep(2000);
        System.out.println("Current account balance:" + money.money);
    }
}
```
Tracing the lock method:
Reading the source code directly makes me dizzy in no time, so I tried to draw the call chain of the lock method and will walk through it together with the source code.
Follow the arrows through the source code, and you can more or less understand the implementation mechanism of AQS.
NonfairSync.lock
```java
final void lock() {
    // CAS tries to update state from 0 to 1; if it succeeds, run the if branch
    if (compareAndSetState(0, 1))
        // Lock acquired: record the current thread as the exclusive owner
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // Failed to acquire the lock: the thread goes to queue up
        acquire(1);
}
```
Seeing this code, do you instantly recognize the first two problems mentioned above? compareAndSetState uses Unsafe to operate on memory directly, an optimistic-lock approach, which guarantees that one and only one thread can succeed, making it safe under multithreading. The thread that sets state to 1 owns the lock (thread mutual exclusion), while unsuccessful threads queue up (waiting in line). Much clearer now. Let's move on.
AbstractQueuedSynchronizer.acquire
```java
public final void acquire(int arg) {
    // If tryAcquire fails and acquireQueued returns true, call selfInterrupt
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // The thread was interrupted while blocked in the queue,
        // so re-assert its interrupt status here
        selfInterrupt();
}
```
The tryAcquire method of AbstractQueuedSynchronizer (overridden in NonfairSync) ends up calling nonfairTryAcquire of Sync:
```java
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // Get the current state value of the lock
    int c = getState();
    // state == 0 means the lock is free; this is the same logic as lock()
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // state != 0: check whether the current thread already holds the lock.
    // If so, proceed — this is what makes the lock reentrant
    else if (current == getExclusiveOwnerThread()) {
        // Add acquires (see the caller's argument) to the current state
        int nextc = c + acquires;
        // nextc < 0 means the maximum int value was exceeded
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // Update the state value
        setState(nextc);
        return true;
    }
    return false;
}
```
Reading the source, you can see that tryAcquire returns true when the current thread acquires the lock or re-enters it, and false otherwise. This method also solves problem 4 above, lock reentry. The truth is getting closer.
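We can observe that reentry counting from the outside with ReentrantLock.getHoldCount(). The demo class and method names below are mine; the hold count is exactly the state value that nonfairTryAcquire increments.

```java
import java.util.concurrent.locks.ReentrantLock;

public class ReentrancyDemo {

    static final ReentrantLock lock = new ReentrantLock();

    static int outer() {
        lock.lock();                    // state goes 0 -> 1
        try {
            return inner();             // same thread locks again
        } finally {
            lock.unlock();              // 1 -> 0, lock fully released
        }
    }

    static int inner() {
        lock.lock();                    // nonfairTryAcquire sees the owner is
        try {                           // the current thread: no deadlock, 1 -> 2
            return lock.getHoldCount(); // 2 while both locks are held
        } finally {
            lock.unlock();              // 2 -> 1
        }
    }

    public static void main(String[] args) {
        System.out.println(outer());
    }
}
```

Had the else-if branch not existed, the inner lock() would queue behind the thread itself and deadlock instantly.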
To understand the addWaiter method, first look at the Node object it uses. Node wraps a Thread object so that the thread can be queued, and it also has prev, next, and other attributes. Clearly, Node is a node in a linked-list structure:
```java
// Previous node
volatile Node prev;
// Next node
volatile Node next;
```
At the same time, AbstractQueuedSynchronizer holds head and tail attributes:
```java
// Head node of the linked list
private transient volatile Node head;
// Tail node of the linked list
private transient volatile Node tail;
```
```java
private Node addWaiter(Node mode) {
    // Wrap the current thread as a Node object
    Node node = new Node(Thread.currentThread(), mode);
    // Get the tail node. The first time this code runs there is no tail yet,
    // so pred is null and the enq method below will be executed
    Node pred = tail;
    // On later runs the tail is no longer null (enq initializes it —
    // you can read the enq source first)
    if (pred != null) {
        // The prev of the current node points to the old tail
        node.prev = pred;
        // CAS tries to swing tail from pred to node
        if (compareAndSetTail(pred, node)) {
            // If the CAS succeeded, link pred's next to node
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
```
If that sounds a little confusing, don't worry. Let's look at the enq method first.
```java
private Node enq(final Node node) {
    // Infinite loop
    for (;;) {
        // Get the tail node
        Node t = tail;
        // If the tail is null, initialize head and tail to the same
        // newly created Node object
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // Make the current node the tail, and point the old tail's
            // next to the current node
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                // Exit the loop
                return t;
            }
        }
    }
}
```
What exactly does enq do?

- On the first iteration, it initializes head and tail to a new Node().
- On the next iteration, it makes the Node wrapping the current thread the tail, and points the old tail's next to it.
This takes a bit of spatial imagination, but if you are familiar with linked lists it is not hard to follow.
Let's think about the execution process dynamically:
- The first thread enters the lock method, can acquire the lock, and executes directly; it never enters addWaiter.
- The second thread enters the lock method. Assuming the first thread has not yet released the lock, it enters enq, which initializes the linked list.
- The third and later threads enter the lock method. The enq initialization branch no longer runs; they are simply appended to the already-initialized list.
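The three steps above can be sketched with an AtomicReference-based queue that mimics enq's lazy dummy-head initialization. This is my simplified model, not AQS source: real AQS stores Thread references in its nodes and uses Unsafe CAS on its fields.

```java
import java.util.concurrent.atomic.AtomicReference;

public class WaitQueueDemo {

    // Simplified stand-in for AQS's Node: a name instead of a Thread.
    static class Node {
        final String name;
        volatile Node prev, next;
        Node(String name) { this.name = name; }
    }

    static class WaitQueue {
        final AtomicReference<Node> head = new AtomicReference<>();
        final AtomicReference<Node> tail = new AtomicReference<>();

        void enq(Node node) {
            for (;;) {                      // retry loop, like AQS's enq
                Node t = tail.get();
                if (t == null) {
                    // First waiter: install a dummy head, then loop again
                    if (head.compareAndSet(null, new Node("dummy")))
                        tail.set(head.get());
                } else {
                    node.prev = t;
                    if (tail.compareAndSet(t, node)) {
                        t.next = node;      // link the forward pointer last
                        return;
                    }
                }
            }
        }
    }

    // Enqueue two waiters, then walk the list after the dummy head.
    static String demo() {
        WaitQueue q = new WaitQueue();
        q.enq(new Node("thread-2"));  // triggers dummy-head initialization
        q.enq(new Node("thread-3"));  // simply appended at the tail
        StringBuilder sb = new StringBuilder();
        for (Node n = q.head.get().next; n != null; n = n.next)
            sb.append(n.name).append(' ');
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note the order inside enq: the tail CAS is the linearization point, and the old tail's next pointer is linked afterwards, which is why AQS code sometimes has to traverse backwards via prev.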
acquireQueued
```java
final boolean acquireQueued(final Node node, int arg) {
    // Local variable
    boolean failed = true;
    try {
        // Local variable
        boolean interrupted = false;
        // Infinite loop
        for (;;) {
            // Get the predecessor node
            final Node p = node.predecessor();
            // If the predecessor is head and the lock is acquired
            // successfully, do not block
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Decide whether the thread should block, then block it
            if (
                // Should we park after failing to grab the lock?
                shouldParkAfterFailedAcquire(p, node) &&
                // Park (block) the thread
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
```
Analysis of shouldParkAfterFailedAcquire:
```java
// pred is the predecessor node, node is the current node
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Get the wait status of the predecessor
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * The predecessor is healthy and its waitStatus is already SIGNAL,
         * meaning it will wake the current node up, so the current node can
         * park with peace of mind. (If the predecessor were not healthy,
         * the current node might never be woken and could not sleep safely.)
         */
        return true;
    if (ws > 0) {
        /*
         * waitStatus > 0 means the predecessor was cancelled. Walk backwards
         * past cancelled nodes, unlinking them from the list, until a healthy
         * node (waitStatus <= 0) is found to be the new predecessor.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus is 0 or PROPAGATE(-3): the predecessor is healthy but
         * not yet marked to wake anyone (typically it was just enqueued
         * before tail), so CAS it to SIGNAL(-1).
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
```
The diagram is as follows.
So far we have seen how AQS enqueues the threads that need to wait.
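The blocking and wake-up that acquireQueued relies on boil down to LockSupport.park and LockSupport.unpark, which AQS really does call inside parkAndCheckInterrupt and its release path. A minimal handshake sketch (names are mine):

```java
import java.util.concurrent.locks.LockSupport;

public class ParkDemo {

    static String handshake() throws InterruptedException {
        StringBuilder log = new StringBuilder();
        Thread waiter = new Thread(() -> {
            log.append("parking;");
            // Blocks until another thread unparks us. If the permit was
            // granted before we parked, park() returns immediately.
            LockSupport.park();
            log.append("woken;");
        });
        waiter.start();
        Thread.sleep(100);          // give the waiter time to park
        LockSupport.unpark(waiter); // like the lock holder releasing the lock
        waiter.join();
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handshake());
    }
}
```

Unlike Object.wait/notify, the park/unpark pair needs no monitor and never misses a wake-up, because unpark leaves a permit even if it arrives before park; that robustness is why AQS is built on it.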
Unlocking in AQS, along with fair vs. unfair locks and shared vs. exclusive locks, will be covered in a follow-up.