[Multi-threading] LockSupport: usage, principles, and source code analysis

1 Overview

The first half of this post is reproduced in full from: https://segmentfault.com/a/1190000014436679

1. Introduction to LockSupport class

The LockSupport class can block the current thread and wake up the specified blocked thread. The operations of blocking and waking up threads are mainly implemented through the park() and unpark(thread) methods.

Each thread has a permit, which has only two values, 1 and 0; the default is 0.

  1. When unpark(thread) is called, the permit of the given thread is set to 1 (calling unpark several times does not accumulate; the permit stays at 1).
  2. When park() is called and the permit of the current thread is 1, the permit is set to 0 and park() returns immediately. If the permit is 0, the current thread blocks until another thread sets its permit to 1; park() then sets the permit back to 0 and returns.

Note: Because the permit defaults to 0, a thread that calls park() before anyone has called unpark on it will block. Once unpark(thread) is called, that thread is woken up and its park() call returns. If unpark(thread) is called first, the next park() returns immediately. park() can also return because the thread was interrupted, or spuriously for no reason at all, so callers normally re-check their wait condition in a loop.
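The permit rules above can be checked with a small experiment. The following is a minimal sketch and not part of the original post: the class and method names (PermitDemo, firstParkMillis, secondParkMillis) are made up for illustration, and the 300 ms timeout is an arbitrary choice so that the demo cannot hang.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class; only LockSupport and TimeUnit come from the JDK.
class PermitDemo {

    // Calls unpark twice on the current thread, then parks once.
    // Returns how long the park() call took, in milliseconds.
    static long firstParkMillis() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);   // permit becomes 1
        LockSupport.unpark(self);   // permit is still 1, it does not accumulate
        long start = System.nanoTime();
        LockSupport.park();         // permit is 1: reset it to 0 and return at once
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Parks again, this time with a timeout. The single permit was consumed
    // by the previous park(), so this call is expected to wait for the timeout.
    static long secondParkMillis() {
        long start = System.nanoTime();
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(300));
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("first park took  " + firstParkMillis() + " ms");
        System.out.println("second park took " + secondParkMillis() + " ms");
    }
}
```

If the two unpark calls had accumulated, the second park would also return at once. Instead it normally waits for roughly the full timeout, although a spurious early return is permitted by the specification.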

2. LockSupport class example

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Simple FIFO non-reentrant lock
class FIFOMutex {
    // whether the lock is currently held
    private final AtomicBoolean locked = new AtomicBoolean(false);
    // record waiting thread queue
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();

    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);

        // If the current thread is not the first in the waiting thread queue, or the locked state is already true, then the current thread will wait
        while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
            System.out.println(Thread.currentThread().getName()+"  park start");
            LockSupport.park(this);
            System.out.println(Thread.currentThread().getName()+"  park end");
            // park() also returns when the thread is interrupted: record the interrupt, clear the flag, and keep waiting
            if (Thread.interrupted())
                wasInterrupted = true;
        }

        // Remove the head of the queue. The while condition guarantees that it is the current thread
        waiters.remove();
        // If an interrupt arrived while waiting, restore the interrupt status before returning
        if (wasInterrupted)
            current.interrupt();
        System.out.println(Thread.currentThread().getName()+" lock end" );
    }

    // Wake up threads that may be waiting
    public void unlock() {
        System.out.println(Thread.currentThread().getName()+"  unpark start ");
        // set locked to false
        locked.set(false);
        // Wake up the thread at the head of the wait queue, if any (unpark ignores null)
        LockSupport.unpark(waiters.peek());
        System.out.println(Thread.currentThread().getName()+"  unpark end ");
    }
}


public class LockSupportTest {

    public static void startThread(String name, final FIFOMutex clock, final CountDownLatch countDownLatch) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                clock.lock();
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(Thread.currentThread().getName()+"  finally");
                    countDownLatch.countDown();
                    clock.unlock();
                }
            }
        }, name).start();
    }

    public static void main(String[] args) {
        FIFOMutex clock = new FIFOMutex();
        CountDownLatch countDownLatch = new CountDownLatch(3);
        startThread("t111", clock, countDownLatch);
        startThread("t222", clock, countDownLatch);
        startThread("t333", clock, countDownLatch);

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main end");
    }
}

As this example shows, park blocks the current thread, and unpark(thread) wakes up the blocked thread so that it continues from its park call. Because a woken thread re-checks the while condition in lock(), a wake-up that arrives while the thread still cannot take the lock simply makes it park again.

3. LockSupport source code comments

package java.util.concurrent.locks;
import sun.misc.Unsafe;

import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Provides methods for blocking threads and waking up threads.
 */
public class LockSupport {
    // The constructor is private, so cannot be instantiated externally
    private LockSupport() {}

    // Sets the parkBlocker field of thread t. This object is recorded while a thread is blocked so that monitoring and diagnostic tools can determine why the thread is blocked.
    private static void setBlocker(Thread t, Object arg) {
        UNSAFE.putObject(t, parkBlockerOffset, arg);
    }

    // Wake up the given blocked thread
    public static void unpark(Thread thread) {
        // Called when thread is not null
        if (thread != null)
            // Wake up blocked thread via UNSAFE's unpark
            UNSAFE.unpark(thread);
    }

    // block the current thread
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        // Set the parkBlocker property of thread t to record thread blocking
        setBlocker(t, blocker);
        // Block the thread through the park method of UNSAFE
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

    // Block the current thread for at most nanos nanoseconds; the thread wakes up and returns once that time has elapsed
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }
    // Block the current thread until the absolute time deadline (in milliseconds since the epoch); the thread wakes up and returns once the deadline has passed
    public static void parkUntil(Object blocker, long deadline) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(true, deadline);
        setBlocker(t, null);
    }

    // Get the parkBlocker property of thread t
    public static Object getBlocker(Thread t) {
        if (t == null)
            throw new NullPointerException();
        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
    }

    // Block the current thread without setting the parkBlocker property
    public static void park() {
        UNSAFE.park(false, 0L);
    }

    public static void parkNanos(long nanos) {
        if (nanos > 0)
            UNSAFE.park(false, nanos);
    }

    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }

    static final int nextSecondarySeed() {
        int r;
        Thread t = Thread.currentThread();
        if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
            r ^= r << 13;   // xorshift
            r ^= r >>> 17;
            r ^= r << 5;
        }
        else if ((r = ThreadLocalRandom.current().nextInt()) == 0)
            r = 1; // avoid zero
        UNSAFE.putInt(t, SECONDARY, r);
        return r;
    }

    // Hotspot implementation via intrinsics API
    private static final Unsafe UNSAFE;
    private static final long parkBlockerOffset;
    private static final long SEED;
    private static final long PROBE;
    private static final long SECONDARY;
    static {
        try {
            UNSAFE = Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            SEED = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) { throw new Error(ex); }
    }

}

The source code of LockSupport is relatively simple: the park family of methods blocks the current thread, and unpark wakes up a given thread.

Note that the park methods always block the current thread, so they take no thread parameter. unpark wakes up a specific thread, so that thread must be passed in.
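The parkBlocker field that park(Object blocker) sets in the source above can be read back with getBlocker. The following is a minimal sketch of that, not taken from the original post: BlockerDemo, observeBlocker and the thread name are hypothetical names chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class; park(Object), unpark and getBlocker are the JDK methods listed above.
class BlockerDemo {

    // Parks a helper thread on a known blocker object and reports
    // [0] whether getBlocker returned that object while the thread was parked,
    // [1] whether getBlocker returned null after the thread had finished.
    static boolean[] observeBlocker() {
        final Object blocker = new Object();
        final AtomicBoolean release = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            // Loop because park may return spuriously.
            while (!release.get()) {
                LockSupport.park(blocker);
            }
        }, "parked-thread");
        t.start();

        // Wait until the helper thread is actually parked.
        while (t.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        boolean seenWhileParked = LockSupport.getBlocker(t) == blocker;

        // Let the helper thread leave its loop and wake it up.
        release.set(true);
        LockSupport.unpark(t);
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // park(blocker) resets the field to null before it returns.
        boolean clearedAfterwards = LockSupport.getBlocker(t) == null;
        return new boolean[] { seenWhileParked, clearedAfterwards };
    }

    public static void main(String[] args) {
        boolean[] result = observeBlocker();
        System.out.println("blocker visible while parked: " + result[0]);
        System.out.println("blocker cleared afterwards:   " + result[1]);
    }
}
```

This is the same information that thread dumps use to show what a parked thread is waiting on, which is why park(this) is generally preferred over the plain park().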

As described in the detailed article on Java multithreading, a thread has six states. A thread blocked by one of the park methods is in one of two of them: WAITING (park with no timeout) or TIMED_WAITING (parkNanos and parkUntil). In both states the thread is blocked at its current position.

So how are threads in these two states woken up?

There are two ways to wake up a thread in the WAITING state:

  1. Call the corresponding wake-up method. Here that is the unpark method of LockSupport.
  2. Call interrupt() on the thread, which also wakes it up. A thread waiting in Object.wait() or Thread.join() then throws an InterruptedException, but a thread blocked in park() throws nothing: park() simply returns with the thread's interrupt status set, and the caller has to check it with Thread.interrupted() or isInterrupted().

The TIMED_WAITING state has one more way to wake up than the WAITING state: once the specified time has elapsed, the thread wakes up automatically.
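The states and the interrupt behaviour described above can be observed directly. The following is a minimal sketch under stated assumptions: ParkStateDemo and observe are hypothetical names, and the 30 second timeout is an arbitrary value that is long enough for the thread to be seen while parked.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class; only LockSupport, TimeUnit and Thread come from the JDK.
class ParkStateDemo {

    // Starts a thread that parks (with or without a timeout), records the state
    // it is seen in while parked, then interrupts it and reports the outcome.
    static String observe(final boolean timed) {
        final long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
        final boolean[] statusAfterPark = new boolean[1];
        Thread t = new Thread(() -> {
            // Re-park after a spurious return; leave the loop once interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                if (timed) {
                    LockSupport.parkNanos(timeoutNanos);
                } else {
                    LockSupport.park();
                }
            }
            // Reaching this line means park returned without throwing anything.
            statusAfterPark[0] = Thread.currentThread().isInterrupted();
        });
        t.start();

        // Poll until the thread is parked in either of the two waiting states.
        Thread.State state = t.getState();
        while (state != Thread.State.WAITING && state != Thread.State.TIMED_WAITING) {
            Thread.yield();
            state = t.getState();
        }

        t.interrupt();   // wakes the parked thread, no unpark needed
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return state + ", interrupt status after park: " + statusAfterPark[0];
    }

    public static void main(String[] args) {
        System.out.println(observe(false));
        System.out.println(observe(true));
    }
}
```

Note that the helper thread contains no try/catch at all: an interrupt ends the park call, and the only trace it leaves is the interrupt status flag.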

4. Usage

When I wrote test classes in the past, I would add Thread.sleep(Integer.MAX_VALUE) at the end of the test method so that the test thread keeps sleeping while the worker threads run:

    @Test
    public void mainTest() throws InterruptedException {
        Runnable runnable = () -> {
            for (int i = 0; i <= 1000000; i++) {
                System.out.println(Thread.currentThread().getName() + "-----" + i);
                if (i % 20 == 0) {
                    Thread.yield();
                }
            }
        };
        new Thread(runnable, "stack length").start();
        new Thread(runnable, "Xiaomi").start();

        Thread.sleep(Integer.MAX_VALUE);
    }

    

Now I have another way to do this, which is to park the test thread with LockSupport.park():

    @Test
    public void mainTest1() throws InterruptedException {
        Runnable runnable = () -> {
            for (int i = 0; i <= 100; i++) {
                System.out.println(Thread.currentThread().getName() + "-----" + i);
                if (i % 20 == 0) {
                    Thread.yield();
                }
            }
        };
        new Thread(runnable, "stack length").start();
        new Thread(runnable, "Xiaomi").start();

        LockSupport.park();
    }
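A bare LockSupport.park() as in the test above keeps the test thread parked until something unparks or interrupts it, and the call may also return spuriously. If the goal is only to wait for the worker threads to finish, one possible variation is to park in a loop on a condition and let the last worker call unpark. This is a sketch of that idea and not what the test above does: ParkUntilDoneDemo, runWorkers and the worker thread names are hypothetical, and the workers count their iterations instead of printing.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class; only LockSupport and AtomicInteger come from the JDK.
class ParkUntilDoneDemo {

    // Runs two workers with the same loop bounds as the test above and parks
    // the calling thread until both are done. Returns the total iteration count.
    static int runWorkers() {
        final Thread waiter = Thread.currentThread();
        final AtomicInteger running = new AtomicInteger(2);
        final AtomicInteger iterations = new AtomicInteger();

        Runnable runnable = () -> {
            for (int i = 0; i <= 100; i++) {
                iterations.incrementAndGet();
                if (i % 20 == 0) {
                    Thread.yield();
                }
            }
            // The last worker to finish wakes up the waiting thread.
            if (running.decrementAndGet() == 0) {
                LockSupport.unpark(waiter);
            }
        };
        new Thread(runnable, "worker-1").start();
        new Thread(runnable, "worker-2").start();

        // Re-check the condition after every return from park, because park
        // can return spuriously. If unpark already happened, park returns at once.
        while (running.get() != 0) {
            LockSupport.park();
        }
        return iterations.get();
    }

    public static void main(String[] args) {
        System.out.println("iterations: " + runWorkers());
    }
}
```

Because the permit is remembered, it does not matter whether the last worker calls unpark before or after the waiting thread reaches park, so there is no lost wake-up.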

I have also seen this in a Spring Boot program:

@SpringBootApplication
public class xxx {

    private static final Logger logger = LoggerFactory.getLogger(xxx.class);

    public static void main(String[] args) {
        
        SpringApplication sa = new SpringApplication(xxx.class);
        sa.run(args);
        LockSupport.park();
    }
}

I am still not sure what the reasoning behind this is, or why it is done this way.

Posted by Cong on Tue, 17 May 2022 19:16:24 +0300