Multi-threaded production and consumption mode

production consumption model
Operating the same shared resource through different threads, this phenomenon is the production and consumption model.

1. Example of a single-threaded production and consumption model:

1.1 Create a resource sharing class

package scxf;
/**
 * resource sharing class
 */
public class Resource {
    //Save the shared resource array
    private Object obj[] = new Object[1];
    //Record production and consumption times
    private int num = 1;
    //Create a synchronization object
    private static final Object lock = new Object();
    //Create a producer
    public void add() throws InterruptedException {
        synchronized (lock) {
            if (obj[0] != null) {
                lock.wait();
            }
            obj[0] = "water" + num;
            System.out.println(Thread.currentThread().getName() + "is pouring water" + obj[0]);
            num++;
            lock.notify();
        }
    }
    //create consumer
    public void delete() throws InterruptedException {
        synchronized (lock) {
            if (obj[0] == null) {
                lock.wait();
            }
            obj[0] = "water" + num;
            System.out.println(Thread.currentThread().getName() + "is pumping:" + obj[0]);
            obj[0] = null;
            lock.notify();
        }
    }
}

1.2 Create a producer class and implement the Runnable interface

package scxf;
/**
 * producer class
 * Continuous water injection results in multiple injections or multiple draws
 */
public class Producre implements Runnable {
    //Define resource sharing objects
    private Resource resource;
    //Loading resource sharing objects through the no-argument constructor
    public Producre(Resource resource) {
        this.resource = resource;
    }
    @Override
    public void run() {
        //Continuous water injection
        while (true) {
            try {
                resource.add();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

1.3 Create a consumer class and implement the Runnable interface

package scxf5;
/**
 * consumer class
 * Continuous water injection results in multiple injections or multiple draws
 */
public class Consumer implements Runnable {
    //Define resource sharing objects
    private Resource resource;
    //Pass in the resource sharing object through the no-argument constructor
    public Consumer(Resource resource) {
        this.resource = resource;
    }
    @Override
    public void run() {
        //continuous pumping
        while (true) {
            try {
                resource.delete();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

1.4 Create test classes, create production and consumption threads for testing

package scxf;
/**
 * The result is multiple fills or multiple draws
 */
public class TestMain {
    public static void main(String[] args) {
        //Create a resource sharing object
        Resource rs = new Resource();
        //Create producer object
        Producre sc = new Producre(rs);
        //Create consumer object
        Consumer xf = new Consumer(rs);
        //Create producer thread
        Thread scthread = new Thread(sc);
        //Create a consumer thread
        Thread xfthread = new Thread(xf);
        //Name the production thread
        scthread.setName("producer one:");
        //Name the consuming thread
        xfthread.setName("consumer one:");
        //Start the producer thread
        scthread.start();
        //start consumer thread
        xfthread.start();
    }
}

1.5 Test results:

2. Example of multi-threaded production and consumption model:

2.1 Create a resource sharing class

package scxf6;
/**
 * resource sharing class
 */
public class Resource {
    //Save the shared resource array
    private Object obj[] = new Object[1];
    //Record production and consumption times
    private int num = 1;
    //Create a synchronization object
    private static final Object lock = new Object();
    //Create a producer
    public void add() throws InterruptedException {
        synchronized (lock) {
            while (obj[0] != null) {
                lock.wait();
            }
            obj[0] = "water" + num;
            System.out.println(Thread.currentThread().getName() + "is pouring water" + obj[0]);
            num++;
            lock.notifyAll();
        }
    }

    //create consumer
    public void delete() throws InterruptedException {
        synchronized (lock) {
            while (obj[0] == null) {
                lock.wait();
            }
            obj[0] = "water" + num;
            System.out.println(Thread.currentThread().getName() + "is pumping" + obj[0]);
            obj[0] = null;
            lock.notifyAll();
        }
    }
}

2.2 Create a resource producer class and implement the runnable interface

package scxf6;
/**
 * producer class
 * Continuous water injection results in multiple injections or multiple draws
 */
public class Producre implements Runnable {
    //Define resource sharing objects
    private Resource resource;
    //Loading resource sharing objects through the no-argument constructor
    public Producre(Resource resource) {
        this.resource = resource;
    }
    @Override
    public void run() {
        //Continuous water injection
        while (true) {
            try {
                resource.add();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

2.3 Create a resource consumer class and implement the runnable interface

package scxf6;
/**
 * consumer class
 * Continuous water injection results in multiple injections or multiple draws
 */
public class Consumer implements Runnable {
    //Define resource sharing objects
    private Resource resource;
    //Pass in the resource sharing object through the no-argument constructor
    public Consumer(Resource resource) {
        this.resource = resource;
    }
    @Override
    public void run() {
        //continuous pumping
        while (true) {
            try {
                resource.delete();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

2.4 Create a test class and create multiple threads for testing

package scxf6;
/**
 * The result is multiple fills or multiple draws
 */
public class TestMain {
    public static void main(String[] args) {
        //Create a resource sharing object
        Resource rs = new Resource();
        //Create producer object
        Producre sc = new Producre(rs);
        //Create consumer object
        Consumer xf = new Consumer(rs);
        //Create producer thread
        Thread scthread1 = new Thread(sc);
        Thread scthread2 = new Thread(sc);
        //Create a consumer thread
        Thread xfthread1 = new Thread(xf);
        Thread xfthread2 = new Thread(xf);
        //Name each thread
        scthread1.setName("producer one:");
        scthread2.setName("producer two");
        xfthread1.setName("consumer one:");
        xfthread2.setName("consumer two:");
        //Start the producer thread
        scthread1.start();
        scthread2.start();
        //start consumer thread
        xfthread1.start();
        xfthread2.start();
    }
}

2.5 Test results:

Summarize:
In the above single-threaded production and consumption model, if multiple threads are directly added to the test class, there will be multiple water pumping or multiple water injection. Reason: The awakened thread may be its own peer.
Solution: Change the if statement that judges whether there is water to a while statement.
As a result, a new problem appeared again, resulting in a deadlock. Reason: all threads are in a waiting state, and there are no executable threads.
Solution: replace notify() with notifyAll(). Wake up all threads every time, even if it is your own companion, it will re-judgment, the companion will continue to wait, and the other party will perform the operation.

3. Use the Condition interface instead of the wait and wake up mechanism
Why use the Condition interface instead of the above example?
In the multi-threaded production and consumption process, in order to ensure that all threads are not waited (), use notifyAll() when waking up. Although it can wake up all the threads to ensure that there are threads performing operations, it may wake up its peers every time, so that their peers must be in the Waiting state, this execution efficiency is too low.

After JDK5, the waiting and waking up of a synchronization lock can distinguish whether the current thread is generated or consumed.

 void	await()Causes the current thread to wait until it receives a signal or is interrupted.
 void	signal()Wake up a waiting thread.
 void	signalAll() Wake up all waiting threads.

Note: If you use the Condition interface, you must use the Lock interface. Only use the Lock interface synchronously, and wait and wake up to use the Condition interface.

3.1 Create a resource sharing class and create a Lock interface as a synchronization lock

package scxf7;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * resource sharing class
 */
public class Resource {
    //Save the shared resource array
    private Object obj[] = new Object[1];
    //Record production and consumption times
    private int num = 1;
    //Create the Lock interface as a synchronization lock
    private Lock lock = new ReentrantLock();
    //Responsible for monitoring water injection
    private Condition scCondition = lock.newCondition();
    //Responsible for monitoring pumping
    private Condition xfCondition = lock.newCondition();
    //Create a producer
    public void add() {
        try {
            lock.lock();
            while (obj[0] != null) {
                scCondition.await();
            }
            obj[0] = "water" + num;
            System.out.println(Thread.currentThread().getName() + "is pouring water" + obj[0]);
            num++;
            xfCondition.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    //create consumer
    public void delete() {
        try {
            lock.lock();
            while (obj[0] == null) {
                xfCondition.await();
            }
            //obj[0] = "water" + num;
            System.out.println(Thread.currentThread().getName() + "is pumping" + obj[0]);
            obj[0] = null;
            scCondition.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

3.2 Create a resource sharing producer class and implement the Runnable interface

package scxf7;
/**
 * producer class
 * Continuous water injection results in multiple injections or multiple draws
 */
public class Producre implements Runnable {
    //Define resource sharing objects
    private Resource resource;
    //Loading resource sharing objects through the no-argument constructor
    public Producre(Resource resource) {
        this.resource = resource;
    }
    @Override
    public void run() {
        //Continuous water injection
        while (true) {
                resource.add();
        }
    }
}

3.3 Create a resource sharing consumer class and implement the Runnable interface

package scxf7;
/**
 * consumer class
 * Continuous water injection results in multiple injections or multiple draws
 */
public class Consumer implements Runnable {
    //Define resource sharing objects
    private Resource resource;
    //Pass in the resource sharing object through the no-argument constructor
    public Consumer(Resource resource) {
        this.resource = resource;
    }
    @Override
    public void run() {
        //continuous pumping
        while (true) {
            resource.delete();
        }
    }
}

3.4 Create a test class and create multiple threads for testing

package scxf7;
/**
 * The result is multiple fills or multiple draws
 */
public class TestMain {
    public static void main(String[] args) {
        //Create a resource sharing object
        Resource rs = new Resource();
        //Create producer object
        Producre sc = new Producre(rs);
        //Create consumer object
        Consumer xf = new Consumer(rs);
        //Create producer thread
        Thread scthread1 = new Thread(sc);
        Thread scthread2 = new Thread(sc);
        //Create a consumer thread
        Thread xfthread1 = new Thread(xf);
        Thread xfthread2 = new Thread(xf);
        //Name each thread
        scthread1.setName("producer one:");
        scthread2.setName("producer two");
        xfthread1.setName("consumer one:");
        xfthread2.setName("consumer two:");
        //Start the producer thread
        scthread1.start();
        scthread2.start();
        //start consumer thread
        xfthread1.start();
        xfthread2.start();
    }
}

Test Results:

Summarize:
If we do not use Lock, we will use synchronized code (synchronized) to achieve thread synchronization.
To use synchronous code, you need to use the wait(), notify(), notifyAll() methods provided by Object. The disadvantage is that although the notifyAll() provided by Object can wake up all threads, it may also wake up the peers and make them enter the waiting state, which reduces the execution efficiency.
Using the methods provided by the Condition interface await() , signal() , signalAll() they will only wake up the other thread, not the companion thread, which greatly improves the execution efficiency.
The Condition interface requires the Lock interface object newCondition() to create the Condition interface object when using it, so the Lock interface synchronization method is used instead of the synchronization code method.

What is the difference between sleep and wait?
sleep inherits the Thread class, and wait inherits the Object class.
sleep relies on the system clock and CPU scheduling mechanism, and wait calls the notify() and notifyAll() methods to wake up the thread.
sleep does not release the acquired lock resources, and wait releases the acquired lock resources.

What is the difference between notify() and notifyAll()?
notify() wakes up a random thread, notifyAll() wakes up all threads.
notify() may cause deadlock, notifyAll() will not.

Tags: Java

Posted by mogen on Wed, 18 May 2022 17:36:42 +0300