Redis distributed locks: a complete guide (Redisson's approach to distributed locking)

Preface

In some scenarios, multiple processes must access a shared resource in a mutually exclusive manner. In such cases, a distributed lock is the most direct and effective solution.

With the rapid growth of data volumes and the increasing adoption of distributed systems, an application is often deployed on multiple machines (multiple nodes). In some scenarios, to ensure that data is not processed twice, the same task must run on only one node at a time; that is, a method may be executed by only one thread at any given moment. In a stand-alone environment the application lives in a single process, so it only needs thread safety within that process, which Java already provides through volatile, ReentrantLock, synchronized, and the thread-safe classes of the java.util.concurrent package. In a multi-machine deployment, however, thread safety must be guaranteed across different machines and different processes. This is why distributed locks came into being.

I have seen or used several implementation schemes in previous work, both ZooKeeper-based and Redis-based. Because some of those implementations lacked rigorous logic, deadlock cases would pop up in production from time to time. So what does a good distributed lock implementation look like?

Comparison of common distributed lock schemes

| Category | Scheme | Implementation principle | Advantages | Shortcomings |
| --- | --- | --- | --- | --- |
| Database-based | MySQL unique index | 1. Add a unique index to a table; 2. Lock: execute an INSERT statement; an error means locking failed; 3. Unlock: execute a DELETE statement | Makes full use of existing DB capabilities; simple to implement | 1. No automatic lock timeout, so there is deadlock risk; 2. No reentrancy or blocking wait; 3. Database operations are expensive and performance is low |
| Database-based | MongoDB findAndModify | 1. Lock: execute the atomic findAndModify command, looking the document up and inserting it if absent; 2. Unlock: delete the document | Also easy to implement; performance is much better than the MySQL unique-index scheme | 1. Most companies run MySQL and may lack MongoDB operations and development staff; 2. No automatic lock timeout |
| Distributed coordination system | ZooKeeper | 1. Lock: create an ephemeral sequential node under the /lock directory and check whether its sequence number is the smallest; if so, the lock is acquired; if not, watch the node in /lock with the next smaller sequence number; 2. Unlock: delete the node | 1. zk guarantees high availability; 2. the Curator framework natively supports a series of distributed lock recipes and is easy to use | A zk cluster has to be maintained separately, which raises maintenance cost |
| Cache-based | Redis commands | 1. Lock: execute SETNX, and on success execute EXPIRE to add an expiration time; 2. Unlock: execute the DEL command | Simple to implement; the lightest and best-performing of these schemes | 1. SETNX and EXPIRE are two separate, non-atomic steps; if SETNX succeeds but EXPIRE fails, deadlock may follow; 2. DEL may mistakenly delete a lock not held by the current thread; 3. No blocking wait or reentrancy |
| Cache-based | Redis Lua script | 1. Lock: execute SET lock_name random_value EX seconds NX; 2. Unlock: execute a Lua script that verifies random_value before deleting (shown below) | Same as above, and rigorous enough for production use | No reentrancy or blocking wait |

The unlock script referenced in the last row:

-- ARGV[1] is random_value, KEYS[1] is lock_name
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
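To make the last scheme concrete, here is a minimal sketch of the SET ... NX EX + Lua-unlock pattern using the Jedis client (the class and method names of the sketch itself are illustrative; error handling and retries are omitted):

import java.util.Collections;
import java.util.UUID;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

public class SimpleRedisLock {

    // Unlock script from the table above: delete the key only if the random value matches
    private static final String UNLOCK_LUA =
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "    return redis.call('del', KEYS[1]) " +
        "else " +
        "    return 0 " +
        "end";

    // Lock: SET lock_name random_value EX seconds NX; returns the random value on success, null on failure
    public static String tryLock(Jedis jedis, String lockName, int expireSeconds) {
        String randomValue = UUID.randomUUID().toString();
        String reply = jedis.set(lockName, randomValue, SetParams.setParams().nx().ex(expireSeconds));
        return "OK".equals(reply) ? randomValue : null;
    }

    // Unlock: run the Lua script so that the check and the delete are atomic
    public static boolean unlock(Jedis jedis, String lockName, String randomValue) {
        Object result = jedis.eval(UNLOCK_LUA,
                Collections.singletonList(lockName),
                Collections.singletonList(randomValue));
        return Long.valueOf(1L).equals(result);
    }
}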

 

The table compares several common schemes; redis+lua can basically satisfy the distributed lock needs that come up in day-to-day work. However, when I came across Redisson's distributed lock implementation and compared it with the schemes above, Redisson stays simple and easy to use while also supporting lock reentrancy, blocking wait, and atomic Lua script operations. One cannot help but admire the author's elegant design and coding skill. Let's now study how Redisson implements it.

Distributed locks need to meet four conditions

First, to guarantee that a distributed lock is usable, the implementation must satisfy all four of the following conditions at the same time:

  1. Mutual exclusion. At any moment, only one client may hold the lock.
  2. No deadlock. Even if a client crashes while holding the lock without actively releasing it, other clients must still be able to acquire the lock afterwards.
  3. Whoever tied the bell must untie it: locking and unlocking must be performed by the same client. A client cannot release a lock held by someone else; that is, no unlocking by mistake.
  4. Fault tolerance. As long as most Redis nodes operate normally, clients can acquire and release locks.

Implementation of Redisson distributed lock

Usage of Redisson's distributed reentrant lock

Redisson supports single-node mode, master-slave mode, sentinel mode, and cluster mode. Here single-node mode is used as an example:

// 1. Build the Config that Redisson needs to implement a distributed lock
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:5379").setPassword("123456").setDatabase(0);
// 2. Build the RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 3. Obtain the lock object instance (locks are not guaranteed to be granted in thread arrival order)
RLock rLock = redissonClient.getLock(lockKey);
try {
    /**
     * 4. Attempt to acquire the lock
     * waitTimeout: the maximum time to wait for the lock; beyond this, acquisition is considered failed
     * leaseTime:   how long the lock is held; after this time the lock expires automatically
     *              (set it larger than the business processing time so the work completes within the lock's validity)
     */
    boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
    if (res) {
        // Lock acquired successfully; handle the business logic here
    }
} catch (Exception e) {
    throw new RuntimeException("acquire lock fail");
} finally {
    // Always unlock in the end, but only if this thread actually holds the lock;
    // calling unlock() without holding it throws IllegalMonitorStateException
    if (rLock.isHeldByCurrentThread()) {
        rLock.unlock();
    }
}

The Redisson framework relies heavily on Lua scripts and Netty; the code makes extensive use of Future and FutureListener to convert between asynchronous and synchronous operations, and it is very well engineered.

Think about it yourself first: if you had to write a distributed lock component, how would you do it? Two interfaces must be defined: lock and unlock. The greatest truths are the simplest: the author of Redisson executes both locking and unlocking as Lua scripts, which keeps the design clean and, crucially, guarantees atomicity. Beyond that, the author also cleverly uses Redis's publish/subscribe capability during locking and unlocking, which will be discussed later. Let's first understand the locking and unlocking Lua scripts.

Lock & unlock Lua scripts

The locking and unlocking Lua scripts are the most important part of Redisson's distributed lock implementation. Before looking at any Java code, let's first study the logic of the Lua scripts.

1. Lock Lua script

  • Script input parameters

| Parameter | Example value | Meaning |
| --- | --- | --- |
| Number of keys | 1 | Number of KEYS |
| KEYS[1] | my_first_lock_name | Lock name |
| ARGV[1] | 60000 | Lock lease time (how long the lock is held), in ms |
| ARGV[2] | 58c62432-bb74-4d14-8a00-9908cc8b828f:1 | Unique ID set when acquiring the lock: Redisson client ID (UUID) + thread ID |
  • Script content
-- If the lock does not exist: create it, set the reentrancy count to 1, and set the lock's expiration time
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
 
-- If the lock exists and the unique ID matches: this is a reentrant request, so increment the reentrancy count by 1 and reset the lock's expiration time
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
 
-- If the lock exists but the unique ID does not match: the lock is held by another thread, which the current thread may not release; return the lock's remaining expiration time
return redis.call('pttl', KEYS[1]);
  • Script interpretation

Q: What is the purpose of returning nil?  
A: nil is returned if and only if locking succeeded; the client needs to know whether it acquired the lock, and any numeric return (the remaining TTL) means it did not.
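For reference, this is how the script's return value maps onto the Java side (mirroring the tryLock source shown later in this article):

// null means the script returned nil: the lock was acquired (fresh or reentrant)
// a Long means another thread holds the lock; the value is its remaining TTL in ms
Long ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
    // lock acquired
}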

2. Unlock Lua script

  • Script input parameters

| Parameter | Example value | Meaning |
| --- | --- | --- |
| Number of keys | 2 | Number of KEYS |
| KEYS[1] | my_first_lock_name | Lock name |
| KEYS[2] | redisson_lock__channel:{my_first_lock_name} | Unlock-message pub/sub channel |
| ARGV[1] | 0 | Redisson defines 0 as the unlock message |
| ARGV[2] | 30000 | Expiration time to reset on the lock, in ms; the default is 30 seconds |
| ARGV[3] | 58c62432-bb74-4d14-8a00-9908cc8b828f:1 | Unique ID; same as in the locking process |
  • Script content
-- If the lock does not exist: broadcast the unlock message directly and return 1
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1; 
end;
 
-- If the lock exists but the unique ID does not match: the lock is held by another thread, and the current thread is not allowed to release it
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end; 
 
-- If the lock exists and the unique ID matches: first decrement the reentrancy count by 1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
    -- The count is still greater than 0 after decrementing: the current thread still holds reentrant acquisitions, so the lock cannot be deleted; helpfully reset its expiration time instead
    redis.call('pexpire', KEYS[1], ARGV[2]); 
    return 0; 
else 
    -- The reentrancy count has reached 0: the lock is fully released; delete the key and broadcast the unlock message to wake the threads blocked waiting for this lock
    redis.call('del', KEYS[1]); 
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1;
end;
 
return nil;

 

  • Script interpretation

 

Q1: What is the broadcast unlock message for?  
A: It notifies the other threads blocked waiting for the lock, releases them from their block, and lets them compete for the lock again.

Q2: What is the difference between the return values 0, 1 and nil?  
A: 1 means the lock was fully released and the unlock message broadcast; 0 means the current thread released one reentrant acquisition but still holds the lock, so only the expiration time was refreshed; nil means the lock is not held by the current thread at all. The client does use this value: as the unlock() source below shows, nil makes the client throw an IllegalMonitorStateException, and 1 makes it cancel the watchdog renewal task.

Into the source code

1. Lock process source code

When reading the locking source code, you can simply treat the tryAcquire(leaseTime, unit, threadId) method as executing the locking Lua script. Go straight to the org.redisson.RedissonLock#tryLock(long, long, java.util.concurrent.TimeUnit) source:

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        // The maximum waiting time that can be tolerated by obtaining a lock
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
 
        // [core point 1] try to acquire the lock; a null return value means the lock was acquired
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
 
        // Tolerable waiting time = the maximum waiting time that can be tolerated when acquiring a lock - the time elapsed after performing the above operations
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
 
        current = System.currentTimeMillis();
        // [core point 2] subscribe to the unlock message; see org.redisson.pubsub.LockPubSub#onMessage
        /**
         * Subscribing to the lock-release event and blocking on it via await avoids wasting resources on futile lock requests:
         * while the lock is held elsewhere, the current thread subscribes to the release event through a Redis channel;
         * once the lock is released, a message is published to notify the waiting threads to compete again.
         * If await returns false, the wait exceeded the maximum acquisition time: unsubscribe and report failure.
         * If await returns true, enter the loop below and retry acquiring the lock.
         */
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // await waits (up to the remaining tolerable time) for the subscription to complete
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }
 
        // Subscription succeeded
        try {
            // Tolerable waiting time = the maximum waiting time that can be tolerated when acquiring a lock - the time elapsed after performing the above operations
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                // If the tolerable waiting time is exceeded, it directly returns the failure of obtaining the lock
                acquireFailed(threadId);
                return false;
            }
 
            while (true) {
                long currentTime = System.currentTimeMillis();
                // Try to acquire the lock again; if it is held by another thread, its remaining expiration time is returned (same script as above)
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
 
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
 
                // waiting for message
                currentTime = System.currentTimeMillis();
 
                // [core point 3] adjust the blocking wait according to the lock's TTL
                // Note how ingenious this is: 1. latch is actually a Semaphore; calling its tryAcquire blocks the current thread for a while,
                // which avoids hammering Redis with lock requests inside the while loop;
                // 2. the Semaphore's release method is called by the unlock-message listener, org.redisson.pubsub.LockPubSub#onMessage:
                // when the holder releases the lock it broadcasts the unlock message, the listener receives it and releases the Semaphore,
                // finally waking the thread blocked here.
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
 
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            // Unsubscribe from unlock messages
            unsubscribe(subscribeFuture, threadId);
        }
    }
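To make [core point 3] concrete, here is a minimal, self-contained sketch (a model of the pattern, not Redisson's actual classes) of how a java.util.concurrent.Semaphore turns the pub/sub unlock message into a wakeup for the waiting threads:

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for the latch held by a RedissonLockEntry
public class UnlockNotification {

    // starts with 0 permits, so awaitUnlock blocks until a release or the timeout
    private final Semaphore latch = new Semaphore(0);

    // called by the waiting thread inside the retry loop: sleeps for at most the
    // lock's remaining TTL instead of hammering Redis with lock requests
    public void awaitUnlock(long ttlMillis) throws InterruptedException {
        latch.tryAcquire(ttlMillis, TimeUnit.MILLISECONDS);
    }

    // called by the pub/sub listener (cf. org.redisson.pubsub.LockPubSub#onMessage)
    // when the unlock message arrives on the lock's channel
    public void onUnlockMessage() {
        latch.release();
    }
}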

The acquisition method tryAcquire really does just execute the locking Lua script:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    // tryAcquireAsync executes the Lua script asynchronously; get() blocks until the result is available
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}
 
//  See org.redisson.RedissonLock#tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        // The essence is to execute the locking Lua script asynchronously
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            // First check whether the asynchronous operation succeeded; if not, return directly, otherwise inspect the result
            if (!future.isSuccess()) {
                return;
            }
 
            Long ttlRemaining = future.getNow();
            // lock acquired
            // If ttlRemaining is null (the script returned nil, i.e. the lock was acquired), start the scheduled renewal method scheduleExpirationRenewal
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}
 
// See org.redisson.RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
 
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

Summary of locking process

1. A lock is itself a kind of resource: in Redisson, threads competing for a lock are really competing to be the first to create a Redis hash; whoever creates it holds the lock. The hash's name is the lock name; its content is a single key-value pair whose key is the Redisson client's unique ID plus the holding thread's ID, and whose value is the lock's reentrancy count; the expiration time set on the hash is the lock's expiration time. Conceptually (original figure omitted):

my_first_lock_name (hash, expiring after the lease time)
    58c62432-bb74-4d14-8a00-9908cc8b828f:1  ->  1    (client UUID:threadId -> reentrancy count)

2. The core of the locking process is three steps:
Step 1: try to acquire the lock by executing the locking Lua script.
Step 2: if step 1 fails, subscribe to the unlock message, then block on a semaphore for at most the lock's remaining expiration time, until woken or timed out.
Step 3: once the holding thread releases the lock, the unlock message is broadcast; the listener from step 2 releases the semaphore, and the threads blocked on acquisition are woken to try again.

Note that the variable internalLockLeaseTime in RedissonLock defaults to 30000 ms, and getLockWatchdogTimeout(), which tryLockInnerAsync() reads from the connection manager, also defaults to 30000 ms. Both relate to the watchdog mechanism described in the official Redisson documentation. "Watchdog" describes the mechanism quite vividly; let's look at the core code to see what the watchdog does and why it does it.

First consider a question. Suppose that in a distributed environment several service instances request the lock and instance 1 acquires it, then suddenly crashes or hangs while executing its business logic. Will the lock ever be released, and when? Recall the Lua script analysed earlier: on first acquisition, pexpire sets an expiration time on the lock key, 30000 ms by default. So if the instance goes down, the lock is eventually released and other instances can acquire it and carry on. But what if, after 30000 ms, instance 1 is not down and simply has not finished its work? If the lock expired then, mutual exclusion would be broken: another instance could acquire the lock while the business is still running. How does Redisson solve this? It must implement a mechanism that automatically extends the lock's validity.

After the Lua script is executed asynchronously, a listener handles the follow-up work. When the operation completes, operationComplete runs: it first checks whether the asynchronous operation succeeded, returning directly if not; if it succeeded, it inspects the result. If ttlRemaining is null (recall that the locking Lua script returns nil on success), the scheduled renewal method scheduleExpirationRenewal is executed. Let's look at its code:

private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }
 
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                
                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
 
        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }

First, it checks whether the entryName already exists in expirationRenewalMap, a map recording which lock keys this service instance is currently renewing; if it is already present, return directly. On the first acquisition it will not be there. Next, a TimerTask is created to run after a delay of internalLockLeaseTime / 3, i.e. roughly every 10 seconds with the defaults mentioned above, and it invokes an asynchronous method:

The task asynchronously executes a Lua script that checks whether the client's unique ID field (in the original author's example, 8a9649f5-f5b5-48b4-beaa-d0c24855f9ab:1 on a lock named anylock) still exists in the lock key's hash; if so, it calls pexpire to reset the lock key's expiration time, 30000 ms by default.

Now the idea is clear. The scheduling method above also executes asynchronously and registers a listener that is called back when the operation completes. If the call failed, an error is logged and the method returns: renewing the lock's expiration failed. Otherwise, if the asynchronous result is true, scheduleExpirationRenewal calls itself again, so this logic re-runs every ~10 seconds after a successful acquisition. In other words, while the lock key has not expired, its expiration keeps being pushed back to 30000 ms: as long as the service instance is alive and has not actively released the lock, the watchdog renews it every ten seconds, keeping the lock firmly in hand. A neat piece of machinery.
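Two practical notes here. First, as the tryAcquireAsync source above shows, the watchdog is scheduled only when no explicit leaseTime is supplied (leaseTime == -1); if a leaseTime is passed, as in the usage example at the top of this article, the lock simply expires after that time and is never renewed. Second, the 30000 ms default can be changed on the Config object (the value shown below is just the default):

Config config = new Config();
// Lease applied when no explicit leaseTime is given; the watchdog renews the
// lock every lockWatchdogTimeout / 3 milliseconds while the client is alive
config.setLockWatchdogTimeout(30000);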
So far, locking plus automatic extension of the lock's expiration is covered. What happens if, while you hold the lock and execute your business, another service instance tries to lock, or another thread of the same client does? Look back at the locking Lua script: when the lock key exists and the current client's unique field exists in its hash, hincrby increments that field by one, pexpire resets the key's expiration to 30000 ms, and nil is returned. The acquisition succeeds, the scheduled renewal of the lock key's expiration continues, and lock reentrancy is achieved.
When neither branch applies, the script simply returns the lock's remaining validity, and no renewal logic runs for the caller. Back in the Java method: on success it returns directly; otherwise it enters the retry loop, waiting a while between attempts, blocked until the first service instance releases the lock. Other service instances trying to acquire follow the same logic. This is how the mutual exclusion of the lock is achieved.

Next, the lock-release logic, which is also simple: lock.unlock() is called, and following the code flow it again asynchronously executes a Lua script. The script is straightforward: if the lock key does not exist, return (after broadcasting the unlock message); if the current client's unique field does not exist in the hash, return nil; otherwise decrement the field by 1 and check whether it is still greater than zero: if so, return 0; if not, delete the lock key, broadcast the unlock message, and return 1. Back in the calling method, the return value is acted on: if it is 1, the previously scheduled renewal task is cancelled; on failure, some state-related cleanup occurs that is not essential to the unlocking logic and can be skipped here.

Unlock process source code

The unlocking process is comparatively simple: it is entirely the execution of the unlocking Lua script, with no extra code logic. See org.redisson.RedissonLock#unlock:

@Override
    public void unlock() {
        // Execute the unlock Lua script, passing in the thread id to guarantee that locking and unlocking happen on the same thread, and so avoid releasing locks held by other threads by mistake
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }
 
// See org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
 
}
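The cancelExpirationRenewal() invoked in unlock() above is not listed in this article; in the Redisson version being analysed it is essentially the mirror image of scheduleExpirationRenewal, roughly as sketched here: remove this lock's pending watchdog task so the renewal stops once the lock is released.

void cancelExpirationRenewal() {
    // remove and cancel the pending renewal task for this lock entry, if any
    Timeout task = expirationRenewalMap.remove(getEntryName());
    if (task != null) {
        task.cancel();
    }
}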

 

Lock & unlock process

Combining the Lua scripts with the source code, the locking and unlocking processes have each been analysed above. Now raise the difficulty and simulate several threads competing for the lock. (The original schematic diagram, with three key points marked in red, is omitted; the monitor trace later in this article follows its step numbering.)

 

Summarize the whole process

1. Thread A and thread B compete for the lock at the same time; thread A is lucky and gets it first. After failing to acquire the lock, thread B does not give up: it subscribes to the unlock message, then tries again to acquire the lock, noting along the way how long the still-held lock has before it expires, and blocks for at most that long waiting for the release.

2. Thread A finishes its work, dutifully releases the lock it held, and at the same time broadcasts the unlock message to tell the other contenders to try again;

3. On receiving the message, LockPubSub, the unlock-message listener, releases the semaphore it holds; thread B is instantly woken from its block and competes for the lock again, this time successfully. It then follows the same steps: finish the work, unlock.

Other materials

Q1: Why are there curly braces in the subscription channel name (e.g. redisson_lock__channel:{my_first_lock_name})?  
A:  
1. In a Redis Cluster deployment, if a Lua script operates on multiple keys, those keys must be constrained to the same slot to preserve the atomicity of script execution; otherwise the error "Lua script attempted to access a non local key in a cluster node" is reported;  
2. A HashTag is the substring of a key wrapped in {}. If a HashTag is present, the cluster assigns the key to a slot based on the HashTag alone. HashTags do not nest: only the content between the first { and the first } participates in the slot calculation. This computation is usually encapsulated by the client:

// See org.redisson.cluster.ClusterConnectionManager#calcSlot
@Override
public int calcSlot(String key) {
    if (key == null) {
        return 0;
    }
 
    int start = key.indexOf('{');
    if (start != -1) {
        int end = key.indexOf('}');
        key = key.substring(start+1, end);
    }
 
    int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
    log.debug("slot {} for {}", result, key);
    return result;
}

3. The unlocking Lua script operates on two keys: the lock name my_first_lock_name and the unlock-message pub/sub channel redisson_lock__channel:{my_first_lock_name}. By the slot calculation above, both keys are hashed from the same string my_first_lock_name, so they are guaranteed to land in the same slot.

Q2: Almost all of Redisson's interaction with the redis server takes the form of Lua scripts. How can the execution of these scripts be traced?  
A: Start a redis client terminal and run the monitor command, which prints every command the redis server receives in real time; then step through the Redisson lock/unlock test cases in a debugger and observe which Lua scripts are actually executed.

For example, here are the test cases behind the overall flow traced below:

@RunWith(SpringRunner.class)
@SpringBootTest
public class RedissonDistributedLockerTest {
 
    private static final Logger log = LoggerFactory.getLogger(RedissonDistributedLockerTest.class);
 
    @Resource
    private DistributedLocker distributedLocker;
 
    private static final ExecutorService executorServiceB = Executors.newSingleThreadExecutor();
 
    private static final ExecutorService executorServiceC = Executors.newSingleThreadExecutor();
 
    @Test
    public void tryLockUnlockCost() throws Exception {
        StopWatch stopWatch = new StopWatch("Lock unlock time statistics");
        stopWatch.start();
        for (int i = 0; i < 10000; i++) {
            String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
            Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
            Assert.assertTrue(optLocked.isPresent());
            optLocked.get().unlock();
        }
        stopWatch.stop();
        log.info(stopWatch.prettyPrint());
    }
 
    @Test
    public void tryLock() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
        Assert.assertTrue(optLocked.isPresent());
 
        Optional<LockResource> optLocked2 = distributedLocker.tryLock(key, 600000, 600000);
        Assert.assertTrue(optLocked2.isPresent());
 
        optLocked.get().unlock();
    }
 
    /**
     * Simulate two threads competing for locks: A obtains the lock first, A releases the lock, and B obtains the lock again
     */
    @Test
    public void tryLock2() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Future<Optional<LockResource>> submit = executorServiceB.submit(() -> {
                    countDownLatch.await();
                    log.info("B Attempt to acquire lock: thread={}", currentThreadId());
                    return distributedLocker.tryLock(key, 600000, 600000);
                }
        );
 
        log.info("A Attempt to acquire lock: thread={}", currentThreadId());
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 300000, 600000);
        Assert.assertTrue(optLocked.isPresent());
 
        log.info("A Lock acquired: thread={}", currentThreadId());
        countDownLatch.countDown();
 
        optLocked.get().unlock();
        log.info("A Released lock: thread={}", currentThreadId());
 
        Optional<LockResource> lockResource2 = submit.get();
        Assert.assertTrue(lockResource2.isPresent());
 
        executorServiceB.submit(() -> {
            log.info("B Lock acquired: thread={}", currentThreadId());
            lockResource2.get().unlock();
            log.info("B Released lock: thread={}", currentThreadId());
        });
    }
 
    /**
     * Simulate three threads competing for locks: A obtains the lock first, A releases the lock, and B and C compete for the lock at the same time
     */
    @Test
    public void tryLock3() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
 
        log.info("A Attempt to acquire lock: thread={}", currentThreadId());
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
 
        if (optLocked.isPresent()) {
            log.info("A Lock acquired: thread={}", currentThreadId());
        }
        Assert.assertTrue(optLocked.isPresent());
 
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        Future<Optional<LockResource>> submitB = executorServiceB.submit(() -> {
                    cyclicBarrier.await();
                    log.info("B Attempt to acquire lock: thread={}", currentThreadId());
                    return distributedLocker.tryLock(key, 600000, 600000);
                }
        );
 
        Future<Optional<LockResource>> submitC = executorServiceC.submit(() -> {
                    cyclicBarrier.await();
                    log.info("C Attempt to acquire lock: thread={}", currentThreadId());
                    return distributedLocker.tryLock(key, 600000, 600000);
                }
        );
 
        optLocked.get().unlock();
        log.info("A Released lock: thread={}", currentThreadId());
 
        CountDownLatch countDownLatch = new CountDownLatch(2);
        executorServiceB.submit(() -> {
            log.info("B Lock acquired: thread={}", currentThreadId());
            try {
                submitB.get().get().unlock();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            log.info("B Released lock: thread={}", currentThreadId());
            countDownLatch.countDown();
        });
 
        executorServiceC.submit(() -> {
            log.info("C Lock acquired: thread={}", currentThreadId());
            try {
                submitC.get().get().unlock();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            log.info("C Released lock: thread={}", currentThreadId());
            countDownLatch.countDown();
        });
 
        countDownLatch.await();
    }
 
    private static Long currentThreadId() {
        return Thread.currentThread().getId();
    }
 
    @Test
    public void tryLockWaitTimeout() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString();
 
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 2000);
        Assert.assertTrue(optLocked.isPresent());
 
        Optional<LockResource> optLockResource = CompletableFuture.supplyAsync(() -> {
                long now = System.currentTimeMillis();
                Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 1000, 10);
                long cost = System.currentTimeMillis() - now;
                log.info("cost={}", cost);
                return optLockedAgain;
        }).exceptionally(th -> {
            log.error("Exception: ", th);
            return Optional.empty();
        }).join();
 
        Assert.assertTrue(!optLockResource.isPresent());
    }
 
    @Test
    public void tryLockWithLeaseTime() throws Exception {
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 3000, 1000);
        Assert.assertTrue(optLocked.isPresent());
 
        // Reentrant
        Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 3000, 1000);
        Assert.assertTrue(optLockedAgain.isPresent());
    }
 
    /**
     * Simulate 1000 concurrent requests competing for one lock
     */
    @Test
    public void tryLockWithLeaseTimeOnMultiThread() throws Exception {
        int totalThread = 1000;
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
        AtomicInteger acquiredLockTimes = new AtomicInteger(0);
 
        ExecutorService executor = Executors.newFixedThreadPool(totalThread);
        for (int i = 0; i < totalThread; i++) {
            executor.submit(new Runnable() {
 
                @Override
                public void run() {
                    tryAcquireLockTimes.getAndIncrement();
                    Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 10000);
                    if (optLocked.isPresent()) {
                        acquiredLockTimes.getAndIncrement();
                    }
                }
            });
        }
        executor.awaitTermination(15, TimeUnit.SECONDS);
 
        Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);
        Assert.assertTrue(acquiredLockTimes.get() == 1);
    }
 
    @Test
    public void tryLockWithLeaseTimeOnMultiThread2() throws Exception {
        int totalThread = 100;
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
        AtomicInteger acquiredLockTimes = new AtomicInteger(0);
 
        ExecutorService executor = Executors.newFixedThreadPool(totalThread);
        for (int i = 0; i < totalThread; i++) {
            executor.submit(new Runnable() {
 
                @Override
                public void run() {
                    long now = System.currentTimeMillis();
                    Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10000, 5);
                    long cost = System.currentTimeMillis() - now;
                    log.info("tryAcquireLockTimes={}||wait={}", tryAcquireLockTimes.incrementAndGet(), cost);
                    if (optLocked.isPresent()) {
                        acquiredLockTimes.getAndIncrement();
                        // Active release lock
                        optLocked.get().unlock();
                    }
                }
            });
        }
        executor.awaitTermination(20, TimeUnit.SECONDS);
 
        log.info("tryAcquireLockTimes={}, acquireLockTimes={}", tryAcquireLockTimes.get(), acquiredLockTimes.get());
        Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);
        Assert.assertTrue(acquiredLockTimes.get() == totalThread);
    }
 
}
 
 
public interface DistributedLocker {
 
    Optional<LockResource> tryLock(String lockKey, int waitTime);
 
    Optional<LockResource> tryLock(String lockKey, int waitTime, int leaseTime);
 
}
 
public interface LockResource {
 
    void unlock();
 
}
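The implementation behind these two interfaces is not shown in the article. A minimal sketch wiring them to a RedissonClient could look like the following (one assumption: since the tests pass values such as 600000, waitTime and leaseTime are taken to be milliseconds):

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;

public class RedissonDistributedLocker implements DistributedLocker {

    private final RedissonClient redissonClient;

    public RedissonDistributedLocker(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    @Override
    public Optional<LockResource> tryLock(String lockKey, int waitTime) {
        // leaseTime of -1 lets Redisson's watchdog keep renewing the lock
        return tryLock(lockKey, waitTime, -1);
    }

    @Override
    public Optional<LockResource> tryLock(String lockKey, int waitTime, int leaseTime) {
        RLock lock = redissonClient.getLock(lockKey);
        try {
            if (lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
                // LockResource declares a single unlock() method, so the acquired
                // RLock's unlock can serve as its implementation
                return Optional.of(lock::unlock);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Optional.empty();
    }
}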

The driving calls were:

Lock:   redissonClient.getLock("my_first_lock_name").tryLock(600000, 600000);  
Unlock: redissonClient.getLock("my_first_lock_name").unlock();

The monitor trace of the Lua scripts they executed:

# Thread A
## 1.1.1 attempt to acquire the lock -> success
1568357723.205362 [0 127.0.0.1:56419] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357723.205452 [0 lua] "exists" "my_first_lock_name"
1568357723.208858 [0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "1"
1568357723.208874 [0 lua] "pexpire" "my_first_lock_name" "600000"
 
 
# Thread B
### attempt to acquire the lock -> the lock is held and not expired; its remaining TTL is returned
1568357773.338018 [0 127.0.0.1:56417] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357773.338161 [0 lua] "exists" "my_first_lock_name"
1568357773.338177 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357773.338197 [0 lua] "pttl" "my_first_lock_name"
 
 
## 2.1.1.3 subscribe (not a Lua script) -> subscription succeeded
1568357799.403341 [0 127.0.0.1:56421] "SUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"
 
 
## 2.1.1.4 try to acquire the lock again -> not acquired; the lock's remaining expiration time is returned
1568357830.683631 [0 127.0.0.1:56418] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357830.684371 [0 lua] "exists" "my_first_lock_name"
1568357830.684428 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357830.684485 [0 lua] "pttl" "my_first_lock_name"
 
 
# Thread A
## 3.1.1 release the lock and broadcast the unlocking message. 0 represents the unlocking message
1568357922.122454 [0 127.0.0.1:56420] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357922.123645 [0 lua] "exists" "my_first_lock_name"
1568357922.123701 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357922.123741 [0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "-1"
1568357922.123775 [0 lua] "del" "my_first_lock_name"
1568357922.123799 [0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"
 
 
# Thread B
## unlock message received -> semaphore released and the block lifted; 4.1.1.1 try to acquire the lock again -> success
1568357975.015206 [0 127.0.0.1:56419] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357975.015579 [0 lua] "exists" "my_first_lock_name"
1568357975.015633 [0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "1"
1568357975.015721 [0 lua] "pexpire" "my_first_lock_name" "600000"
 
## 4.1.1.3 unsubscribe (non Lua script)
1568358031.185226 [0 127.0.0.1:56421] "UNSUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"
 
 
# Thread B
## 5.1.1 release the lock and broadcast the unlocking message
1568358255.551896 [0 127.0.0.1:56417] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568358255.552125 [0 lua] "exists" "my_first_lock_name"
1568358255.552156 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568358255.552200 [0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "-1"
1568358255.552258 [0 lua] "del" "my_first_lock_name"
1568358255.552304 [0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"

It should be noted that RedissonLock still does not solve the risk of losing the lock when the Redis node holding it fails. Some scenarios cannot tolerate that, so Redisson also provides RedissonRedLock, an implementation of the RedLock algorithm. RedissonRedLock genuinely addresses the single-point weakness, at the cost of building additional, independent Redis environments for it.

So, if the business scenario can tolerate this small-probability failure, RedissonLock is recommended; if it cannot, use RedissonRedLock.

redlock algorithm

The official Redis documentation introduces the RedLock algorithm as follows:

The Redlock algorithm

In the distributed version of the algorithm we assume N completely independent Redis master nodes, with no replication or other implicit coordination mechanisms among them. We have already described how to acquire and release a lock safely on a single Redis instance, and the algorithm acquires and releases the lock with that method on every one of the N instances. In the example we set N = 5, a reasonable value, so 5 machines or virtual machines run the instances, ensuring they will not all go down at the same time. To acquire the lock, a client does the following:

  1. Get the current time in milliseconds (Unix time).

  2. Try to acquire the lock on all N instances in turn, using the same key and a unique value (such as a UUID) on each. When requesting the lock from a Redis instance, the client uses a timeout that is small compared with the lock's expiration time: for example, if the lock auto-expires after 10 seconds, the per-instance timeout might be in the 5-50 millisecond range. This prevents the client from blocking on a Redis server that has gone down; if an instance does not respond within the timeout, the client moves on to the next instance as soon as possible.

  3. The client computes how long acquisition took by subtracting the start time recorded in step 1 from the current time. The lock is considered acquired if and only if it was obtained on a majority of the instances (N/2+1, here 3 nodes) and the total elapsed time is less than the lock's expiration time.

  4. If the lock was acquired, its real validity time = the initial validity time (the auto-expiry set when acquiring the lock) minus the total time spent acquiring it (the result computed in step 3).

  5. If acquisition failed for any reason (the lock was not obtained on at least N/2+1 instances, or the total acquisition time exceeded the validity time), the client unlocks on ALL the instances, even those where it believes it did not lock successfully. This prevents the situation where an instance granted the lock but the client never received the response, which would otherwise keep the lock unavailable until it expires.
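A small sketch of the arithmetic behind steps 3-5 (all names and example numbers are illustrative):

// N = 5 independent Redis masters; majority quorum = N/2 + 1 = 3
int n = 5;
int quorum = n / 2 + 1;

long lockValidityMillis = 10_000;            // auto-expiry set on each instance
long start = System.currentTimeMillis();

// ... sequentially attempt SET key uniqueValue NX PX lockValidityMillis on all
// n instances, each with a short (e.g. 5-50 ms) timeout, counting successes ...
int acquired = 3;                            // example outcome

long elapsed = System.currentTimeMillis() - start;
long remainingValidity = lockValidityMillis - elapsed;   // step 4

if (acquired >= quorum && remainingValidity > 0) {
    // lock held; the critical section must complete within remainingValidity
} else {
    // step 5: release on ALL n instances, including those that appeared to fail
}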

Implementing a distributed lock with Redisson's red lock (RedissonRedLock) and source code analysis (implementation III)

Take three single-node Redis instances as an example; note that they are completely independent of one another, with no master-slave replication or other cluster coordination mechanisms between them.


Config config1 = new Config();
config1.useSingleServer().setAddress("redis://127.0.0.1:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
 
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://127.0.0.1:5379").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
 
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://127.0.0.1:5380").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
 
/**
 * Get multiple RLock objects
 */
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);
 
/**
 * RedissonRedLock is built based on multiple RLock objects (the core difference is here)
 */
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
 
try {
    /**
     * 4. Attempt to acquire the lock
     * waitTimeout: the maximum time to wait for the lock; beyond this, acquisition is considered failed
     * leaseTime:   how long the lock is held; after this time the lock expires automatically
     *              (set it larger than the business processing time so the work completes within the lock's validity)
     */
    boolean res = redLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
    if (res) {
        try {
            // Lock acquired on a majority of nodes; handle the business logic here
        } finally {
            // Always unlock in the end (only reached when the lock was actually acquired)
            redLock.unlock();
        }
    }
} catch (Exception e) {
    throw new RuntimeException("acquire lock fail");
}

The core change is to build several RLock objects and then wrap them in a RedissonRedLock, because the RedLock algorithm is built on multiple independent Redis environments (call them Redis nodes here). Each such node can itself be a single instance, a master-slave pair, a sentinel deployment, or a cluster. This means you can no longer get by with the one cluster, one sentinel farm, or one master-slave architecture you already run: you need several additional, mutually independent Redis nodes just for RedissonRedLock, for example three or five of them depending on the situation.

(The original figure, showing several independent Redis nodes jointly forming one RedLock distributed lock, is omitted.) Note again that the nodes are independent of one another, with no replication or other implied distributed coordination mechanism.


Redisson's implementation of the RedLock algorithm: source code analysis

Lock core code

org.redisson.RedissonMultiLock#tryLock


public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1;
    if (leaseTime != -1) {
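        // Use waitTime*2 as an interim lease so locks taken on earlier nodes do not
        // expire while later nodes are still being tried; the requested leaseTime is
        // applied afterwards via expireAsync (see the end of this method)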
        newLeaseTime = unit.toMillis(waitTime)*2;
    }
    
    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    long lockWaitTime = calcLockWaitTime(remainTime);
    /**
     * 1. Limit on the number of nodes allowed to fail locking (N-(N/2+1))
     */
    int failedLocksLimit = failedLocksLimit();
    /**
     * 2. Traverse all nodes and execute lua locking through EVAL command
     */
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        /**
         *  3.Attempting to lock a node
         */
        try {
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            // If such exceptions are thrown, all nodes need to be unlocked in order to prevent successful locking but failed response
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            // Throwing an exception indicates that obtaining the lock failed
            lockAcquired = false;
        }
        
        if (lockAcquired) {
            /**
             *4. If a lock is acquired, it is added to the acquired lock collection
             */
            acquiredLocks.add(lock);
        } else {
            /**
             * 5. Check whether the number of nodes that failed to lock has reached the allowed failure limit (N-(N/2+1)).
             * If so, the overall acquisition is deemed to have failed and there is no need to try the remaining nodes,
             * because the RedLock algorithm requires at least N/2+1 nodes to lock successfully.
             */
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                break;
            }
 
            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == -1 && leaseTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit--;
            }
        }
 
        /**
         * 6. Compute the total time spent acquiring locks across the nodes so far; if it exhausts the maximum wait time, the overall acquisition is deemed failed and false is returned
         */
        if (remainTime != -1) {
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }
 
    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }
        
        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }
 
    /**
     * 7. If execution reaches this point normally, the overall lock acquisition is considered successful; return true
     */
    return true;
}

References

[1] Distributed locks with Redis  
[2] Distributed locks with Redis (Chinese translation)  
[3] SET - Redis  
[4] EVAL - Redis  
[5] Redisson  
[6] Correct implementation of Redis distributed locks  
[7] Implementing distributed locks with Redlock  
[8] Implementing Redis distributed locks with Redisson
