Interview question: How to design a blocking queue?

Foreword

I was asked this question in an interview and answered it terribly, so I am recording a better answer here.

My answer at the time was: when reading, use a while loop to check whether the queue is empty; if it is, sleep for a while and check again; if not, read an element and return. When inserting, use a while loop to check whether the queue is full; if it is, sleep for a while and check again; if not, insert the element.

That answer ignores thread safety entirely, and busy-waiting in a loop like that is crude. It really is a bad design.

Analysis

The difference between a blocking queue and an ordinary queue is that retrieval blocks when the queue is empty, and insertion blocks when the queue is full. While blocked, the retrieving or inserting thread enters a waiting state; once the queue has an element (or free space) again, the thread resumes. So there are really two key questions here: how does a thread block, and how does it learn that its condition is now met so it can continue (for example, a reading thread resuming once the queue is no longer empty)?
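As a point of reference, the JDK already ships exactly these semantics in java.util.concurrent.ArrayBlockingQueue. A small sketch of the behavior we want to reproduce (the capacity and timeout values are arbitrary):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SemanticsDemo {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);

        q.put(1); // capacity 1: this insert succeeds immediately

        // A second put() would block until space appears; offer() with a
        // timeout returns false instead once the wait expires.
        boolean inserted = q.offer(2, 100, TimeUnit.MILLISECONDS);
        System.out.println(inserted); // false: the queue is full

        // take() would block only if the queue were empty.
        System.out.println(q.take()); // 1
    }
}
```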

For the first question, there are two ways to make a thread pause: wait and sleep. The crucial difference is that wait releases the monitor lock while waiting, whereas sleep holds on to it.

The second question is really a problem of inter-thread communication. Retrying in a loop amounts to actively polling the state. Is there a callback-like alternative, where a thread is told to continue once the queue meets its condition? There is: notify.
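A minimal sketch of that wait/notify handshake (the ready flag and thread setup here are illustrative, not part of the queue design):

```java
public class WaitNotifyDemo {
    private static final Object lock = new Object();
    private static boolean ready = false;

    public static void main(String[] args) throws InterruptedException {
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                // Re-check the condition in a loop: wait() may return
                // spuriously, i.e. without a matching notify().
                while (!ready) {
                    try {
                        lock.wait(); // releases the lock while waiting
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
            System.out.println("condition met, continuing");
        });
        waiter.start();

        Thread.sleep(100); // give the waiter time to block first
        synchronized (lock) {
            ready = true;
            lock.notify(); // wakes one thread waiting on this monitor
        }
        waiter.join();
    }
}
```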

Approach

We maintain a lock object, lock.

In the read logic, we first check whether the queue is empty. If it is, we call lock.wait() to make the reading thread wait. Otherwise we read an element and call lock.notify() to wake up a waiting insertion thread.

In the insert logic, we first check whether the queue is full. If it is, we call lock.wait() to make the inserting thread wait. Otherwise we insert an element and call lock.notify() to wake up a waiting reading thread.

Since the queue cannot be full and empty at the same time, any thread found waiting must be waiting for the opposite operation (a reader waits only when the queue is empty, a writer only when it is full). Be careful, though: this reasoning only holds with a single producer and a single consumer. With several threads on each side, notify may wake a thread of the wrong kind, which is why a robust implementation re-checks its condition in a loop after waking and prefers notifyAll.

We maintain a capacity cap, a current size, an array of length cap to store the elements, and head and tail indices for the front and back of the queue. The fields are declared volatile, although strictly speaking it is the synchronized block (below) that guarantees both mutual exclusion and visibility here, not volatile on its own.
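The head/tail wraparound can be sketched on its own; advance is a hypothetical helper used only for illustration:

```java
public class RingIndexDemo {
    // Hypothetical helper: advance an index by one slot, wrapping at cap.
    static int advance(int index, int cap) {
        return (index + 1) % cap;
    }

    public static void main(String[] args) {
        int cap = 4;
        int tail = 0;
        // Writing five elements into a capacity-4 ring buffer: the fifth
        // write wraps back to slot 0. This is also why size is tracked
        // separately: head == tail alone cannot distinguish full from empty.
        for (int i = 0; i < 5; i++) {
            System.out.print(tail + " ");
            tail = advance(tail, cap);
        }
        System.out.println(); // prints: 0 1 2 3 0
    }
}
```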

We put insertion and reading under the same lock, so that only one thread reads or writes at a time; this prevents duplicate reads and insertions beyond capacity.

Code

class BlockQueue<T> {
    private final Object[] queue;
    private volatile int head, tail, size;
    private final int cap;
    private final Object lock = new Object();

    public BlockQueue(int cap) {
        this.cap = cap;
        queue = new Object[cap];
    }

    public void offer(T t) throws InterruptedException {
        synchronized (lock) {
            // Use while, not if: wait() can wake up spuriously, and with
            // several producers another thread may have refilled the queue
            // before this one reacquires the lock.
            while (size == cap) {
                lock.wait();
            }

            queue[tail++] = t;
            tail %= cap;
            size++;

            // notifyAll rather than notify: with multiple producers and
            // consumers sharing one monitor, notify could wake a thread of
            // the wrong kind and leave the right one waiting forever.
            lock.notifyAll();
        }
    }

    @SuppressWarnings("unchecked")
    public T poll() throws InterruptedException {
        synchronized (lock) {
            while (size == 0) {
                lock.wait();
            }

            T res = (T) queue[head];
            queue[head++] = null; // drop the reference so it can be GC'd
            head %= cap;
            size--;

            lock.notifyAll();
            return res;
        }
    }
}
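Note that signalling on a single monitor wakes producers and consumers alike. A common refinement, and the approach java.util.concurrent.ArrayBlockingQueue itself takes, is a ReentrantLock with two separate Condition objects, so each signal targets only the side that can make progress. A sketch under those assumptions, not a drop-in replacement for the code above:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionBlockQueue<T> {
    private final Object[] queue;
    private final int cap;
    private int head, tail, size;

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    ConditionBlockQueue(int cap) {
        this.cap = cap;
        this.queue = new Object[cap];
    }

    public void offer(T t) throws InterruptedException {
        lock.lock();
        try {
            while (size == cap) {
                notFull.await(); // only producers park on this condition
            }
            queue[tail] = t;
            tail = (tail + 1) % cap;
            size++;
            notEmpty.signal(); // wake exactly one consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T poll() throws InterruptedException {
        lock.lock();
        try {
            while (size == 0) {
                notEmpty.await();
            }
            T res = (T) queue[head];
            queue[head] = null; // drop the reference for GC
            head = (head + 1) % cap;
            size--;
            notFull.signal(); // wake exactly one producer
            return res;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionBlockQueue<Integer> q = new ConditionBlockQueue<>(2);
        q.offer(1);
        q.offer(2);
        System.out.println(q.poll()); // 1
        System.out.println(q.poll()); // 2
    }
}
```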

Test code

public class Test {
    public static void main(String[] args) {
        BlockQueue<Integer> blockQueue = new BlockQueue<>(10);
        // Use two threads to simulate a producer and a consumer.
        // Run 1: the consumer consumes faster than the producer produces.
        // Expected: the consumer blocks and waits, then consumes each element
        // as soon as it is produced.
        // Run 2 (the code as written): the consumer consumes slower than the
        // producer produces.
        // Expected: the producer quickly fills the queue, then blocks;
        // afterwards, each element the consumer removes lets the producer
        // insert one more.
        Thread producer = new Thread() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    try {
                        blockQueue.offer(i);
                        System.out.println("Production Elements:" + i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        Thread consumer = new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        int ret = blockQueue.poll();
                        System.out.println("consumer element: " + ret);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumer.start();
    }
}

When consumption is slower than production (queue capacity is 10)

consumer element: 0
Production Elements: 0
Production Elements: 1
Production Elements: 2
Production Elements: 3
Production Elements: 4
Production Elements: 5
Production Elements: 6
Production Elements: 7
Production Elements: 8
Production Elements: 9
Production Elements: 10
consumer element: 1
Production Elements: 11
Production Elements: 12
consumer element: 2
consumer element: 3
Production Elements: 13
consumer element: 4
Production Elements: 14
consumer element: 5
Production Elements: 15
consumer element: 6
Production Elements: 16
consumer element: 7
Production Elements: 17
consumer element: 8

When production is slower than consumption

Production Elements: 0
consumer element: 0
Production Elements: 1
consumer element: 1
Production Elements: 2
consumer element: 2
Production Elements: 3
consumer element: 3
Production Elements: 4
consumer element: 4
Production Elements: 5
consumer element: 5
Production Elements: 6
consumer element: 6
consumer element: 7
Production Elements: 7
Production Elements: 8
consumer element: 8
Production Elements: 9
consumer element: 9
Production Elements: 10
consumer element: 10
Production Elements: 11
consumer element: 11
Production Elements: 12
consumer element: 12
consumer element: 13
Production Elements: 13

Both runs match expectations.

Tags: Java data structure

Posted by Jude on Tue, 17 May 2022 18:42:01 +0300