Simple distributed lock implementations with Redis and ZooKeeper

In a distributed system, threads from different instances access the same critical resources, so we need a distributed coordination mechanism to arbitrate between those threads. Its core building block is the distributed lock.

Characteristics of distributed locks

  • In a distributed environment, a given critical resource or critical operation can be accessed or executed by only one thread on one machine at a time.
  • Highly available lock acquisition and release
  • High-performance lock acquisition and release
  • Reentrancy: the same task can acquire the lock multiple times
  • A lock-expiry mechanism, so a crashed holder cannot block others forever
  • Support for non-blocking lock acquisition

Simple Redis implementation

Lock

SETNX lock_id 1

SETNX is short for "SET if Not eXists". It returns 1 when the key does not exist and 0 when it already exists. While the key holds a value, we consider the resource with the corresponding id to be locked. If a thread runs the command and 0 is returned, the resource is already locked and acquisition fails; if 1 is returned, the resource was unlocked and the current thread has acquired the lock.
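A minimal sketch of this acquire step, assuming the Jedis client and a Redis server on localhost:6379 (the class and key names are illustrative):

import redis.clients.jedis.Jedis;

public class NaiveRedisLock {
    // SETNX returns 1 if the key was set (lock acquired),
    // 0 if it already existed (someone else holds the lock).
    public static boolean tryAcquire(Jedis jedis, String lockId) {
        return jedis.setnx(lockId, "1") == 1L;
    }
}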

Release lock

DEL lock_id

The DEL command deletes lock_id to release the lock, so that another thread running SETNX lock_id can acquire it.

Deadlock prevention

If a thread crashes after locking but before unlocking, no one will ever release the lock and a deadlock forms. To prevent this, we need to set a timeout on the lock. Naively setting the timeout with the following statement, however, causes a problem:

EXPIRE lock_id 30

The problem is that SETNX and EXPIRE are two separate, non-atomic operations: if the thread crashes between SETNX and EXPIRE, the deadlock remains. The correct approach is a single atomic command:

SET lock_id 1 NX EX 30

NX and EX are options of the SET command introduced in Redis 2.6.12. NX behaves like SETNX, making the SET take effect only when the key does not exist; EX sets the key's expiration time in seconds. See the official documentation of SET key value [EX seconds] [PX milliseconds] [NX|XX] for the full list of options.
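In Jedis, the same atomic command is available through the SetParams builder; a minimal sketch under the same assumptions as above:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

public class AtomicRedisLock {
    // SET lockId ownerId NX EX 30: one atomic command, so a crash can no
    // longer leave a lock key behind without an expiry.
    public static boolean tryAcquire(Jedis jedis, String lockId, String ownerId) {
        String reply = jedis.set(lockId, ownerId, SetParams.setParams().nx().ex(30));
        return "OK".equals(reply);  // "OK" on success, null when the key already exists
    }
}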

Prevent accidental deletion

In some cases a lock can be deleted by mistake, that is, one thread deletes a lock that another thread currently holds. To prevent this, we can set the value of the lock to the id of the owning thread or client:

SET lock_id THREAD_ID NX EX 30

Then, before deleting, fetch the value of the lock, compare it with our own id, and delete the lock only when the ids match. This involves executing two commands:

GET lock_id
# If the id is the same:
DEL lock_id

Looking at these two commands, we find another problem: the operation is not atomic. If the value of the lock changes between the GET and the DEL, the lock can still be deleted by mistake. The official solution to this is a Lua script:

EVAL "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end" 1 lock_id THREAD_ID

The first argument of EVAL is the Lua script and the second is the number of keys (1 in this case); the remaining arguments are the keys, followed by the extra values exposed to the script as ARGV. Redis executes Lua scripts atomically. Formatted for readability, the script is:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
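A minimal sketch of this safe release with Jedis, where ownerId is the value stored when the lock was taken (class and parameter names are illustrative):

import java.util.Collections;
import redis.clients.jedis.Jedis;

public class SafeRedisUnlock {
    private static final String SCRIPT =
        "if redis.call('get', KEYS[1]) == ARGV[1] then "
        + "return redis.call('del', KEYS[1]) else return 0 end";

    // The compare-and-delete runs atomically inside Redis, so the lock can
    // only be removed by the owner that stored ownerId at lock time.
    public static boolean release(Jedis jedis, String lockId, String ownerId) {
        Object result = jedis.eval(SCRIPT,
                Collections.singletonList(lockId),
                Collections.singletonList(ownerId));
        return Long.valueOf(1L).equals(result);  // script returns 1 on deletion
    }
}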

So far, we have completed the simple Redis lock implementation.

ZooKeeper distributed lock principle

The four node types of ZooKeeper

  • PERSISTENT is the default node type. The node continues to exist after the client that created it disconnects from ZooKeeper.
  • PERSISTENT_SEQUENTIAL is a persistent sequential node: when it is created, ZooKeeper appends a number to its name reflecting the order of creation.
  • EPHEMERAL is the opposite of a persistent node: the node is deleted when the client that created it disconnects from ZooKeeper.
  • EPHEMERAL_SEQUENTIAL, as the name suggests, combines both behaviours: ZooKeeper numbers the node's name in creation order, and the node is deleted when the creating client disconnects (see the creation sketch after this list).
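A minimal sketch of the four creation calls, using the same ZkClient library as the implementation below (the server address and paths are illustrative assumptions):

import org.I0Itec.zkclient.ZkClient;

public class NodeTypesDemo {
    public static void main(String[] args) {
        // Assumes a ZooKeeper server at localhost:2181.
        ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);

        zkClient.createPersistent("/demo");                                // PERSISTENT
        String s = zkClient.createPersistentSequential("/demo/seq-", "d"); // PERSISTENT_SEQUENTIAL
        zkClient.createEphemeral("/demo/temp");                            // EPHEMERAL: removed when the session ends
        String e = zkClient.createEphemeralSequential("/demo/lock-", "d"); // EPHEMERAL_SEQUENTIAL: numbered and session-bound
        System.out.println(s + " " + e);                                   // e.g. /demo/seq-0000000001 /demo/lock-0000000002
    }
}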

Lock

First, create a persistent node in ZooKeeper to act as the parent. When the first client, Client1, wants to acquire the lock, it creates an ephemeral sequential node, Lock1, under this parent.

Client1 then lists all ephemeral sequential children of the parent and sorts them to determine whether its own node Lock1 has the smallest sequence number. If it is the first node, Client1 has acquired the lock.

When a second client, Client2, wants the lock, it likewise creates an ephemeral sequential node Lock2, lists and sorts all children, and checks whether Lock2 has the smallest number. Finding that it does not, Client2 registers a Watcher on Lock1, the node ranked immediately before its own, to listen for Lock1's existence. Client2 has failed to grab the lock and enters a waiting state; node by node, this forms a waiting queue.

Unlock

When its task is complete, Client1 explicitly issues a delete for node Lock1. Because Client2 has been watching Lock1's existence, it receives a notification immediately when Lock1 is deleted. Client2 then queries the children of the parent node again to confirm whether its own node Lock2 is now the smallest; if so, Client2 naturally acquires the lock.

Implementation

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// ZkLock and LockException are defined elsewhere in the same project.
public class ZkLockImpl implements ZkLock {

    private static final Logger LOG = LoggerFactory.getLogger(ZkLockImpl.class);

    private final String lockPath;
    private final ZkClient zkClient;
    private final ThreadLocal<String> curNode = new ThreadLocal<>();
    private final ThreadLocal<String> preNode = new ThreadLocal<>();

    /**
     * Constructor for basic ZkLock
     *
     * @param servers Servers list for zookeeper, see {@link ZkClient#ZkClient(java.lang.String, int, int)}
     * @param sessionTimeout Session Timeout for zookeeper, see {@link ZkClient#ZkClient(java.lang.String, int, int)}
     * @param connectionTimeout Connection Timeout for zookeeper, see {@link ZkClient#ZkClient(java.lang.String, int, int)}
     * @param lockPath the path of this lock in zookeeper
     */
    public ZkLockImpl(String servers, int sessionTimeout, int connectionTimeout, String lockPath) {
        this.lockPath = lockPath;
        this.zkClient = new ZkClient(servers, sessionTimeout, connectionTimeout);
        // Create the persistent parent node that anchors the lock queue.
        if (!zkClient.exists(lockPath)) {
            zkClient.createPersistent(lockPath);
            LOG.info("Connected to [{}], lock path:[{}] created", servers, lockPath);
        } else {
            LOG.info("Connected to [{}], lock path:[{}] existed", servers, lockPath);
        }
    }

    /**
     * Check whether this thread's node is currently first in the queue,
     * i.e. whether it holds the lock; if not, record the immediate
     * predecessor in preNode so the caller can wait on it.
     *
     * @return true if the thread holds the lock
     */
    private boolean tryLock() {
        // The children of lockPath form the waiting queue; the node with
        // the smallest sequence number holds the lock.
        List<String> lockQueue = zkClient.getChildren(lockPath);
        Collections.sort(lockQueue);
        if (lockQueue.size() > 0 && (lockPath + "/" + lockQueue.get(0)).equals(curNode.get())) {
            LOG.debug("Lock [{}] acquired", lockPath);
            return true;
        } else {
            // Not first: watch only the immediate predecessor, so a release
            // wakes a single waiter instead of the whole herd.
            int index = lockQueue.indexOf(curNode.get().substring(lockPath.length() + 1));
            preNode.set(lockPath + "/" + lockQueue.get(index - 1));
            LOG.debug("Lock [{}] is occupied, set preNode to [{}]", lockPath, preNode.get());
            return false;
        }
    }

    public void lock() {
        if (null != curNode.get()) {
            throw new LockException("ZkLock is not reentrant");
        }
        // Join the queue with an ephemeral sequential node: ZooKeeper appends
        // a monotonically increasing number, and the node disappears with the
        // session, so a crashed client cannot hold the lock forever.
        curNode.set(zkClient.createEphemeralSequential(lockPath + "/", "Lock"));
        LOG.debug("curNode [{}] has been created", curNode.get());

        // Keep waiting on the current predecessor until we are first in the
        // queue; tryLock() refreshes preNode on every failed attempt, which
        // also handles a predecessor that crashes without holding the lock.
        while (!tryLock()) {
            CountDownLatch latch = new CountDownLatch(1);
            IZkDataListener listener = new IZkDataListener() {
                @Override
                public void handleDataChange(String dataPath, Object data) {
                }

                @Override
                public void handleDataDeleted(String dataPath) {
                    LOG.debug("Node [{}] has been deleted", dataPath);
                    latch.countDown();
                }
            };
            zkClient.subscribeDataChanges(preNode.get(), listener);
            try {
                // Re-check after subscribing: the predecessor may have been
                // deleted before the watch was registered.
                if (zkClient.exists(preNode.get())) {
                    LOG.debug("Thread blocked in lock [{}]", lockPath);
                    latch.await();
                }
            } catch (InterruptedException e) {
                LOG.error(e.getMessage());
            } finally {
                zkClient.unsubscribeDataChanges(preNode.get(), listener);
            }
        }
    }

    public boolean releaseLock() {
        // Only the holder (the first node in the queue) may release the lock.
        if (null != curNode.get() && zkClient.exists(curNode.get()) && tryLock()) {
            if (zkClient.delete(curNode.get())) {
                LOG.debug("Lock [{}] released, Node [{}] deleted", lockPath, curNode.get());
                curNode.remove();
                return true;
            }
        }
        LOG.error("Illegal lock release");
        return false;
    }
}
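A hypothetical usage sketch, assuming a ZooKeeper server at localhost:2181 and the ZkLock interface from the same project (the parameters are illustrative):

public class ZkLockDemo {
    public static void main(String[] args) {
        ZkLock lock = new ZkLockImpl("localhost:2181", 10000, 10000, "/locks/demo");

        lock.lock();                 // blocks until this client is first in the queue
        try {
            // ... critical section: only one client at a time gets here ...
        } finally {
            lock.releaseLock();      // deletes the ephemeral node, waking the successor
        }
    }
}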

The full project is available on my GitHub.

