Disruptor (the most complete guide ever)

Lock-free framework series:

1. What is Disruptor?

Disruptor is a high-performance queue developed by LMAX, a British foreign-exchange trading company. It was originally built to solve the latency problem of in-memory queues (performance tests showed that their latency was of the same order of magnitude as I/O operations).

A system built on Disruptor can process 6 million orders per second on a single thread. After a talk at QCon in 2010 it attracted wide attention in the industry. In 2011 Martin Fowler, an expert in enterprise application software, wrote a long article introducing it, and in the same year it won Oracle's official Duke's Choice Award.

Many well-known projects, including Apache Storm, Apache Camel and Log4j 2, use Disruptor to achieve high performance.

Note that the queues discussed here are in-memory queues, not distributed queues such as Kafka. In addition, the Disruptor features described in this article are those of version 3.3.4.

2. Problems with Java's built-in queues

Before introducing Disruptor, let's look at the problems with the commonly used thread-safe queues built into Java. They are listed in the following table.

Queue                   Bounded?             Locking     Data structure
ArrayBlockingQueue      bounded              lock        array
LinkedBlockingQueue     optionally bounded   lock        linked list
ConcurrentLinkedQueue   unbounded            lock-free   linked list
LinkedTransferQueue     unbounded            lock-free   linked list
PriorityBlockingQueue   unbounded            lock        heap
DelayQueue              unbounded            lock        heap

The underlying data structures of these queues fall into three types: array, linked list and heap. Heaps are generally used to implement queues with priority semantics, so they are not considered here.

Among the array-based thread-safe queues, the typical one is ArrayBlockingQueue, which ensures thread safety with a lock. The linked-list-based thread-safe queues fall into two categories, represented by LinkedBlockingQueue and ConcurrentLinkedQueue. The former is also made thread-safe with a lock, while the latter, like LinkedTransferQueue in the table above, is lock-free and relies on atomic compare-and-swap operations (hereinafter "CAS").

However, CAS operations on volatile variables are subject to false sharing (sometimes translated as "pseudo sharing"). For details, please refer to the dedicated article:

Pseudo sharing (illustration)

Disruptor solves false sharing with a scheme similar to the one described in that article: padding the hot variable so that it occupies a cache line of its own.

3. How does the Disruptor framework solve false sharing?

Disruptor has an important class, Sequence, which wraps a volatile long value. The array-based RingBuffer, the producers and the consumers each have their own independent Sequence. In the RingBuffer, the Sequence records write progress: for example, every time a producer wants to write data into the buffer, it calls RingBuffer.next() to obtain the next available sequence (slot). For producers and consumers, the Sequence records the sequence number of the events they have processed. Here is the source code of the Sequence class:

package com.lmax.disruptor;

import com.lmax.disruptor.util.Util;
import sun.misc.Unsafe;

class LhsPadding {
	protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
	protected volatile long value;
}

class RhsPadding extends Value {
	protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding {
	static final long INITIAL_VALUE = -1L;
	private static final Unsafe UNSAFE;
	private static final long VALUE_OFFSET;

	static {
		UNSAFE = Util.getUnsafe();
		try {
			VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
		} catch (final Exception e) {
			throw new RuntimeException(e);
		}
	}

	public Sequence() {
		this(INITIAL_VALUE);
	}

	public Sequence(final long initialValue) {
		// Ordered (release) write: cheaper than a full volatile write
		UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
	}

	// ... get(), set(), compareAndSet(), incrementAndGet() and other methods omitted
}

As the LhsPadding, Value and RhsPadding classes show, the variable that is actually used, value, is padded on each side by 7 long fields. A long takes 8 bytes, so each side holds 7 * 8 = 56 bytes of padding, and 56 + 8 = 64 bytes is exactly one common cache line. Whichever 64-byte cache line value is loaded into, the other 56 bytes of that line can only be padding, so no other frequently modified variable shares the line with value, and writes to value never conflict with writes to other variables. (This assumes a 64-byte cache line; if the cache line is larger than 64 bytes, false sharing can still occur.)
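To see the padding trick in isolation, here is a minimal sketch that copies the LhsPadding/Value/RhsPadding layout but replaces sun.misc.Unsafe with java.lang.invoke.VarHandle (Java 9+). The class names (PaddingDemo, LhsPad, PaddedValue, RhsPad, PaddedLong) are hypothetical, the sketch assumes a 64-byte cache line as the text does, and the JVM does not formally guarantee field layout, so the padding is a best-effort measure rather than a contract.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

final class PaddingDemo {
    // Hypothetical re-creation of the Sequence layout (not Disruptor code):
    // 7 longs = 56 bytes of padding on each side of `value`.
    static class LhsPad { protected long p1, p2, p3, p4, p5, p6, p7; }

    static class PaddedValue extends LhsPad { protected volatile long value; }

    static class RhsPad extends PaddedValue { protected long p9, p10, p11, p12, p13, p14, p15; }

    static final class PaddedLong extends RhsPad {
        private static final VarHandle VALUE;

        static {
            try {
                VALUE = MethodHandles.lookup()
                        .findVarHandle(PaddedValue.class, "value", long.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        PaddedLong(long initialValue) {
            VALUE.setRelease(this, initialValue); // roughly what putOrderedLong does
        }

        long get() {
            return (long) VALUE.getVolatile(this);
        }

        boolean compareAndSet(long expected, long newValue) {
            return VALUE.compareAndSet(this, expected, newValue);
        }

        long incrementAndGet() {
            long current;
            do {
                current = get();
            } while (!compareAndSet(current, current + 1)); // retry if another thread won
            return current + 1;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PaddedLong counter = new PaddedLong(0L);
        Runnable work = () -> {
            for (int i = 0; i < 100_000; i++) {
                counter.incrementAndGet();
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(counter.get()); // 200000
    }
}
```

setRelease in the constructor is the VarHandle counterpart of putOrderedLong; reads and CAS go through getVolatile and compareAndSet, so the counter stays correct even though two threads update it concurrently.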

Usage scenarios of Disruptor

The most common use of Disruptor is the producer-consumer scenario, in particular "one producer, multiple consumers" where events must be processed in order.

Open-source components that use Disruptor include Log4j 2 and Apache Storm. It can serve as a high-performance bounded in-memory queue that implements the producer-consumer model, with one or more producers feeding multiple consumers. It can also be seen as an implementation of the observer pattern, or the publish-subscribe pattern.

For example, suppose we read data sequentially from MySQL's binlog and write it to Elasticsearch (a search engine). Reading the binlog means reading a single file, so there is exactly one producer. Writes to Elasticsearch must strictly follow the binlog's order, otherwise the data ends up wrong. Ordinary multi-threaded consumers therefore cannot solve this problem, and adding locks to enforce the order would greatly reduce performance.

Hands-on: an example of using Disruptor

Let's start with a simple example: the producer passes a long value to the consumer, and the consumer consumes it simply by printing it.

Define an Event

First, define an Event to contain the data to be transferred:

public class LongEvent { 
    private long value;
    public long getValue() { 
        return value; 
    } 
 
    public void setValue(long value) { 
        this.value = value; 
    } 
} 

We also declare an EventFactory to instantiate Event objects, because Disruptor pre-allocates the events for us.

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

Define the event handler (Disruptor calls back into this handler for each event)

We also need an event consumer, that is, an event handler. This handler simply prints the value stored in the event to the terminal:

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(event.getValue());
    }
}

Define the event source: the producer publishes events

Events need a source that generates them. In this example, assume events are triggered when data is read from disk or the network. The event source uses a ByteBuffer to simulate the data it receives, so it publishes an event whenever I/O reads some data (this is not automatic: the programmer must fill in and publish the event when data is read):

import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * onData publishes one event per call;
     * its argument carries the data passed to the consumer.
     *
     * @param bb buffer whose first 8 bytes hold the long value
     */
    public void onData(ByteBuffer bb) {
        // Think of ringBuffer as an event queue: next() claims the next event slot
        long sequence = ringBuffer.next();
        try {
            // Use the claimed sequence to get the pre-allocated event and fill it
            LongEvent event = ringBuffer.get(sequence);
            event.setValue(bb.getLong(0));
        } finally {
            // Publish the event
            ringBuffer.publish(sequence);
        }
    }
}

Clearly, publishing an event involves more steps than with a simple queue, because the event objects are pre-allocated.

Publishing an event takes at least two steps:

claim the next event slot, then publish the event (wrap publication in try/finally so the event is always published).

If you claim a slot with ringBuffer.next(), you must publish the corresponding sequence. Failing to publish it corrupts the Disruptor's state; with multiple producers in particular, it causes the event consumers to stall, and the only way to recover is to restart the application.
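The two-phase rule and the failure mode described above can be reproduced with a toy that needs nothing but the JDK. TwoPhasePublishDemo, ToyRing, onData and the field names are hypothetical; this is a single-producer simplification, not Disruptor's RingBuffer, and it omits the wrap-around check that keeps a producer from overwriting slots consumers have not read yet.

```java
import java.util.concurrent.atomic.AtomicLong;

final class TwoPhasePublishDemo {
    /** Hypothetical single-producer toy ring: claim with next(), fill the slot, then publish(). */
    static final class ToyRing {
        private final long[] slots;
        private final int mask;
        private long claimed = -1;                               // touched only by the producer
        private final AtomicLong published = new AtomicLong(-1); // read by consumers

        ToyRing(int sizePowerOfTwo) {
            slots = new long[sizePowerOfTwo];
            mask = sizePowerOfTwo - 1;
        }

        long next() { return ++claimed; }                         // phase 1: claim a slot
        void set(long sequence, long value) { slots[(int) (sequence & mask)] = value; }
        long get(long sequence) { return slots[(int) (sequence & mask)]; }
        void publish(long sequence) { published.set(sequence); } // phase 2: volatile write makes the slot visible
        long highestPublished() { return published.get(); }      // consumers never read past this
    }

    /** Same shape as LongEventProducer.onData: always publish in finally. */
    static void onData(ToyRing ring, long value) {
        long sequence = ring.next();
        try {
            ring.set(sequence, value);
        } finally {
            ring.publish(sequence);
        }
    }

    public static void main(String[] args) {
        ToyRing ring = new ToyRing(8);
        onData(ring, 42);
        System.out.println(ring.highestPublished() + " " + ring.get(0)); // 0 42

        long forgotten = ring.next(); // claimed, but publish() is never called
        // A consumer waiting for sequence 1 would wait forever: the cursor is stuck at 0.
        System.out.println(forgotten + " claimed, highest published " + ring.highestPublished()); // 1 claimed, highest published 0
    }
}
```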

Disruptor 3.0 added a lambda-style API that moves this complexity into the RingBuffer itself, so with Disruptor 3.0 and later it is best to publish events through the event publisher/event translator API.

Event translators in Disruptor 3.0 and later (filling in the event's business data)

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducerWithTranslator {
    // A translator can be regarded as an event initializer: publishEvent calls it
    // to fill the pre-allocated event
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
                @Override
                public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
                    event.setValue(bb.getLong(0));
                }
            };

    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb) {
        // Claims a slot, runs TRANSLATOR and publishes; the try/finally is handled internally
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}

Another advantage of this approach is that the translator can be kept in a separate class, which makes it easier to unit test. Disruptor provides several interfaces (EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, etc.) for implementing translators. Because the arguments of the translation method are passed in through the call on the RingBuffer, a translator can be a static instance or, in Java 8, a non-capturing lambda.

Putting it all together

The last step is to wire all the pieces together into a complete event-processing system. Disruptor simplifies this with a DSL-style API (really just an intuitive fluent style rather than a true DSL). The DSL is simpler but does not expose every option; even so, relying on it covers most situations.

Note: this example uses the plain event producer (LongEventProducer), not the event translator.

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventMain {
    public static void main(String[] args) throws InterruptedException {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;
        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());
        // Start the Disruptor, starts all threads running
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            // Publish event
            producer.onData(bb);
            Thread.sleep(1000);
        }
    }
}

Using Disruptor with Java 8

Disruptor's own interfaces support Java 8 lambdas. Most of its interfaces qualify as functional interfaces (they have exactly one abstract method), so lambdas can be used in place of custom classes in many places.

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventMainJava8 {
    /**
     * Registers the EventHandler and publishes events with lambda expressions
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;
        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
        // You can use lambda to register an EventHandler
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.getValue()));
        // Start the Disruptor, starts all threads running
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            // The lambda is the translator; bb is passed through to it as `buffer`
            ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

Since Java 8 method references can be used wherever a lambda is expected, the code above can also be written as follows:

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventWithMethodRef {
    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println(event.getValue());
    }

    public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
    {
        event.setValue(buffer.getLong(0));
    }

    public static void main(String[] args) throws Exception
    {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;
        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
        // Connect the handler
        disruptor.handleEventsWith(LongEventWithMethodRef::handleEvent);
        // Start the Disruptor, starts all threads running
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent(LongEventWithMethodRef::translate, bb);
            Thread.sleep(1000);
        }
    }
}

How does Disruptor achieve high performance?

Disruptor's high performance comes mainly from removing locks in favor of CAS, and from implementing its bounded queue internally as a ring buffer.

  • Ring data structure
    Arrays are used instead of linked lists to avoid garbage collection. Arrays are also friendlier to the processor's cache.

  • Element location
    The array length is 2^n, so an element's index can be computed with a fast bit operation instead of a modulo. Sequence numbers only ever increase, and overflow is not a concern: the sequence is a long, so even at 1 million QPS it would take about 300,000 years to exhaust.

  • Lockless design
    Each producer or consumer thread first claims the position(s) in the array it may operate on, then writes or reads data directly at that position. CAS on atomic variables guarantees thread safety throughout.
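The arithmetic behind the "element location" point can be checked directly. This sketch (RingIndexMath and its methods are hypothetical names) shows that for a power-of-two size, sequence & (size - 1) selects the same slot as sequence % size, and recomputes the overflow estimate for a long sequence advancing one million times per second.

```java
final class RingIndexMath {
    /** Slot for a sequence when size is a power of two: one AND instead of a division. */
    static int slot(long sequence, int size) {
        return (int) (sequence & (size - 1));
    }

    /** Years until a long counter incremented `perSecond` times a second overflows. */
    static double yearsToOverflow(long perSecond) {
        double seconds = (double) Long.MAX_VALUE / perSecond;
        return seconds / (365.0 * 24 * 60 * 60);
    }

    public static void main(String[] args) {
        int size = 1024;
        for (long seq : new long[] {0, 1023, 1024, 5000}) {
            // Both values agree because size is 2^10.
            System.out.println(seq + " -> " + slot(seq, size) + " (mod: " + (seq % size) + ")");
        }
        System.out.printf("about %.0f years at 1M/s%n", yearsToOverflow(1_000_000L));
    }
}
```

The estimate works out to roughly 292,000 years, which the text rounds to 300,000.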

Disruptor is mainly used where high throughput and low latency are required. It "squeezes" the machine's resources in exchange for processing performance. If your project has strict throughput and latency requirements and needs a lock-free bounded queue to implement the producer/consumer model, Disruptor is an excellent choice.

Principle: Disruptor's internal RingBuffer (ring queue)

What is a RingBuffer?

A RingBuffer is a ring (its end wraps around to its start) used as a buffer to pass data between different contexts (threads).
The RingBuffer holds a sequence number that points to the next available element in the array.


Disruptor is an in-memory queue built on CAS. Unlike ordinary queues, it uses an array-based ring queue, and CAS is used both when producers submit tasks to the buffer and when consumers take tasks from it for execution.

Advantages of using a ring queue:

First, it simplifies multithreaded synchronization. A textbook queue uses two pointers, head and tail, pointing to the front and back of the queue. With an ordinary queue, imagine several producers submitting tasks at the same time: each producer must update tail after adding its task, so head is untouched but the producers contend on tail. For example, producer P1 wants to write and reads the current value V of the tail pointer, using it as the expected value for compareAndSet(). Before P1 executes compareAndSet(), another producer P2 modifies tail. When P1 then executes compareAndSet(), tail no longer equals V, so the CAS fails and P1 has to retry. Likewise, when several consumers take tasks from the queue, tail is untouched but they contend on head (because the queue is FIFO, every dequeue starts from the head).
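The P1/P2 interleaving can be replayed deterministically on a single thread. In this sketch an AtomicLong stands in for the tail pointer (a simplification: a real linked queue compares and swaps node references), and TailContentionDemo and advanceTail are hypothetical names. P1's first compareAndSet() fails because P2 moved the tail after P1 read it, and the usual remedy is a retry loop, which is exactly where contention costs time.

```java
import java.util.concurrent.atomic.AtomicLong;

final class TailContentionDemo {
    /** Standard CAS retry loop: re-read tail until our update wins. Returns the number of attempts. */
    static int advanceTail(AtomicLong tail) {
        int attempts = 0;
        while (true) {
            attempts++;
            long expected = tail.get();
            if (tail.compareAndSet(expected, expected + 1)) {
                return attempts;
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong tail = new AtomicLong(10);

        long p1Expected = tail.get();   // P1 reads V = 10
        tail.compareAndSet(10, 11);     // P2 gets in first and moves tail
        boolean p1Won = tail.compareAndSet(p1Expected, p1Expected + 1);
        System.out.println("P1 first CAS succeeded? " + p1Won); // false: tail is now 11

        int attempts = advanceTail(tail); // P1 retries with a fresh read
        System.out.println("tail=" + tail.get() + " after " + attempts + " attempt(s)"); // tail=12 after 1 attempt(s)
    }
}
```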

A further characteristic of the ring queue is that it has only one pointer, and that single pointer serves both enqueue and dequeue operations. Managing a queue with two pointers, head and tail, can also cause false sharing (see section 3): the head and tail variables are usually created together and often end up in the same cache line, and false sharing easily occurs when multiple threads modify variables in the same cache line.

Second, because a ring queue is used, the queue's size is fixed when it is created. Disruptor's ring queue is backed by an array, which reduces memory-management pressure: with a linked list, nodes that are no longer referenced must be periodically reclaimed by the garbage collector, whereas the array's slots are allocated once and reused, so there is no repeated allocation and reclamation.
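A small sketch of the preallocation argument: every slot is filled once at construction and later writes only overwrite fields, so in an 8-slot ring sequence 0 and sequence 8 map to the very same object and publishing creates no per-message garbage. PreallocatedRingDemo, MutableEvent and PreallocatedRing are hypothetical names; the Supplier plays the role that EventFactory plays in Disruptor, but this is a toy, not Disruptor's RingBuffer.

```java
import java.util.function.Supplier;

final class PreallocatedRingDemo {
    static final class MutableEvent { long value; }

    static final class PreallocatedRing<E> {
        private final Object[] entries;
        private final int mask;

        PreallocatedRing(int sizePowerOfTwo, Supplier<E> factory) {
            if (Integer.bitCount(sizePowerOfTwo) != 1) {
                throw new IllegalArgumentException("size must be a power of 2");
            }
            entries = new Object[sizePowerOfTwo];
            mask = sizePowerOfTwo - 1;
            for (int i = 0; i < sizePowerOfTwo; i++) {
                entries[i] = factory.get(); // allocated once, reused for the ring's lifetime
            }
        }

        @SuppressWarnings("unchecked")
        E get(long sequence) {
            return (E) entries[(int) (sequence & mask)];
        }
    }

    public static void main(String[] args) {
        PreallocatedRing<MutableEvent> ring = new PreallocatedRing<>(8, MutableEvent::new);
        MutableEvent first = ring.get(0);
        first.value = 1;
        MutableEvent wrapped = ring.get(8); // same slot after one lap
        wrapped.value = 2;                  // overwritten in place, no new object
        System.out.println(first == wrapped); // true
        System.out.println(first.value);      // 2
    }
}
```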

Principle: Disruptor's wait strategies

Disruptor's default wait strategy is BlockingWaitStrategy. Internally it uses a lock and a condition variable (Java's basic synchronization primitives) to control when threads run and wait. BlockingWaitStrategy is the slowest wait strategy, but it is the most conservative with CPU and behaves most consistently across deployment environments. Other strategies can be chosen to suit a particular deployment and improve performance.
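A stripped-down sketch of the lock-and-condition idea behind BlockingWaitStrategy, written against java.util.concurrent.locks. This is not Disruptor's source, and BlockingWaitDemo, BlockingCursor, publish and waitFor are hypothetical names. A waiting consumer sleeps on the Condition and burns no CPU, at the cost of taking a lock and signalling on every publish.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class BlockingWaitDemo {
    static final class BlockingCursor {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition advanced = lock.newCondition();
        private long cursor = -1; // guarded by lock

        /** Producer side: publish a sequence and wake any waiting consumers. */
        void publish(long sequence) {
            lock.lock();
            try {
                cursor = sequence;
                advanced.signalAll();
            } finally {
                lock.unlock();
            }
        }

        /** Consumer side: block until the cursor reaches `sequence`; returns the cursor seen. */
        long waitFor(long sequence) throws InterruptedException {
            lock.lock();
            try {
                while (cursor < sequence) { // loop guards against spurious wakeups
                    advanced.await();
                }
                return cursor;
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingCursor cursor = new BlockingCursor();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("consumer saw " + cursor.waitFor(3));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (long s = 0; s <= 3; s++) {
            cursor.publish(s);
        }
        consumer.join();
    }
}
```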

SleepingWaitStrategy

Like BlockingWaitStrategy, SleepingWaitStrategy keeps CPU usage relatively low. It busy-waits in a loop and calls LockSupport.parkNanos(1) in the middle of the loop to sleep (on a typical Linux system this pauses the thread for about 60 µs). Its advantage is that the producing thread only needs to increment a counter and does not pay the cost of signalling a condition variable. The trade-off is higher latency when moving an event from producer to consumer. SleepingWaitStrategy works best where low latency is not required but the impact on the producing thread should be small, for example asynchronous logging.

YieldingWaitStrategy

YieldingWaitStrategy is one of two strategies suited to low-latency systems: it spends extra CPU cycles to reduce latency. It busy-spins waiting for the sequence to reach the required value, calling Thread.yield() inside the loop to let other runnable threads execute. It is the recommended strategy when very high performance is needed and there are fewer event handler threads than logical cores, for example when hyper-threading is enabled.

BusySpinWaitStrategy

BusySpinWaitStrategy is the highest-performing wait strategy, but it places the strictest demands on the deployment environment. It should only be used when the number of event handler threads is smaller than the number of physical cores, for example with hyper-threading disabled.
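The three non-blocking strategies differ mainly in what the consumer does after each unsuccessful check of the producer's cursor. The sketch below reduces that to one loop with a pluggable back-off. SpinWaitDemo, SpinStyle and waitFor are hypothetical, Thread.onSpinWait() (Java 9+) is used as the busy-spin hint, and Disruptor's real strategies are more elaborate (SleepingWaitStrategy, for example, spins and yields for a number of retries before it starts parking). In Disruptor itself you pick a strategy by passing an instance such as new YieldingWaitStrategy() to the Disruptor constructor that takes a ProducerType and a WaitStrategy.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

final class SpinWaitDemo {
    enum SpinStyle { BUSY_SPIN, YIELDING, SLEEPING }

    /** Wait until cursor >= sequence using the given style; returns the number of failed checks. */
    static long waitFor(AtomicLong cursor, long sequence, SpinStyle style) {
        long misses = 0;
        while (cursor.get() < sequence) {
            misses++;
            switch (style) {
                case BUSY_SPIN: Thread.onSpinWait(); break;       // lowest latency, keeps a core busy
                case YIELDING:  Thread.yield(); break;            // let other runnable threads go first
                case SLEEPING:  LockSupport.parkNanos(1L); break; // roughly 60 µs on Linux per the text
            }
        }
        return misses;
    }

    public static void main(String[] args) throws InterruptedException {
        for (SpinStyle style : SpinStyle.values()) {
            AtomicLong cursor = new AtomicLong(-1);
            Thread producer = new Thread(() -> {
                LockSupport.parkNanos(5_000_000L); // publish after roughly 5 ms
                cursor.set(0);
            });
            producer.start();
            long misses = waitFor(cursor, 0, style);
            producer.join();
            System.out.println(style + ": " + misses + " failed checks before sequence 0 arrived");
        }
    }
}
```

Running it typically shows many more failed checks for BUSY_SPIN than for SLEEPING, which is the CPU-versus-latency trade-off described above.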

Principle: parallel mode

Single writer mode

One of the best ways to improve performance in a concurrent system is the single writer principle, which also applies to Disruptor. If your code has only one event producer, you can configure Disruptor in single-producer mode to improve system performance.

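import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class SingleProducerLongEventMain {
    public static void main(String[] args) throws Exception {
        Executor executor = Executors.newCachedThreadPool();
        int bufferSize = 1024;
        // Construct the Disruptor with a SingleProducerSequencer
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new,
                bufferSize,
                executor,
                ProducerType.SINGLE, // single writer mode
                new BlockingWaitStrategy());
        // ..... connect handlers, start and publish as in the earlier examples
    }
}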
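Why does ProducerType.SINGLE help? A simplified sketch with hypothetical classes (ProducerClaimDemo, SingleProducerClaim, MultiProducerClaim): with one writer, claiming the next sequence is a plain increment of a field only that thread touches, while several writers must race on a shared counter through an atomic read-modify-write. Disruptor's actual SingleProducerSequencer and MultiProducerSequencer also track consumer progress so that producers never overwrite unread slots, which this sketch leaves out.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ProducerClaimDemo {
    /** Only ever called from one producer thread: no CAS, no lock. */
    static final class SingleProducerClaim {
        private long next = -1;
        long claim() { return ++next; }
    }

    /** Safe for many producer threads: an atomic increment on a shared counter. */
    static final class MultiProducerClaim {
        private final AtomicLong next = new AtomicLong(-1);
        long claim() { return next.incrementAndGet(); }
    }

    public static void main(String[] args) throws InterruptedException {
        SingleProducerClaim single = new SingleProducerClaim();
        System.out.println(single.claim() + ", " + single.claim()); // 0, 1

        MultiProducerClaim multi = new MultiProducerClaim();
        int perThread = 10_000;
        boolean[] seen = new boolean[2 * perThread];
        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                seen[(int) multi.claim()] = true; // each index is claimed by exactly one thread
            }
        };
        Thread a = new Thread(producer);
        Thread b = new Thread(producer);
        a.start();
        b.start();
        a.join();
        b.join();
        int distinct = 0;
        for (boolean s : seen) {
            if (s) distinct++;
        }
        System.out.println(distinct + " distinct sequences claimed"); // 20000 distinct sequences claimed
    }
}
```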

Produce once, consume serially

For example, when a user-registration event is triggered, one handler stores the user's information, another handler sends an e-mail, and so on.

/**
  * Serial sequential execution
  * <br/>
  * p --> c11 --> c21
  * @param disruptor
  */
 public static void serial(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWith(new C11EventHandler()).then(new C21EventHandler());
     disruptor.start();
 }

Diamond execution

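 /**
  * Diamond: C11 and C12 handle each event in parallel, and C21 runs only after both
  * <br/>
  * p --> (c11, c12) --> c21
  * @param disruptor
  */
 public static void diamond(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWith(new C11EventHandler(), new C12EventHandler()).then(new C21EventHandler());
     disruptor.start();
 }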
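The dependency that .then() creates in the diamond boils down to one rule: C21 may process sequence s only after both C11 and C12 have finished s, i.e. s <= min(C11, C12). A minimal sketch of that check, with hypothetical names (DiamondBarrierDemo, DiamondBarrier, availableForC21); in Disruptor the equivalent bookkeeping is done by a SequenceBarrier over the upstream handlers' Sequence objects, not by this code.

```java
import java.util.concurrent.atomic.AtomicLong;

final class DiamondBarrierDemo {
    static final class DiamondBarrier {
        final AtomicLong c11 = new AtomicLong(-1); // last sequence C11 has finished
        final AtomicLong c12 = new AtomicLong(-1); // last sequence C12 has finished

        /** Highest sequence C21 is allowed to process right now. */
        long availableForC21() {
            return Math.min(c11.get(), c12.get());
        }
    }

    public static void main(String[] args) {
        DiamondBarrier barrier = new DiamondBarrier();
        barrier.c11.set(5); // C11 is fast and has finished 0..5
        barrier.c12.set(2); // C12 has only finished 0..2
        System.out.println(barrier.availableForC21()); // 2: C21 must wait for C12
        barrier.c12.set(7);
        System.out.println(barrier.availableForC21()); // 5: now C11 is the slower one
    }
}
```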

Chain parallel computing

 /**
  * Two independent chains run in parallel:
  * p --> c11 --> c12
  * p --> c21 --> c22
  * @param disruptor
  */
 public static void chain(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWith(new C11EventHandler()).then(new C12EventHandler());
     disruptor.handleEventsWith(new C21EventHandler()).then(new C22EventHandler());
     disruptor.start();
 }

Mutual isolation mode

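 /**
  * Two worker pools run in parallel: every event reaches both pools, but within
  * each pool only one of the two instances handles it.
  * handleEventsWithWorkerPool takes WorkHandler instances, so C11EventHandler and
  * C21EventHandler are assumed to implement WorkHandler<LongEvent> here.
  * @param disruptor
  */
 public static void parallelWithPool(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWithWorkerPool(new C11EventHandler(), new C11EventHandler());
     disruptor.handleEventsWithWorkerPool(new C21EventHandler(), new C21EventHandler());
     disruptor.start();
 }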
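Every EventHandler registered with handleEventsWith sees every event, whereas within a worker pool each event goes to exactly one WorkHandler. A JDK-only sketch of that idea: each worker claims the next sequence from a shared AtomicLong, so no sequence is handled twice. WorkerPoolDemo and runPool are hypothetical names, the input is a finite range so the demo terminates, and this is not Disruptor's WorkerPool code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class WorkerPoolDemo {
    /** Runs `workers` threads over sequences 0..total-1; returns the sequences each worker handled. */
    static List<List<Long>> runPool(int workers, long total) throws InterruptedException {
        AtomicLong workSequence = new AtomicLong(-1); // shared by all workers in the pool
        List<List<Long>> handled = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            List<Long> mine = new ArrayList<>(); // written only by this worker
            handled.add(mine);
            Thread t = new Thread(() -> {
                long seq;
                // Claim the next unclaimed sequence; stop once the finite demo input is used up.
                while ((seq = workSequence.incrementAndGet()) < total) {
                    mine.add(seq); // "process" the event
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join(); // join makes each worker's list safely visible here
        }
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<Long>> handled = runPool(2, 1_000);
        List<Long> all = new ArrayList<>();
        handled.forEach(all::addAll);
        Collections.sort(all);
        System.out.println(all.size() + " events, first " + all.get(0) + ", last " + all.get(all.size() - 1)); // 1000 events, first 0, last 999
    }
}
```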

Channel mode

The stages run serially, and each stage is a worker pool with two instances (two C11s, then two C21s).

/**
 * Serial execution with two instances each of C11 and C21
 * <br/>
 * p --> c11 (x2, worker pool) --> c21 (x2, worker pool)
 * @param disruptor
 */
 public static void serialWithPool(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWithWorkerPool(new C11EventHandler(), new C11EventHandler())
              .thenHandleEventsWithWorkerPool(new C21EventHandler(), new C21EventHandler());
     disruptor.start();
 }


Posted by sps on Mon, 09 May 2022 17:16:22 +0300